Source code for volcapy.inverse.gaussian_process

"""
This class implements gaussian process regression/conditioning for inverse
problems, i.e. when conditioning on a linear operator of the field.
The situation is we have a gaussian process on some space and a linear
operator, denoted :math:`F`, acting on the process and mapping it to another space.
This linear operator induces a gaussian process on the target space.
We will use the term *model* when we refer to the original gaussian process and
*data* when we refer to the induced gaussian process.
We discretize model-space and data-space, so that F becomes a matrix.

Notation and Implementation
---------------------------
We denote by :math:`K` the covariance matrix of the original field. We assume a
constant prior mean vector :math:`\mu_0 = m_0 1_m`, where :math:`1_m` stands for the identity
vector in model space. Hence we only have one scalar parameter m0 for the prior mean.

The induced gaussian process then has covariance matrix :math:`K_d = F K F^t` and
prior mean vector :math:`m_0 F ~ 1_m`.

Regression/Conditioning
-----------------------
We observe the value of the induced field at some points in data-space. We then
condition our GP on those observations and obtain posterior mean vector /
covariance matrix.

Covariance Matrices
-------------------
Most kernels have a variance parameter :math:`\sigma_0^2` that just appears as a
multiplicative constant. To make its optimization easier, we strip it of the
covariance matrix, store it as a model parameter (i.e. as an attribute of the
class) and include it manually in the experessions where it shows up.

This means that when one sees a covariance matrix in the code, it generally
doesn't include the :math:`\sigma_0` factor, which has to be included by hand.

Covariance Pushforward
----------------------
The model covariance matrix :math:`K` is to big to be stored, but during conditioning
it only shows up in the form :math:`K F^t`. It is thus sufficient to compute this
product once and for all. We call it the *covariance pushforward*.

Noise
-----
Independent homoscedactic noise on the data is assumed. It is specified by
*data_std_orig*. If the matrices are ill-conditioned, the the noise will have
to be increased, hence the current value of the noise will be stored in
*noise_std*.

Important Implementation Detail
-------------------------------
Products of vectors with the inversion operator R^{-1} * x should be computed
using inv_op_vector_mult(x).

Conditioning
------------
Conditioning always compute the inversion operator, and m0 via concentration.
Hence, every call to conditioning (condition_data or condition_model), will
update the following attributes:
    - inv_op_L
    - inversion_operator
    - m0
    - mu0_d
    - prior_misfit
    - weights
    - mu_post_d (TODO: maybe shouldn't be an attribute of the GP.)

"""
import numpy as np
import torch
gpu = torch.device('cuda:0')
cpu = torch.device('cpu')


