# # Minimal DeepOnet # # main is solving a simple operator-learning problem: # cumulative integral # G[u](y)=\int_0^y u(x) dx # # Input: # # function u(x), sampled at discrete points x1,…,x10 : called 10 sensors # code: U_train # # Output: # # function v(y)=∫_0^y u(x)dx evaluated at the same points y1,…,y10 # code: Y_train # # Operator G maps functions → functions. # How: # - The branch net encodes the input function u into features. # - The trunk net produces basis functions evaluated at the output points y. # Their inner product reconstructs v(y) that approximates the integral. # # Note: DeepONet doesn’t need the function u in closed form - it just needs samples at sensor points. # By training on many random input functions, the network learns the general mapping # u(.) -> \int_0^y u(x)dx # # Once trained, DeepONet can approximate the integral for any new u, as long as # - The new u is reasonably smooth (and in training range) # - The sensor points are dense enough to capture the function’s variations. # By feeding many examples of functions and their integrals, the network learns the operator itself, # not just memorizing a single function. import numpy as np import torch import torch.nn as nn import torch.nn.functional as F import matplotlib.pyplot as plt # Branch or Trunk Net class BranchTruncNet(nn.Module): def __init__(self, input_dim, hidden_dim = [32, 32, 32], output_dim = 20): super().__init__() # Fully connected (dense) layers, nn.Linear(in_features, out_features, bias=True) n_hidden = len(hidden_dim) print(f'input layer dim = {input_dim}') for _ in range(n_hidden): print(f'hidden layer {_} dim = {hidden_dim[_]}') print(f'output layer dim = {output_dim}') print('-'*30) layers = [] n_hidden = len(hidden_dim) layers.append(nn.Linear(input_dim, hidden_dim[0])) for _ in range(n_hidden-1): layers.append(nn.Linear(hidden_dim[_], hidden_dim[_+1])) layers.append(nn.Linear(hidden_dim[-1], output_dim)) self.layers = nn.ModuleList(layers) def forward(self, x): for layer in self.layers[:-1]: x = F.relu(layer(x)) return self.layers[-1](x) # DeepONet class DeepONet(nn.Module): def __init__(self, branch_in, trunk_in, hidden, p): super().__init__() print('------ BranchNet ---------- ') self.branch = BranchTruncNet(branch_in, hidden, p) print('------ TruncNet ---------- ') self.trunk = BranchTruncNet(trunk_in, hidden, p) def forward(self, u, y): """ u: (n_samples, m_sensors) y: (num_y, 1) """ b = self.branch(u) # (n_samples, p); p features per function (“coefficients” for basis functions) t = self.trunk(y) # (num_y, p) # b[i, :] are the coefficients for the ith input function # t[j, :] are the trunk features at the jth output point. # G(u)(y) \approx \sum_k=1^p b_k(u)*t_k(y) := out out = b @ t.T # (n_samples, num_y) return out def rbf_kernel(x1, x2, length_scale=0.1): """Squared exponential kernel""" x1 = x1[:, None] # shape (m,1) x2 = x2[None, :] # shape (1,m) return np.exp(-0.5 * ((x1 - x2)/length_scale)**2) if __name__=='__main__': # Parameters n_samples = 5000 # number of input functions u used in learning m_sensors = 50 # number of points per function # Sensor points along x ∈ [0,1] x_sensor = np.linspace(0, 1, m_sensors) # CASE 1: Input functions are random values at sensor points #U = np.random.uniform(-1, 1, size=(n_samples, m_sensors))# allow values u<0 # CASE 2: Input functions are smooth functions generated using GPR K = rbf_kernel(x_sensor, x_sensor, length_scale=0.1) # GPR covariance U = np.random.multivariate_normal(mean=np.zeros(m_sensors), cov=K, size=n_samples) # Output functions: cumulative integral of input functions dx = 1.0 / (m_sensors - 1) # spacing between sensor points Y_list = [] for u in U: v = np.zeros_like(u) # initialize cumulative integral # trapezoid approximation v[1:] = np.cumsum(0.5 * (u[1:] + u[:-1]) * dx) Y_list.append(v) Y = np.array(Y_list) # Divide to training and validation sets split = int(0.8 * n_samples) U_train, U_val = U[:split], U[split:] Y_train, Y_val = Y[:split], Y[split:] # Convert to PyTorch tensors U_train = torch.tensor(U_train, dtype=torch.float32) Y_train = torch.tensor(Y_train, dtype=torch.float32) U_val = torch.tensor(U_val, dtype=torch.float32) Y_val = torch.tensor(Y_val, dtype=torch.float32) x_sensor = torch.tensor(x_sensor.reshape(-1,1), dtype=torch.float32) model = DeepONet(branch_in=m_sensors, trunk_in=1, hidden=[64,64,64], p=50) # # clipping to filter out instabilities in optimization process (showing as sudden loss peaks) # torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=10.0) optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) loss_fn = nn.MSELoss() # plot one curve and how training gets more accurate fit v_true = Y_train[0].numpy() plt.figure() plt.plot(x_sensor, v_true, 'go') plt.title('One training curve and evolution of DeepOnet prediction') # n_epochs = 15000 train_losses, val_losses = [], [] for epoch in range(n_epochs): model.train() optimizer.zero_grad() pred_train = model(U_train, x_sensor) # (n_train, m_sensors) loss_train = loss_fn(pred_train, Y_train) loss_train.backward() optimizer.step() # Validation model.eval() with torch.no_grad(): pred_val = model(U_val, x_sensor) loss_val = loss_fn(pred_val, Y_val) train_losses.append(loss_train.item()) val_losses.append(loss_val.item()) if epoch % 100 == 0: print(f"Epoch {epoch}, Train Loss: {loss_train.item():.6f} Val Loss: {loss_val.item():.6f}") if epoch % 1000 == 0: # plot one predicted v_pred = pred_train[0].detach().numpy() plt.plot(x_sensor, v_pred, "r--") plt.draw() plt.pause(1e-3) plt.figure() plt.plot(train_losses, label="train loss") plt.plot(val_losses, label="val loss") plt.xlabel("Epoch") plt.ylabel("MSE Loss") plt.legend() plt.title("DeepONet Training vs Validation Loss") plt.draw() plt.pause(1e-3) # Plot a few trained and validated function for data in [ (Y_train, pred_train, "Training"),(Y_val, pred_val, "Validation")]: Y, pred, title = data plt.figure() plt.title(title) plt.xlabel("y") plt.ylabel("v(y)") for test_idx in range(5): v_true = Y[test_idx].numpy() # (m_sensors,) v_pred_single = pred[test_idx].detach().numpy() # (m_sensors,) lab = "True cumulative integral" if test_idx==0 else "_nolabel_" plt.plot(x_sensor, v_true, label=lab) lab= "DeepONet prediction" if test_idx==0 else "_nolabel_" plt.plot(x_sensor, v_pred_single, "--", label=lab) plt.legend() plt.draw() plt.pause(1e-3) # Example: new test function sampled at sensor points x_sensor = np.linspace(0, 1, m_sensors) # New function u_test(x) = sin(2*pi*x) at sensor points u_test = np.sin(2.5*np.pi * x_sensor) u_test_t = torch.tensor(u_test.reshape(1, -1), dtype=torch.float32) # (1, m_sensors) # Convert sensor points x_sensor_t = torch.tensor(x_sensor.reshape(-1, 1), dtype=torch.float32) # (m_sensors, 1) # Forward pass through DeepONet # Repeat u_test for each output point v_pred = model(u_test_t.repeat(m_sensors, 1), x_sensor_t) # (m_sensors, m_sensors) # Take diagonal to match the cumulative integral at each sensor point v_pred_cumulative = v_pred.diagonal().detach().numpy() # (m_sensors,) # True integral using trapezoid # v_true = np.cumsum(u_test) * (1.0 / (m_sensors - 1)) this assumes all u>0 dx = x_sensor[1] - x_sensor[0] v_true = np.zeros_like(u_test) v_true[1:] = np.cumsum(0.5 * (u_test[1:] + u_test[:-1]) * dx) # plot plt.figure() plt.title(r'DeepOnet cumulative integral $v(y) = \int_0^y \sin(2.5\pi x) dx$') plt.plot(x_sensor, v_true, label="True cum. integral") plt.plot(x_sensor, v_pred_cumulative, "--", label="DeepONet prediction") plt.xlabel("y") plt.ylabel("v(y)") plt.legend() plt.show()