# Source code for volcapy.inverse.random_spatial_effects

""" Gaussian Process for Inversion class, but implementing Cressie's basis
functions approach (Spatial Random Effects).

"""
import numpy as np
import torch

gpu = torch.device('cuda:0')
cpu = torch.device('cpu')


class SREModel(torch.nn.Module):
    """ Spatial Random Effects à la Cressie.

    """
    def __init__(self, F, d_obs, data_cov, sigma0_init, cells_coords,
            n_inducing):
        """
        Parameters
        ----------
        F
            Forward operator matrix.
        d_obs
            Observed data vector.
        data_cov
            Data (observations) covariance matrix.
        sigma0_init
            Initial value of the sigma0 parameter to use when starting
            optimization.
        cells_coords: array
            n_model * n_dims array containing the coordinates of the
            discretized model cells.
        n_inducing: int
            Number of inducing points to use (i.e. number of basis functions).

        """
        super(SREModel, self).__init__()

        # Store sigma0 as a trainable parameter; the value found by an
        # optimization run can be reused as the starting point for the next.
        self.sigma0 = torch.nn.Parameter(torch.tensor(sigma0_init))

        # Sizes.
        self.n_model = F.shape[1]
        self.n_data = F.shape[0]

        # Prior mean (vector) on the data side.
        self.mu0_d_stripped = torch.mm(F, torch.ones((self.n_model, 1)))

        self.d_obs = d_obs
        self.data_cov = data_cov

        # Identity vector. Needed for concentration.
        self.I_d = torch.ones((self.n_data, 1), dtype=torch.float32)

        # Matrix of basis functions.
        self.S = self.build_basis_functions(cells_coords, n_inducing)

    def build_basis_functions(self, cells_coords, n_inducing):
        """ Build the matrix of basis functions.

        Currently only uses RBFs centered at *n_inducing* randomly sampled
        inducing points. All RBFs have a lengthscale of 1.

        Parameters
        ----------
        cells_coords: array
            n_model * n_dims array containing the coordinates of the
            discretized model cells.
        n_inducing: int
            Number of inducing points to use (i.e. number of basis functions).

        Returns
        -------
        Array
            The n_model * n_inducing matrix of basis functions evaluated at
            the model points.

        """
        n_model, n_dims = cells_coords.shape

        # Randomly sample inducing points from the available cells.
        inducing_points_inds = np.random.choice(
                cells_coords.shape[0], n_inducing, replace=False)
        inducing_points = torch.from_numpy(
                cells_coords[inducing_points_inds]).float()
        coords = torch.from_numpy(cells_coords).float()

        # Build the matrix of squared distances between model points (rows)
        # and inducing points (columns).
        x = coords.unsqueeze(1).expand(n_model, n_inducing, n_dims)
        y = inducing_points.unsqueeze(0).expand(n_model, n_inducing, n_dims)
        dist = torch.pow(x - y, 2).sum(2)

        # Compute the basis function matrix from the squared distances.
        return torch.exp(- dist / 2)
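
    # For reference, the basis matrix built above has entries
    #     S[i, j] = exp(- ||s_i - x_j||^2 / 2),
    # where s_i is the coordinate of model cell i and x_j is the j-th
    # (randomly sampled) inducing point, i.e. a unit-lengthscale RBF
    # centered at each inducing point.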

    def to_device(self, device):
        """ Transfer all model attributes to the given device.
        Can be used to switch between cpu and gpu.

        Parameters
        ----------
        device: torch.device

        """
        # Move the parameter data in place so that sigma0 stays registered
        # as an nn.Parameter.
        self.sigma0.data = self.sigma0.data.to(device)
        self.mu0_d_stripped = self.mu0_d_stripped.to(device)
        self.d_obs = self.d_obs.to(device)
        self.data_cov = self.data_cov.to(device)
        self.I_d = self.I_d.to(device)
        self.S = self.S.to(device)

    def neg_log_likelihood(self):
        """ Computes the negative log-likelihood of the current state of the
        model.
        Note that this function should be called AFTER having run a
        conditioning, since it depends on the inversion operator computed
        there.

        Returns
        -------
        float

        """
        # Need to do it this way, otherwise rounding errors kill everything.
        log_det = - torch.logdet(self.inversion_operator)

        nll = torch.add(
                log_det,
                torch.mm(
                        self.prior_misfit.t(),
                        torch.mm(self.inversion_operator, self.prior_misfit)))
        return nll

    def concentrate_m0(self):
        """ Compute m0 (prior mean parameter) by MLE via concentration.

        Note that the inversion operator should have been updated first.

        """
        conc_m0 = torch.mm(
                torch.inverse(
                        torch.mm(
                                torch.mm(self.mu0_d_stripped.t(),
                                        self.inversion_operator),
                                self.mu0_d_stripped)),
                torch.mm(
                        self.mu0_d_stripped.t(),
                        torch.mm(self.inversion_operator, self.d_obs)))
        return conc_m0
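
    # For reference, the concentration step above implements the closed-form
    # estimate (writing Q for the inversion operator and mu = F 1):
    #     m0* = (mu^T Q mu)^{-1} mu^T Q d_obs,
    # i.e. a generalized least squares fit of the constant prior mean to the
    # observed data.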

    def condition_data(self, K_d, sigma0, m0=0.1, concentrate=False):
        """ Condition model on the data side.

        Parameters
        ----------
        K_d
            Covariance matrix on the data side.
        sigma0
            Standard deviation parameter for the kernel.
        m0
            Prior mean parameter.
        concentrate
            If True, then will compute m0 by MLE via concentration of the
            log-likelihood.

        Returns
        -------
        mu_post_d
            Posterior mean data vector.

        """
        inv_inversion_operator = torch.add(
                self.data_cov,
                sigma0**2 * K_d)

        # Compute the inversion operator and store it once and for all.
        self.inversion_operator = torch.inverse(inv_inversion_operator)

        if concentrate:
            # Determine m0 (on the model side) from sigma0 by concentration
            # of the log-likelihood.
            m0 = self.concentrate_m0()

        self.mu0_d = m0 * self.mu0_d_stripped
        # Store m0 in case we want to print it later.
        self.m0 = m0

        self.prior_misfit = torch.sub(self.d_obs, self.mu0_d)
        weights = torch.mm(self.inversion_operator, self.prior_misfit)

        mu_post_d = torch.add(
                self.mu0_d,
                torch.mm(sigma0**2 * K_d, weights))

        # Store in case.
        self.mu_post_d = mu_post_d

        return mu_post_d

    def condition_model(self, cov_pushfwd, F, sigma0, m0=0.1,
            concentrate=False):
        """ Condition model on the model side.

        Parameters
        ----------
        cov_pushfwd
            Pushforwarded covariance matrix, i.e. K F^T.
        F
            Forward operator.
        sigma0
            Standard deviation parameter for the kernel.
        m0
            Prior mean parameter.
        concentrate
            If True, then will compute m0 by MLE via concentration of the
            log-likelihood.

        Returns
        -------
        mu_post_m
            Posterior mean model vector.
        mu_post_d
            Posterior mean data vector.

        """
        inv_inversion_operator = torch.add(
                self.data_cov,
                sigma0**2 * torch.mm(F, cov_pushfwd))

        # Compute the inversion operator and store it once and for all.
        self.inversion_operator = torch.inverse(inv_inversion_operator)

        if concentrate:
            # Determine m0 (on the model side) from sigma0 by concentration
            # of the log-likelihood.
            m0 = self.concentrate_m0()

        # Prior mean for data and model.
        self.mu0_d = m0 * self.mu0_d_stripped
        self.mu0_m = m0 * torch.ones((self.n_model, 1))
        # Store m0 in case we want to print it later.
        self.m0 = m0

        self.prior_misfit = torch.sub(self.d_obs, self.mu0_d)
        weights = torch.mm(self.inversion_operator, self.prior_misfit)

        # Posterior data mean.
        self.mu_post_d = torch.add(
                self.mu0_d,
                torch.mm(sigma0**2 * torch.mm(F, cov_pushfwd), weights))

        # Posterior model mean.
        self.mu_post_m = torch.add(
                self.mu0_m,
                torch.mm(sigma0**2 * cov_pushfwd, weights))

        return self.mu_post_m, self.mu_post_d
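
    # For reference, condition_model implements the usual GP update (writing
    # K for the model covariance and r = d_obs - mu0_d):
    #     mu_post_m = mu0_m + sigma0^2 K F^T (data_cov + sigma0^2 F K F^T)^{-1} r
    #     mu_post_d = mu0_d + sigma0^2 F K F^T (data_cov + sigma0^2 F K F^T)^{-1} r
    # with cov_pushfwd = K F^T supplied by the caller.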

    def optimize(self, K_d, n_epochs, device, logger, sigma0_init=None,
            lr=0.007):
        """ Given lambda0, optimize the two remaining hyperparams via MLE.
        Here, instead of giving lambda0, we give a (stripped) covariance
        matrix. Stripped means without sigma0.

        The user can choose between CPU and GPU.

        Parameters
        ----------
        K_d: 2D Tensor
            (Stripped) covariance matrix in data space.
        n_epochs: int
            Number of training epochs.
        device: torch.device
            Device to use for optimization, either CPU or GPU.
        logger
            An instance of logging.Logger, used to output training
            progression.
        sigma0_init: float
            Starting value for gradient descent. If None, then use the value
            stored by the model class (that is, the one resulting from the
            previous optimization run).
        lr: float
            Learning rate.

        """
        # Send everything to the correct device.
        self.to_device(device)
        K_d = K_d.to(device)

        # Initialize sigma0.
        if sigma0_init is not None:
            self.sigma0 = torch.nn.Parameter(
                    torch.tensor(sigma0_init).to(device))

        optimizer = torch.optim.Adam(self.parameters(), lr=lr)

        for epoch in range(n_epochs):
            # Forward pass: condition on the data (with m0 concentrated out)
            # for the current value of sigma0.
            m_posterior_d = self.condition_data(
                    K_d, self.sigma0, concentrate=True)
            log_likelihood = self.neg_log_likelihood()

            # Zero gradients, perform a backward pass,
            # and update the weights.
            optimizer.zero_grad()
            log_likelihood.backward(retain_graph=True)
            optimizer.step()

            # Periodically print information.
            if epoch % 100 == 0:
                # Compute train error.
                train_RMSE = self.train_RMSE()
                logger.info("Log-likelihood: {}".format(log_likelihood.item()))
                logger.info("RMSE train error: {}".format(train_RMSE.item()))

        # Send everything back to cpu.
        self.to_device(cpu)
        return

    def loo_predict(self, loo_ind):
        """ Leave one out kriging prediction.

        Take the trained hyperparameters. Remove one point from the
        training set, krig/condition on the remaining points and predict
        the left out point.

        WARNING: Should have run the forward pass of the model once before,
        otherwise some of the operators we need (inversion operator) won't
        have been computed. Also should re-run the forward pass when
        updating hyperparameters.

        Parameters
        ----------
        loo_ind: int
            Index (in the training set) of the data point to leave out.

        Returns
        -------
        float
            Prediction at the left out data point.

        """
        # Indices of the data points that are kept.
        in_inds = list(range(len(self.d_obs)))
        in_inds.remove(loo_ind)

        # Note that for the dot product, we should have one-dimensional
        # vectors, hence the strange indexing with the zero.
        loo_pred = (self.mu0_d[loo_ind]
                - 1 / self.inversion_operator[loo_ind, loo_ind].detach()
                * torch.dot(
                        self.inversion_operator[loo_ind, in_inds].detach(),
                        self.prior_misfit[in_inds, 0].detach()))

        return loo_pred.detach()
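
    # For reference, loo_predict uses the standard leave-one-out identity for
    # Gaussian process regression: with Q the inversion operator and
    # r = d_obs - mu0_d, the prediction at the left out index i is
    #     d_hat_{-i} = mu0_d_i - (1 / Q_ii) * sum_{j != i} Q_ij r_j,
    # which equals d_i - [Q r]_i / Q_ii, so no refactorization is needed for
    # each left out point.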

    def loo_error(self):
        """ Leave one out cross validation RMSE.

        Take the trained hyperparameters. Remove one point from the
        training set, krig/condition on the remaining points and predict
        the left out point.
        Compute the squared error, repeat for all data points (leaving one
        out at a time) and average.

        WARNING: Should have run the forward pass of the model once before,
        otherwise some of the operators we need (inversion operator) won't
        have been computed. Also should re-run the forward pass when
        updating hyperparameters.

        Returns
        -------
        float
            RMSE cross-validation error.

        """
        tot_error = 0
        for loo_ind in range(len(self.d_obs)):
            loo_prediction = self.loo_predict(loo_ind)
            tot_error += (self.d_obs[loo_ind].item() - loo_prediction)**2

        return torch.sqrt(tot_error / len(self.d_obs))

    def train_RMSE(self):
        """ Compute current error on training set.

        """
        return torch.sqrt(torch.mean(
                (self.d_obs - self.mu_post_d)**2))
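

# The demo below is not part of the original module: it is a minimal usage
# sketch on synthetic data. The problem sizes, the parameter values and the
# choice K_d = F S S^T F^T (i.e. an identity covariance on the random-effect
# weights) are illustrative assumptions, not the package's reference workflow.
if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Synthetic problem sizes (assumed for the demo).
    n_model, n_data, n_dims, n_inducing = 500, 40, 3, 20

    # Random forward operator, model cells and noisy observations.
    np.random.seed(0)
    torch.manual_seed(0)
    cells_coords = np.random.uniform(
            0, 1, (n_model, n_dims)).astype(np.float32)
    F = torch.randn(n_data, n_model)
    true_density = torch.ones(n_model, 1) + 0.1 * torch.randn(n_model, 1)
    d_obs = torch.mm(F, true_density) + 0.05 * torch.randn(n_data, 1)
    data_cov = 0.05**2 * torch.eye(n_data)

    model = SREModel(F, d_obs, data_cov, sigma0_init=1.0,
            cells_coords=cells_coords, n_inducing=n_inducing)

    # Data-side covariance induced by the basis functions, assuming an
    # identity covariance for the random-effect weights: K_d = F S S^T F^T.
    pushfwd = torch.mm(model.S, torch.mm(model.S.t(), F.t()))
    K_d = torch.mm(F, pushfwd)

    # Optimize sigma0 (with m0 concentrated out) on the CPU.
    model.optimize(K_d, n_epochs=300, device=cpu, logger=logger)
    logger.info("Estimated sigma0: {}".format(model.sigma0.item()))
    logger.info("Estimated m0: {}".format(model.m0.item()))
    logger.info("LOO RMSE: {}".format(model.loo_error().item()))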