gpr
Gaussian Process Regression tools (gpr)
This module was developed by Elias Dalan in spring 2022.
Kernel
Kernel class
covariance_function: The function that will be used to calculate the covariance between our datasets
K(self, X1, X2)
Function that returns the covariance matrix given our datasets
X1: Dataset 1 (Often the training set)
X2: Dataset 2 (Often the target set)
self.covariance_function(X1, X2) : covariance matrix given our datasets X1 and X2.
Source code in btjenesten/gpr.py
def K(self, X1, X2):
    """
    Function that returns the covariance matrix given our datasets

    Parameters:
    -----------
    X1: Dataset 1 (often the training set)
    X2: Dataset 2 (often the target set)

    Returns:
    --------
    self.covariance_function(X1, X2) : covariance matrix given our datasets X1 and X2.
    """
    if np.isscalar(X1):
        X1 = np.array([X1])
    if np.isscalar(X2):
        X2 = np.array([X2])
    return self.covariance_function(X1, X2)
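A minimal usage sketch, assuming the Kernel constructor takes the covariance function as its only argument, as the attribute above suggests:

import numpy as np
from btjenesten.gpr import Kernel
from btjenesten.kernels import RBF

kernel = Kernel(RBF)                      # wrap the RBF covariance function
X_train = np.array([0.0, 0.5, 1.0])      # three training inputs
X_query = np.array([0.25, 0.75])         # two query inputs
print(kernel.K(X_train, X_query).shape)  # -> (3, 2)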
Regressor
Gaussian process regressor class
kernel: Specifies the type of covariance function we want for our regressor. If none is provided, the default is the radial basis function.
training_data_X: Training data inputs, also called features
training_data_Y: Training data outputs, also called labels
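A minimal construction sketch; the argument order shown here is an assumption, so check the constructor in btjenesten/gpr.py before relying on it:

import numpy as np
from btjenesten.gpr import Regressor

X = np.array([0.0, 1.0, 2.0, 3.0])  # features
y = np.sin(X)                       # labels
reg = Regressor(X, y)               # no kernel given: defaults to the RBF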
aquisition(self, minimize_prediction=True, x0=None, l=1.2, delta=0.1)
Returns the point at which our model function is predicted to have its lowest or highest value, depending on minimize_prediction.
minimize_prediction: If your task is to minimize some model function, this parameter is True. If your task is to maximize the model function this parameter is False.
l: Exploration parameter. Scales how much the standard deviation should impact the function value. l = 1 means that the function maximized/minimized equals predicted value +/- the standard deviation.
x0: Initial guess. If not specified, the point at which the training data is smallest/largest is used.
delta: Hyperparameter that tunes UCB around measured datapoints.
p - The predicted point at which an evaluation would yield the lowest/highest value
Source code in btjenesten/gpr.py
def aquisition(self, minimize_prediction=True, x0=None, l=1.2, delta=0.1):
    """
    Returns the point at which our model function is predicted to have its
    lowest or highest value, depending on minimize_prediction.

    Parameters:
    -----------
    minimize_prediction:
        If your task is to minimize some model function, this parameter is True.
        If your task is to maximize the model function, this parameter is False.
    l:
        Exploration parameter. Scales how much the standard deviation should
        impact the function value. l = 1 means that the function
        maximized/minimized equals the predicted value +/- the standard deviation.
    x0:
        Initial guess. If not specified, the point at which the training data
        is smallest/largest is used.
    delta:
        Hyperparameter that tunes the UCB around measured datapoints.

    Returns:
    --------
    p - The predicted point at which an evaluation would yield the lowest/highest value
    """
    if minimize_prediction:  # Minimization process
        if x0 is None:
            x0_index = np.where(self.training_data_Y == np.min(self.training_data_Y))
            x0 = self.training_data_X[x0_index]

        objective_function = lambda x, predict=self.predict: predict(x)
        # Predictive standard deviation at x.
        std_x = lambda x, predict=self.predict: np.sqrt(np.abs(predict(x, return_variance=True)[1]))
        objective_noise = lambda x, std=std_x: (1 - std(x))**2 * delta + std(x)
        UCB = lambda x, exploit=objective_function, explore=objective_noise: exploit(x) + l*explore(x)

        minimization = minimize(UCB, x0)
        p = minimization.x
        return p
    else:  # Maximization process
        if x0 is None:
            x0_index = np.where(self.training_data_Y == np.max(self.training_data_Y))
            x0 = self.training_data_X[x0_index]

        objective_function = lambda x, predict=self.predict: predict(x)
        std_x = lambda x, predict=self.predict: np.sqrt(np.abs(predict(x, return_variance=True)[1]))
        objective_noise = lambda x, std=std_x: (1 - std(x))**2 * delta + std(x)
        UCB = lambda x, exploit=objective_function, explore=objective_noise: -1*(exploit(x) + l*explore(x))

        minimization = minimize(UCB, x0)
        p = minimization.x
        return p
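A usage sketch, continuing the reg instance from the construction example above:

next_x = reg.aquisition(minimize_prediction=False, l=1.2, delta=0.1)
print(next_x)  # suggested next point when maximizing the model function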
predict(self, input_data_X, training_data_X=None, training_data_Y=None, return_variance=False)
Predicts output values for some input data given a set of training data
input_data_X: Input features that the gpr will evaluate.
training_data_X: training data inputs.
training_data_Y: training data outputs.
return_variance: If True, the variance of each prediction is returned as well.
predicted_y: Predicted output data corresponding to input_data_X, given a set of training data inputs and outputs (training_data_X, training_data_Y).
predicted_variance: Predicted variance for each point of the predicted output.
Source code in btjenesten/gpr.py
def predict(self, input_data_X, training_data_X=None, training_data_Y=None, return_variance=False):
    """
    Predicts output values for some input data given a set of training data

    Parameters:
    -----------
    input_data_X:
        Input features that the gpr will evaluate.
    training_data_X:
        Training data inputs.
    training_data_Y:
        Training data outputs.
    return_variance:
        If True, the variance of each prediction is returned as well.

    Returns:
    --------
    predicted_y:
        Predicted output data corresponding to input_data_X, given a set of
        training data inputs and outputs (training_data_X, training_data_Y).
    predicted_variance:
        Predicted variance for each point of the predicted output
        (only if return_variance is True).
    """
    # Fall back on the stored training data if none is provided.
    if training_data_X is None or training_data_Y is None:
        training_data_X = self.training_data_X
        training_data_Y = self.training_data_Y

    K_11 = self.kernel.K(training_data_X, training_data_X)
    K_12 = self.kernel.K(training_data_X, input_data_X)
    K_22 = self.kernel.K(input_data_X, input_data_X)

    assert (np.linalg.det(K_11) != 0), "Singular matrix. Training data might have duplicates."

    KT = np.linalg.solve(K_11, K_12).T
    predicted_y = KT.dot(training_data_Y).ravel()

    if return_variance:
        predicted_variance = np.diag(K_22 - KT @ K_12)
        y_var_negative = predicted_variance < 0
        if np.any(y_var_negative):
            predicted_variance.setflags(write=True)
            print("Some variance values were negative. Setting them to zero.")
            predicted_variance[y_var_negative] = 0
        return predicted_y, predicted_variance
    else:
        return predicted_y
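A usage sketch with the same reg instance; the returned variance is pointwise, so its square root gives the predictive standard deviation:

x_new = np.array([0.5, 1.5, 2.5])
y_pred, y_var = reg.predict(x_new, return_variance=True)
y_std = np.sqrt(y_var)  # one standard deviation per prediction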
score(self, input_data_X, input_data_Y)
Returns the average and maximum error of our predict method.
input_data_X: Input data that the gpr will predict corresponding output data for.
input_data_Y: Corresponding true output data for input_data_X.
avg_error - the average error between the predicted values and the true values
max_error - the maximum error between the predicted values and the true values
Source code in btjenesten/gpr.py
def score(self, input_data_X, input_data_Y):
    """
    Returns the average and maximum error of our predict method.

    Parameters:
    -----------
    input_data_X:
        Input data that the gpr will predict corresponding output data for.
    input_data_Y:
        Corresponding true output data for input_data_X.

    Returns:
    --------
    avg_error - the average error between the predicted values and the true values
    max_error - the maximum error between the predicted values and the true values
    """
    predicted_y = self.predict(input_data_X)
    avg_error = np.mean(np.abs(predicted_y - input_data_Y))
    max_error = np.max(np.abs(predicted_y - input_data_Y))
    return avg_error, max_error
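A usage sketch on held-out data (hypothetical test points):

x_test = np.array([0.25, 1.75])
y_test = np.sin(x_test)  # true labels for the test points
avg_err, max_err = reg.score(x_test, y_test)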
update(self, new_X, new_Y, tol=1e-06)
Updates the training data with newly measured data.
new_X: Set of new features that have been measured.
new_Y: Corresponding set of labels to new_X.
tol: Tolerance by which new points must differ from the existing training data. If this is too low you may encounter singular covariance matrices.
Source code in btjenesten/gpr.py
def update(self, new_X, new_Y, tol=1e-6):
    """
    Updates the training data with newly measured data.

    Parameters:
    -----------
    new_X:
        Set of new features that have been measured.
    new_Y:
        Corresponding set of labels to new_X.
    tol:
        Tolerance by which new points must differ from the existing training
        data. If this is too low you may encounter singular covariance matrices.
    """
    # Reject points that (within tol) already exist in the training set.
    for measurement in new_X:
        for i in range(len(self.training_data_X)):
            if (np.abs(measurement - self.training_data_X[i]) < tol).all():
                print(f"The model has most likely converged! {measurement} already exists in the training set.")
                return

    # Append the new measurements to the training set.
    new_training_data_X = np.concatenate([self.training_data_X, new_X])
    new_training_data_Y = np.concatenate([self.training_data_Y, new_Y])

    # Keep the training set sorted (note: this assumes one-dimensional inputs).
    indexes = np.argsort(new_training_data_X)
    self.training_data_X = new_training_data_X[indexes]
    self.training_data_Y = new_training_data_Y[indexes]
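Together with aquisition, update supports a simple measure-and-refit loop. A sketch, where measure is a hypothetical stand-in for your actual experiment or model function:

for _ in range(10):
    x_next = reg.aquisition(minimize_prediction=False)  # suggest a point
    y_next = measure(x_next)                            # hypothetical measurement
    reg.update(x_next, np.atleast_1d(y_next))           # refit on the new data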
Constant(X1, X2, k=0.5)
Kernel that returns a constant covariance value between \(x_i\) and \(x_j\). Useful if all values depend on each other equally.
X1: Dataset 1
X2: Dataset 2
k: constant that determines the covariance.
A covariance matrix of shape (len(X1), len(X2)) in which every element has the value k.
Source code in btjenesten/kernels.py
def Constant(X1, X2, k=0.5):
    """
    Kernel that returns a constant covariance value between $x_i$ and $x_j$.
    Useful if all values depend on each other equally.

    Parameters:
    -----------
    X1: Dataset 1
    X2: Dataset 2
    k: constant that determines the covariance.

    Returns:
    --------
    A covariance matrix of shape (len(X1), len(X2)) in which every element has the value k.
    """
    return np.ones((X1.shape[0], X2.shape[0]))*k
Funny_trigonometric(X1, X2, k=1)
Kernel that I made only for fun. May work for extravagant datasets
X1: Dataset 1
X2: Dataset 2
k: constant that determines the frequency of the trigonometric functions.
A covariance matrix that might be a bit crazy.
Source code in btjenesten/kernels.py
def Funny_trigonometric(X1, X2, k=1):
    """
    Kernel that I made only for fun. May work for extravagant datasets.

    Parameters:
    -----------
    X1: Dataset 1
    X2: Dataset 2
    k: constant that determines the frequency of the trigonometric functions.

    Returns:
    --------
    A covariance matrix that might be a bit crazy.
    """
    d = (X1[:, None] - X2[None, :])**2
    return np.sin(-k*d) - np.cos(-k*d)
RBF(X1, X2, l=1)
Radial basis function of the form: $$ e^{-l * d(X_i,X_j)} $$
X1: Dataset 1
X2: Dataset 2
l: Length scale parameter. Can be adjusted to tune the covariance between \(x_i\) and \(x_j\). Increasing l will decrease the covariance, and vice versa.
A matrix with the same shape as our input data, where the elements are \(e^{-l \cdot d(x_i, x_j)}\), with \(d(x_i, x_j)\) the squared distance between element \(x_i\) in X1 and element \(x_j\) in X2.
Source code in btjenesten/kernels.py
def RBF(X1, X2, l=1):
    """
    Radial basis function of the form:
    $$
    e^{-l \cdot d(X_i, X_j)}
    $$
    where $d(X_i, X_j)$ is the squared Euclidean distance between $X_i$ and $X_j$.

    Parameters:
    -----------
    X1: Dataset 1
    X2: Dataset 2
    l: Length scale parameter.
       Can be adjusted to tune the covariance between $x_i$ and $x_j$.
       Increasing l will decrease the covariance, and vice versa.

    Returns:
    --------
    A matrix with the same shape as our input data, where the elements are
    $e^{-l \cdot d(x_i, x_j)}$, with $d(x_i, x_j)$ the squared distance between
    element $x_i$ in X1 and element $x_j$ in X2.
    """
    d = np.sum((X1.reshape(X1.shape[0], -1)[:, None] - X2.reshape(X2.shape[0], -1)[None, :])**2, axis=2)
    return np.exp(-l*d)
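A quick check of the RBF kernel's behaviour: identical points have covariance exp(0) = 1, and the covariance decays with squared distance:

import numpy as np
from btjenesten.kernels import RBF

X = np.array([0.0, 1.0, 2.0])
K = RBF(X, X, l=1)
print(np.diag(K))  # [1. 1. 1.] on the diagonal
print(K[0, 1])     # exp(-1 * (0-1)^2) ≈ 0.3679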