[docs]class GaussianProcess(torch.nn.Module): """ Attributes ---------- sigma0_init Original value of the sigma0 parameter to use when starting optimization. sigma0: float Current value of the prior standard deviation of the process. Will get optimized. We store it so we can use it as starting value for optimization of similar lambda0s. data_std_orig: float Original data noise standard deviation. data_std: float Current data noise deviation. Can change from the original value since we may have to increase it to make matrices invertible/better conditioned. F: ndarray Forward operator matrix, n_d * n_m d_obs: tensor Vector of observed data values n_model: int Number of model cells. n_data: int Number of data observations. mu0_d_stripped: Tensor The pushforwarded mean vector, stripped of the constant prior means parameter. I.e. this is F * 1_m. Multiply by the prior mean m0 to get the data-side mean. data_ones inv_op_L: Tensor Lower Cholesky factor of the inverse inversion operator R. inversion_operator mu0_d m0 prior_misfit weights mu_post_d mu_post_m stripped_inv mu0_m R: Tensor. Inverse inversion operator. Given by data_std**2 * 1_{d*d} + sigma0**2 * K_d logger An instance of logging.Logger, used to output training progression. """ def __init__(self, F, d_obs, sigma0_init, data_std=0.1, logger=None): """ Parameters ---------- F: ndarray Forward operator matrix, n_d * n_m d_obs Observed data vector. sigma0_init Original value of the sigma0 parameter to use when starting optimization. data_std: float Data noise standard deviation. logger An instance of logging.Logger, used to output training progression. """ super(GaussianProcess, self).__init__() # Store the sigma0 after optimization, since can be used as starting # point for next optim. self.sigma0 = torch.nn.Parameter(torch.tensor(sigma0_init)) # Sizes self.n_model = F.shape[1] self.n_data = F.shape[0] # Prior mean (vector) on the data side. self.mu0_d_stripped = torch.mm(F, torch.ones((self.n_model, 1), dtype=torch.float32)) self.d_obs = d_obs self.data_ones = torch.eye(self.n_data, dtype=torch.float32) self.data_std_orig = data_std # Used to store the modified noise level, if has to be increased to # make matrix invertible. self.data_std = data_std # If no logger, create one. if logger is None: import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) self.logger = logger
[docs] def to_device(self, device): """ Transfer all model attributes to the given device. Can be used to switch between cpu and gpu. Parameters ---------- device: torch.Device """ self.sigma0 = torch.nn.Parameter(self.sigma0.to(device)) self.mu0_d_stripped = self.mu0_d_stripped.to(device) self.d_obs = self.d_obs.to(device) self.data_ones = self.data_ones.to(device)
[docs] def neg_log_likelihood(self): """ Computes the negative log-likelihood of the current state of the model. Note that this function should be called AFTER having run a conditioning, since it depends on the inversion operator computed there. Returns ------- float """ # WARNING!!! determinant is not linear! Taking constants outside adds # power to them. log_det = torch.logdet(self.R) nll = log_det + torch.mm(self.prior_misfit.t(), self.weights) return nll
[docs] def concentrate_m0(self): """ Compute m0 (prior mean parameter) by MLE via concentration. Note that the inversion operator should have been updated first. """ # Compute R^(-1) * G * I_m. tmp = self.inv_op_vector_mult(self.mu0_d_stripped) conc_m0 = torch.mm(self.d_obs.t(), tmp) / torch.mm(self.mu0_d_stripped.t(), tmp) return conc_m0
[docs] def condition_data(self, K_d, sigma0, m0=0.1, concentrate=False): """ Condition model on the data side. Parameters ---------- K_d: tensor Covariance matrix on the data side. sigma0: float Standard deviation parameter for the kernel. m0: float Prior mean parameter. concentrate: bool If true, then will compute m0 by MLE via concentration of the log-likelihood. Returns ------- mu_post_d: tensor Posterior mean data vector """ # Get Cholesky factor (lower triangular) of the inversion operator. self.inv_op_L = self.get_inversion_op_cholesky(K_d, sigma0) self.inversion_operator = torch.cholesky_inverse(self.inv_op_L) if concentrate: # Determine m0 (on the model side) from sigma0 by concentration of the Ll. self.m0 = self.concentrate_m0() self.mu0_d = self.m0 * self.mu0_d_stripped self. prior_misfit = self.d_obs - self.mu0_d self.weights = self.inv_op_vector_mult(self.prior_misfit) self.mu_post_d = self.mu0_d + torch.mm(sigma0**2 * K_d, self.weights) return self.mu_post_d
[docs] def condition_model(self, cov_pushfwd, F, sigma0, m0=0.1, concentrate=False, NtV_crit=-1.0): """ Condition model on the model side. Parameters ---------- cov_pushfwd: tensor Pushforwarded covariance matrix, i.e. K F^T F Forward operator sigma0: float Standard deviation parameter for the kernel. m0: float Prior mean parameter. concentrate: bool If true, then will compute m0 by MLE via concentration of the log-likelihood. NtV_crit: (deprecated) Critical Noise to Variance ratio (epsilon/sigma0). We can fix it to avoid numerical instability in the matrix inversion. If provided, will not allow to go below the critical value. Returns ------- mu_post_m Posterior mean model vector mu_post_d Posterior mean data vector """ # Conditioning model is just conditioning on data and then computing # posterior mean and (co-)variance on model side. K_d = torch.mm(F, cov_pushfwd) mu_post_d = self.condition_data(K_d, sigma0, m0, concentrate=concentrate) # Posterior model mean. # Can re-use the m0 and weights computed by condition_data. self.mu_post_m = torch.add( m0 * torch.ones((self.n_model, 1)), torch.mm(sigma0**2 * cov_pushfwd, self.weights)) return self.mu_post_m.detach(), self.mu_post_d
[docs] def optimize(self, K_d, n_epochs, device, sigma0_init=None, lr=0.007, NtV_crit=-1.0): """ Given lambda0, optimize the two remaining hyperparams via MLE. Here, instead of giving lambda0, we give a (stripped) covariance matrix. Stripped means without sigma0. The user can choose between CPU and GPU. Parameters ---------- K_d: tensor (stripped) Covariance matrix in data space. n_epochs: int Number of training epochs. device: Torch.device Device to use for optimization, either CPU or GPU. sigma0_init: float Starting value for gradient descent. If None, then use the value sotred by the model class (that is, the one resulting from the previous optimization run). lr: float Learning rate. NtV_crit: (deprecated) """ # Send everything to the correct device. self.to_device(device) K_d = K_d.to(device) # Initialize sigma0. if sigma0_init is not None: self.sigma0 = torch.nn.Parameter(torch.tensor(sigma0_init)).to(device) optimizer = torch.optim.Adam(self.parameters(), lr=lr) for epoch in range(n_epochs): # Forward pass: Compute predicted y by passing # x to the model m_posterior_d = self.condition_data(K_d, self.sigma0, concentrate=True) log_likelihood = self.neg_log_likelihood() # Zero gradients, perform a backward pass, # and update the weights. optimizer.zero_grad() log_likelihood.backward(retain_graph=True) optimizer.step() # Periodically print informations. if epoch % 100 == 0: # Compute train error. train_RMSE = self.train_RMSE() self.logger.info("sigma0: {}".format(self.sigma0.item())) self.logger.info("Log-likelihood: {}".format(log_likelihood.item())) self.logger.info("RMSE train error: {}".format(train_RMSE.item())) self.logger.info("Log-likelihood: {}".format(log_likelihood.item())) self.logger.info("RMSE train error: {}".format(train_RMSE.item())) # Send everything back to cpu. self.to_device(cpu) return
[docs] def post_cov(self, cov_pushfwd, cells_coords, lambda0, sigma0, i, j, cl): """ Get posterior model covariance between two specified cells. Parameters ---------- cov_pushfwd: tensor Pushforwarded covariance matrix, i.e. K F^T cells_coords: tensor n_cells * n_dims: cells coordinates lambda0: float Lenght-scale parameter sigma0: float Prior variance. i: int Index of first cell (index in the cells_coords array). j: int Index of second cell. cl: module Covariance module implementing the kernel we are working with. The goal is to make this function kernel independent, though its a bit awkward and should be changed to an object oriented approach. Returns ------- post_cov: float Posterior covariance between model cells i and j. """ cov = cl.compute_cov(lambda0, cells_coords, i, j) post_cov = sigma0**2 * (cov - torch.mm( sigma0**2 * cov_pushfwd[i, :].reshape(1, -1), torch.mm( self.inversion_operator, cov_pushfwd[j, :].reshape(-1, 1)))) return post_cov
[docs] def compute_post_cov_diag(self, cov_pushfwd, cells_coords, lambda0, sigma0, cl): """ Compute diagonal of the posterior covariance matrix. Parameters ---------- cov_pushfwd: tensor Pushforwarded covariance matrix, i.e. K F^T cells_coords: tensor n_cells * n_dims: cells coordinates lambda0: float Lenght-scale parameter sigma0: float Prior variance. cl: module Covariance module implementing the kernel we are working with. The goal is to make this function kernel independent, though its a bit awkward and should be changed to an object oriented approach. Returns ------- post_cov_diag: tensor Vector containing the posterior variance of model cell i as its i-th element. """ n_cells = cells_coords.shape[0] post_cov_diag = np.zeros(n_cells) for i in range(n_cells): post_cov_diag[i] = self.post_cov( cov_pushfwd, cells_coords, lambda0, sigma0, i, i, cl) return post_cov_diag
[docs] def loo_predict(self, loo_ind): """ Leave one out krigging prediction. Take the trained hyperparameters. Remove one point from the training set, krig/condition on the remaining point and predict the left out point. WARNING: Should have run the forward pass of the model once before, otherwise some of the operators we need (inversion operator) won't have been computed. Also should re-run the forward pass when updating hyperparameters. Parameters ---------- loo_ind: int Index (in the training set) of the data point to leave out. Returns ------- float Prediction at left out data point. """ # Index of the not removed data points. in_inds = list(range(len(self.d_obs))) in_inds.remove(loo_ind) # Note that for the dot product, we should have one-dimensional # vectors, hence the strange indexing with the zero. loo_pred = (self.mu0_d[loo_ind] - 1/self.inversion_operator[loo_ind, loo_ind].detach() * torch.dot( self.inversion_operator[loo_ind, in_inds].detach(), self.prior_misfit[in_inds, 0].detach())) return loo_pred.detach()
[docs] def loo_error(self): """ Leave one out cross validation RMSE. Take the trained hyperparameters. Remove one point from the training set, krig/condition on the remaining point and predict the left out point. Compute the squared error, repeat for all data points (leaving one out at a time) and average. WARNING: Should have run the forward pass of the model once before, otherwise some of the operators we need (inversion operator) won't have been computed. Also should re-run the forward pass when updating hyperparameters. Returns ------- float RMSE cross-validation error. """ tot_error = 0 for loo_ind in range(len(self.d_obs)): loo_prediction = self.loo_predict(loo_ind) tot_error += (self.d_obs[loo_ind].item() - loo_prediction)**2 return torch.sqrt((tot_error / len(self.d_obs)))
[docs] def train_RMSE(self): """ Compute current error on training set. Returns ------- float RMSE on training set. """ return torch.sqrt(torch.mean( (self.d_obs - self.mu_post_d)**2))
[docs] def condition_number(self, cov_pushfwd, F, sigma0): """ Compute condition number of the inversion matrix R. Used to detect numerical instability. Parameters ---------- cov_pushfwd: tensor Pushforwarded covariance matrix, i.e. K F^T F: tensor Forward operator sigma0: float Standard deviation parameter for the kernel. Returns ------- float Condition number of the inversion matrix R. """ # Noise to Variance ratio. NtV = (self.data_std / sigma0)**2 inv_inversion_operator = torch.add( NtV * self.data_ones, torch.mm(F, cov_pushfwd)) return np.linalg.cond(inv_inversion_operator.detach().numpy())
[docs] def get_inversion_op_cholesky(self, K_d, sigma0): """ Compute the Cholesky decomposition of the inverse of the inversion operator. Increases noise level if necessary to make matrix invertible. Note that this method updates the noise level if necessary. Parameters ---------- K_d: Tensor The (pushforwarded from model) data covariance matrix (stripped from sigma0). sigma0: Tensor The (model side) standard deviation. Returns ------- Tensor L such that R = LL^t. L is lower triangular. """ self.R = (self.data_std**2) * self.data_ones + sigma0**2 * K_d # Try to Cholesky. for attempt in range(50): try: L = torch.cholesky(self.R) except RuntimeError: self.logger.info("Cholesky failed: Singular Matrix.") # Increase noise in steps of 5%. self.data_std += 0.05 * self.data_std self.R = (self.data_std**2) * self.data_ones + sigma0**2 * K_d self.logger.info("Increasing data std from original {} to {} and retrying.".format( self.data_std_orig, self.data_std)) else: return L # If didnt manage to invert. raise ValueError( "Impossible to invert matrix, even at noise std {}".format(self.data_std)) return -1
[docs] def inv_op_vector_mult(self, x): """ Multiply a vector by the inversion operator, using Cholesky approach (numerically more stable than computing inverse). Parameters ---------- x: Tensor The vector to multiply. Returns ------- Tensor Multiplied vector R^(-1) * x. """ z, _ = torch.triangular_solve(x, self.inv_op_L, upper=False) y, _ = torch.triangular_solve(z, self.inv_op_L.t(), upper=True) return y