# Minimal DeepONet
#
# The __main__ block solves a simple operator-learning problem: the
# cumulative integral
#     v(y) = G[u](y) = \int_0^y u(x) dx
#
# Input:
#     a function u(x), sampled at m_sensors discrete points x_1, ..., x_m
#     (the "sensors"); code: U_train
#
# Output:
#     the function v(y) = \int_0^y u(x) dx, evaluated at the same points
#     y_1, ..., y_m; code: Y_train
#
# The operator G maps functions → functions.
# How:
#   - The branch net encodes the input function u into features.
#   - The trunk net produces basis functions evaluated at the output points y.
#   Their inner product reconstructs a v(y) that approximates the integral.
#
# Note: DeepONet doesn't need the function u in closed form - it just needs
# samples at the sensor points. By training on many random input functions,
# the network learns the general mapping
#     u(.) -> \int_0^y u(x) dx
#
# Once trained, DeepONet can approximate the integral for any new u, as long as
#   - the new u is reasonably smooth (and within the training range), and
#   - the sensor points are dense enough to capture the function's variations.
# By feeding many examples of functions and their integrals, the network learns
# the operator itself rather than memorizing a single function.

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt


# Branch Net
class BranchNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        print(f'{input_dim = } {output_dim = }')
        super().__init__()
        # Fully connected (dense) layers: nn.Linear(in_features, out_features, bias=True)
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, output_dim)

    def forward(self, u):
        # self.fc1(u) computes z1 = u @ w1.T + b1
        # F.relu(self.fc1(u)) is max(0, z1)
        x = F.relu(self.fc1(u))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        return self.fc4(x)


# Trunk Net
class TrunkNet(nn.Module):
    def __init__(self, coord_dim, hidden_dim, output_dim):
        print(f'{coord_dim = } {output_dim = }')
        super().__init__()
        self.fc1 = nn.Linear(coord_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, output_dim)

    def forward(self, y):
        x = F.relu(self.fc1(y))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        return self.fc4(x)


# DeepONet
class DeepONet(nn.Module):
    def __init__(self, branch_in, trunk_in, hidden=32, p=20):
        super().__init__()
        self.branch = BranchNet(branch_in, hidden, p)
        self.trunk = TrunkNet(trunk_in, hidden, p)

    def forward(self, u, y):
        """
        u: (n_samples, m_sensors)
        y: (num_y, 1)
        """
        b = self.branch(u)  # (n_samples, p); p features per function ("coefficients" for basis functions)
        t = self.trunk(y)   # (num_y, p)
        # b[i, :] are the coefficients for the i-th input function.
        # t[j, :] are the trunk features at the j-th output point.
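        # The inner product over the p features gives the prediction:
        #     out[i, j] = sum_k b[i, k] * t[j, k] ≈ G[u_i](y_j),
        # i.e. each output value is a linear combination of the trunk's
        # learned basis functions, weighted by the branch coefficients.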
        out = b @ t.T  # (n_samples, num_y)
        return out


if __name__ == '__main__':
    # Parameters
    n_samples = 500
    m_sensors = 20  # number of sensor points per function

    # Sensor points along x ∈ [0, 1]
    x_sensor = np.linspace(0, 1, m_sensors)

    # Input functions: random values at the sensor points (u may be negative)
    U_train = np.random.uniform(-1, 1, size=(n_samples, m_sensors))

    # Output functions: cumulative integral of each input function
    dx = 1.0 / (m_sensors - 1)  # spacing between sensor points
    Y_train_list = []
    for u in U_train:
        v = np.zeros_like(u)  # initialize cumulative integral; v(0) = 0
        # trapezoid approximation of \int_0^{y_j} u(x) dx
        v[1:] = np.cumsum(0.5 * (u[1:] + u[:-1]) * dx)
        Y_train_list.append(v)
    Y_train = np.array(Y_train_list)  # convert list to array

    # Convert to PyTorch tensors
    U_train = torch.tensor(U_train, dtype=torch.float32)
    Y_train = torch.tensor(Y_train, dtype=torch.float32)
    x_sensor_t = torch.tensor(x_sensor.reshape(-1, 1), dtype=torch.float32)

    model = DeepONet(branch_in=m_sensors, trunk_in=1, hidden=32, p=20)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    loss_fn = nn.MSELoss()

    n_epochs = 1000
    for epoch in range(n_epochs):
        optimizer.zero_grad()

        # Forward pass: compute outputs for all functions at all y at once
        v_pred = model(U_train, x_sensor_t)  # shape: (n_samples, m_sensors)

        # Loss over all (function, point) pairs
        loss = loss_fn(v_pred, Y_train)
        loss.backward()
        optimizer.step()

        if epoch % 100 == 0:
            print(f"Epoch {epoch}, Loss: {loss.item():.5f}")

    # Pick one training function (e.g., index 0) and compare
    test_idx = 0
    v_true = Y_train[test_idx].numpy()                 # shape: (m_sensors,)
    v_pred_single = v_pred[test_idx].detach().numpy()  # shape: (m_sensors,)

    plt.figure()
    plt.title('DeepONet cumulative integral: training')
    plt.plot(x_sensor, v_true, label="True integral")
    plt.plot(x_sensor, v_pred_single, "--", label="DeepONet prediction")
    plt.xlabel("y")
    plt.ylabel("v(y)")
    plt.legend()
    # plt.show()

    # Example: a new test function u_test(x) = sin(2*pi*x), sampled at the sensor points
    u_test = np.sin(2 * np.pi * x_sensor)
    u_test_t = torch.tensor(u_test.reshape(1, -1), dtype=torch.float32)  # shape (1, m_sensors)

    # Forward pass through DeepONet: one input function, all output points.
    # (There is no need to repeat u_test once per output point and take the
    # diagonal; the forward pass already pairs the single function with every y.)
    v_pred_test = model(u_test_t, x_sensor_t)            # shape (1, m_sensors)
    v_pred_cumulative = v_pred_test[0].detach().numpy()  # shape (m_sensors,)

    # True integral using the trapezoid rule (matches how Y_train was built;
    # a left-endpoint sum np.cumsum(u_test) * dx would be a cruder approximation)
    dx = x_sensor[1] - x_sensor[0]
    v_true = np.zeros_like(u_test)
    v_true[1:] = np.cumsum(0.5 * (u_test[1:] + u_test[:-1]) * dx)

    # Plot
    plt.figure()
    plt.title('DeepONet cumulative integral: testing')
    plt.plot(x_sensor, v_true, label="True integral")
    plt.plot(x_sensor, v_pred_cumulative, "--", label="DeepONet prediction")
    plt.xlabel("y")
    plt.ylabel("v(y)")
    plt.legend()
    plt.show()
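    # Because the trunk net takes arbitrary coordinates, the trained operator
    # can also be queried between the sensor points. A minimal sketch of such
    # an off-grid evaluation (the dense grid y_dense below is illustrative,
    # not part of the original setup):
    y_dense = np.linspace(0, 1, 200).reshape(-1, 1)
    y_dense_t = torch.tensor(y_dense, dtype=torch.float32)  # shape (200, 1)
    with torch.no_grad():
        v_dense = model(u_test_t, y_dense_t)[0].numpy()     # shape (200,)

    plt.figure()
    plt.title('DeepONet queried off the sensor grid')
    plt.plot(y_dense, v_dense, label="DeepONet prediction (dense y)")
    plt.plot(x_sensor, v_true, "o", label="True integral at sensors")
    plt.xlabel("y")
    plt.ylabel("v(y)")
    plt.legend()
    plt.show()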