"""
This module provides a class encapsulating all data needed to define an inverse
problem.
The goal is to allow the user to ignore the underlying details of the input
data.
Indeed, to work on Niklas's data and directly load everything
from the raw .mat file, one just has to run:
:code:`InverseProblem.from_matfile(path)`
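
A minimal usage sketch (the import path and the data location below are
hypothetical, adapt them to your setup):

.. code-block:: python

    from volcapy.inverse_problem import InverseProblem

    # Load Niklas's data and build the inverse problem.
    problem = InverseProblem.from_matfile("/path/to/Cedric.mat")
    print(problem.n_model, problem.n_data)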
"""
from volcapy import loading
import volcapy.math.matrix_tools as mat
import volcapy.grid.covariance_tools as cl
import numpy as np
import os
from math import floor
from timeit import default_timer as timer
# Unused; just here as a reminder of the noise level for Niklas's data.
sigma_d = 0.1
# LOAD DATA
data_path = "/home/cedric/PHD/Dev/Volcano/data/Cedric.mat"
class InverseProblem():
""" Numerical implementation of an inverse problem.
Needs a forward and an inversion grid as
input. Then can perform several inversions.
Attributes
----------
cells_coords: 2D array
Coordinates of the inversion cells. Element (i, j) should contain the
j-th coordinate of inversion cell nr i. Note that the order of the
cells should reflect the order of the columns of the forward operator.
forward: 2D array
Forward operator.
data_points: 2D array
Coordinates of the data points.
data_values: 1D array
Observed data values, one per data point.
"""
def __init__(self, cells_coords, forward, data_points, data_values):
self.cells_coords = cells_coords
self.forward = forward
self.data_points = data_points
self.data_values = data_values
# Dimensions
self.n_model = cells_coords.shape[0]
self.n_data = forward.shape[0]
@classmethod
def from_matfile(cls, path):
""" Read forward, inversion grid and data from a matlab file.
Parameters
----------
path: string
Path to a matlab file containing the data. The file should have the
same format as the original file from Niklas (see documentation).
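
Returns
-------
InverseProblem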
"""
data = loading.load_niklas(path)
cells_coords = data['coords']
# TODO: Maybe refactor loading so that directly returns an array
# instead of a list.
# Long run, should think about unifying datatypes.
data_coords = np.array(data['data_coords'])
forward = data['F']
data_values = data['d']
return cls(cells_coords, forward, data_coords, data_values)
def subset(self, n_cells):
""" Subset an inverse problem by only keeping n_cells
inversion cells selected at random.
This is used to make the problem more tractable and
allow prototyping calculations on a small computer.
The effect of this method is to directly modify the attributes of the
class to adapt them to the smaller problem.
Parameters
----------
n_cells: int
Number of inversion cells to keep (chosen at random, without
replacement).
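
Example (hypothetical size)::

    # Shrink the problem to 1000 randomly chosen cells.
    problem.subset(1000)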
"""
# Pick cell indices at random, without replacement.
inds = np.random.choice(self.n_model, size=n_cells, replace=False)
self.cells_coords = self.cells_coords[inds, :]
self.forward = self.forward[:, inds]
# We subsetted columns, hence we have to restore C-contiguity by hand.
self.forward = np.ascontiguousarray(self.forward)
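# Store the coordinates Fortran-ordered (column-major), since the
# covariance routines access them one column at a time.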
self.cells_coords = np.asfortranarray(self.cells_coords)
# Dimensions
self.n_model = self.cells_coords.shape[0]
def subset_data(self, n_keep, seed=None):
""" Subset an inverse problem by only keeping n_keep data
points selected at random.
The effect of this method is to directly modify the attributes of the
class to adapt them to the smaller problem.
Parameters
----------
n_keep: int
Only keep n_keep data points.
seed: int
Can be used to make choice of test data predictable.
Returns
-------
(rest_forward, rest_data)
The rows of the forward and the data values that were left out, so
they can be used as a test set. rest_data is returned as a column
vector.
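
Example (hypothetical sizes)::

    # Keep 400 data points; the left-out rest becomes a test set.
    rest_forward, rest_data = problem.subset_data(400, seed=42)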
"""
# Pick indices at random.
if seed is not None:
np.random.seed(seed)
inds = np.random.choice(self.n_data, n_keep, replace=False)
# Return the unused part of the forward and of the data so that it can
# be used as test set.
rest_forward = np.delete(self.forward, inds, axis=0)
rest_data = np.delete(self.data_values, inds, axis=0)
rest_data_points = np.delete(self.data_points, inds, axis=0)
# Now subset
self.forward = self.forward[inds, :]
self.data_points = self.data_points[inds, :]
self.data_values = self.data_values[inds]
# We subsetted rows; re-pack to ensure C-contiguity.
self.forward = np.ascontiguousarray(self.forward)
# New dimensions
self.n_data = self.forward.shape[0]
# Note that we return data in a column vector, to make it
# compatible with the rest of the data.
return (rest_forward, rest_data[:, None])
def build_partial_covariance(self, row_begin, row_end, sigma0, lambda0):
""" (DEPRECATED) Prepare a function for returning partial rows of the covariance
matrix.
Warning: should cast, since returns MemoryView.
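
The kernel implemented here is squared exponential:

.. math:: C_{ij} = \sigma_0^2 \exp\left(- \frac{\|x_i - x_j\|^2}{2 \lambda_0^2}\right)

Parameters
----------
row_begin, row_end: int
Indices (inclusive) of the rows of the covariance matrix to build.
sigma0: float
Prior standard deviation (kernel amplitude).
lambda0: float
Kernel length scale.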
"""
n_rows = row_end - row_begin + 1
print(row_begin)
dist_mesh = cl.compute_mesh_squared_euclidean_distance(
self.cells_coords[row_begin:row_end+1, 0],
self.cells_coords[:, 0],
self.cells_coords[row_begin:row_end+1, 1],
self.cells_coords[:, 1],
self.cells_coords[row_begin:row_end+1, 2],
self.cells_coords[:, 2])
dist_mesh = sigma0**2 * np.exp(- 1 / (2 * lambda0**2) * dist_mesh)
return dist_mesh
# TODO: Refactor. Effectively, this is chunked multiplication of a matrix with
# an implicitly defined one.
def compute_covariance_pushforward(self, G, sigma0, lambda0):
""" (DEPRECATED) Compute the matrix product C_m * G^T, which we call the *covariance
pushforward*.
Parameters
----------
G: 2-D array
The forward operator.
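sigma0: float
Prior standard deviation (kernel amplitude).
lambda0: float
Kernel length scale.

Returns
-------
2-D array
The product :math:`C_m G^T`, of shape (n_model, n_data), computed
chunk of rows by chunk of rows to keep memory usage bounded.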
"""
# Allocate memory for output array.
n_data = G.shape[0]
out = np.zeros((self.n_model, n_data), dtype=np.float32)
# Store the transpose once and for all.
GT = (G.T).astype(
dtype=np.float32, order='C', copy=False)
# Create the list of chunks.
chunk_size = 2048
chunks = mat.chunk_range(self.n_model, chunk_size)
# Loop in chunks.
for row_begin, row_end in chunks:
print(row_begin)
start = timer()
# Get corresponding part of covariance matrix.
partial_cov = self.build_partial_covariance(row_begin, row_end,
sigma0, lambda0)
mid = timer()
# Append to result.
out[row_begin:row_end + 1, :] = partial_cov @ GT
end = timer()
print(
"Building done in "
+ str(mid - start)
+ " and multiplication done in " + str(end - mid))
return out
# TODO: Factor out some methods.
def inverse(self, out_folder, prior_mean, sigma_d,
sigma0, lambda0,
preload_covariance_pushforward=False, cov_pushforward=None,
compute_post_covariance=False):
""" (DEPRECATED) Perform inversion.
Parameters
----------
out_folder: string
Path to a folder where we will save the results.
prior_mean: float
Constant prior mean.
sigma_d: float
Standard deviation of the observational noise.
sigma0: float
Prior standard deviation (kernel amplitude).
lambda0: float
Kernel length scale.
preload_covariance_pushforward: Boolean
If set to True, do not recompute the covariance pushforward, use the
one passed via cov_pushforward instead.
cov_pushforward: 2-D array
Precomputed covariance pushforward C_m @ G.T.
compute_post_covariance: Boolean
If set to True, then will compute and store the full posterior
covariance matrix (huge).
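
Notes
-----
The posterior mean is obtained by the standard Gaussian conditioning
formula

.. math:: m_{post} = m_{prior} + C_m G^T (G C_m G^T + C_d)^{-1} (d - G m_{prior})

with data covariance :math:`C_d = \sigma_d^2 I`.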
"""
# We will save a lot of stuff, so place it in a dedicated directory.
os.chdir(out_folder)
# Build prior mean.
# The order parameters are statically compiled in fast covariance.
m_prior = np.full(self.n_model, prior_mean, dtype=np.float32)
# Build data covariance matrix.
cov_d = sigma_d**2 * np.eye(self.n_data, dtype=np.float32)
# If the covariance pushforward hasn't been precomputed.
if not preload_covariance_pushforward:
# Compute big matrix product and save.
print("Computing big matrix product.")
start = timer()
self.covariance_pushforward = self.compute_covariance_pushforward(
self.forward, sigma0, lambda0)
end = timer()
print("Done in " + str(end - start))
np.save('Cm_Gt.npy', self.covariance_pushforward)
else:
self.covariance_pushforward = cov_pushforward
# Use it to perform the inversion and save the result.
print("Inverting matrix.")
start = timer()
temp = self.forward @ self.covariance_pushforward
inverse = np.linalg.inv(temp + cov_d)
end = timer()
print("Done in " + str(end - start))
print("Computing posterior mean.")
start = timer()
m_posterior = (m_prior
+ self.covariance_pushforward
@ inverse @ (self.data_values - self.forward @ m_prior))
self.m_posterior = m_posterior
end = timer()
print("Done in " + str(end - start))
np.save('m_posterior.npy', m_posterior)
# ----------------------------------------------
# Build and save the diagonal of the posterior covariance matrix.
# ----------------------------------------------
print("Computing posterior variance.")
start = timer()
Cm_post = np.empty([self.n_model],
dtype=self.covariance_pushforward.dtype)
# TODO: Find a name for these and store them cleanly.
# Stored as attributes so that compute_post_covariance can reuse them.
self.A = self.covariance_pushforward @ inverse
self.B = self.covariance_pushforward.T
# Diagonal is just given by the scalar product of the two factors.
for i in range(self.n_model):
Cm_post[i] = np.dot(self.A[i, :], self.B[:, i])
end = timer()
print("Done in " + str(end - start))
# Save the posterior standard deviation (square root of the variance).
"""
np.save(
"posterior_cov_diag.npy",
np.sqrt(np.array([sigma0**2] * self.n_model) - Cm_post))
"""
print("DONE")
if compute_post_covariance:
self.compute_post_covariance(sigma0, lambda0)
# TODO: Refactor. Relies on the attributes self.A and self.B set by inverse().
def compute_post_covariance(self, sigma0, lambda0):
""" (DEPRECATED) Compute the full posterior covariance matrix.
Must be called after inverse(), since it reuses the factors self.A and
self.B computed there.
"""
# AMBITIOUS: Compute the whole (120GB) posterior covariance matrix.
print("Computing posterior covariance.")
post_cov = np.memmap('post_cov.npy', dtype='float32', mode='w+',
shape=(self.n_model, self.n_model))
# Compute the matrix product line by line.
# TODO: Could compute several lines at a time, by chunks.
for i in range(self.n_model):
print(i)
prior_cov = self.build_partial_covariance(i, i, sigma0, lambda0)
post_cov[i, :] = prior_cov - self.A[i, :] @ self.B
# Flush to disk.
del post_cov
def save_squared_distance_mesh(self, path):
""" (DEPRECATED) Compute the matrix of squared euclidean distances between
all pairs of cells and save it to disk.
"""
print("Computing squared distance mesh.")
dist_mesh = np.memmap(path, dtype='float32', mode='w+',
shape=(self.n_model, self.n_model))
# Compute the mesh line by line.
# TODO: Could compute several lines at a time, by chunks.
for i in range(self.n_model):
print(i)
dist_mesh[i, :] = cl.compute_mesh_squared_euclidean_distance(
self.cells_coords[i:i+1, 0], self.cells_coords[:, 0],
self.cells_coords[i:i+1, 1], self.cells_coords[:, 1],
self.cells_coords[i:i+1, 2], self.cells_coords[:, 2])
# Flush to disk.
del dist_mesh