from __future__ import annotations
from typing import Any, Optional, TypeVar, cast
import logging
import numpy as np
from ConfigSpace import ConfigurationSpace
from scipy import optimize
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Kernel
from smac.constants import VERY_SMALL_NUMBER
from smac.model.gaussian_process.abstract_gaussian_process import (
AbstractGaussianProcess,
)
from smac.model.gaussian_process.priors.abstract_prior import AbstractPrior
__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"
logger = logging.getLogger(__name__)
Self = TypeVar("Self", bound="GaussianProcess")
class GaussianProcess(AbstractGaussianProcess):
"""Implementation of Gaussian process model. The Gaussian process hyperparameters are obtained by optimizing
the marginal log likelihood.
This code is based on the implementation of RoBO:
Klein, A. and Falkner, S. and Mansur, N. and Hutter, F.
RoBO: A Flexible and Robust Bayesian Optimization Framework in Python
In: NIPS 2017 Bayesian Optimization Workshop
Parameters
----------
configspace : ConfigurationSpace
kernel : Kernel
Kernel which is used for the Gaussian process.
n_restarts : int, defaults to 10
Number of restarts for the Gaussian process hyperparameter optimization.
normalize_y : bool, defaults to True
Zero mean unit variance normalization of the output values.
    instance_features : dict[str, list[int | float]] | None, defaults to None
        Features (list of int or floats) of the instances (str). The features are incorporated into the X data
        on which the model is trained.
    pca_components : int | None, defaults to 7
Number of components to keep when using PCA to reduce dimensionality of instance features.
    seed : int, defaults to 0
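
    Example
    -------
    A minimal usage sketch. The kernel import and construction below are assumptions for
    illustration; SMAC's facades normally assemble a more elaborate kernel (including
    hyperparameter priors) from ``smac.model.gaussian_process.kernels``::

        import numpy as np
        from ConfigSpace import ConfigurationSpace

        # Assumed import path of SMAC's kernel wrappers
        from smac.model.gaussian_process.kernels import ConstantKernel, MaternKernel, WhiteKernel

        configspace = ConfigurationSpace({"x": (0.0, 1.0)})
        kernel = ConstantKernel(2.0) * MaternKernel(nu=2.5) + WhiteKernel(1e-8)
        model = GaussianProcess(configspace=configspace, kernel=kernel, seed=0)

        X = np.random.rand(20, 1)
        y = np.sin(6 * X)
        model.train(X, y)

        # Mean and variance predictions at the training points
        mean, variance = model.predict(X)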
"""
def __init__(
self,
configspace: ConfigurationSpace,
kernel: Kernel,
n_restarts: int = 10,
normalize_y: bool = True,
instance_features: dict[str, list[int | float]] | None = None,
pca_components: int | None = 7,
seed: int = 0,
):
super().__init__(
configspace=configspace,
seed=seed,
kernel=kernel,
instance_features=instance_features,
pca_components=pca_components,
)
self._normalize_y = normalize_y
self._n_restarts = n_restarts
# Internal variables
self._hypers = np.empty((0,))
self._is_trained = False
self._n_ll_evals = 0
self._set_has_conditions()
@property
def meta(self) -> dict[str, Any]: # noqa: D102
meta = super().meta
meta.update({"n_restarts": self._n_restarts, "normalize_y": self._normalize_y})
return meta
def _train(
self: Self,
X: np.ndarray,
y: np.ndarray,
optimize_hyperparameters: bool = True,
) -> Self:
"""Computes the Cholesky decomposition of the covariance of X and estimates the GP
hyperparameters by optimizing the marginal loglikelihood. The prior mean of the GP is set to
the empirical mean of X.
Parameters
----------
X : np.ndarray [#samples, #hyperparameter + #features]
Input data points.
        y : np.ndarray [#samples, #objectives]
            The corresponding target values.
        optimize_hyperparameters : bool, defaults to True
            If set to true, the hyperparameters are optimized; otherwise, the default hyperparameters of the
            kernel are used.
"""
if self._normalize_y:
y = self._normalize(y)
X = self._impute_inactive(X)
y = y.flatten()
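        # Fitting may fail with a LinAlgError if the covariance matrix is not positive definite.
        # In that case, the noise term of the kernel is increased and the fit is retried.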
n_tries = 10
for i in range(n_tries):
try:
self._gp = self._get_gaussian_process()
self._gp.fit(X, y)
break
except np.linalg.LinAlgError as e:
                if i == n_tries - 1:
                    raise e
# Assume that the last entry of theta is the noise
theta = np.exp(self._kernel.theta)
theta[-1] += 1
self._kernel.theta = np.log(theta)
if optimize_hyperparameters:
self._all_priors = self._get_all_priors(add_bound_priors=False)
self._hypers = self._optimize()
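            # Refit the GP with the hyperparameters found by maximizing the marginal log likelihood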
self._gp.kernel.theta = self._hypers
self._gp.fit(X, y)
else:
self._hypers = self._gp.kernel.theta
# Set the flag
self._is_trained = True
return self
def _get_gaussian_process(self) -> GaussianProcessRegressor:
return GaussianProcessRegressor(
kernel=self._kernel,
normalize_y=False, # We do not use scikit-learn's normalize routine
optimizer=None,
n_restarts_optimizer=-1, # We do not use scikit-learn's optimization routine
alpha=0, # Governed by the kernel
random_state=self._rng,
)
def _nll(self, theta: np.ndarray) -> tuple[float, np.ndarray]:
"""Returns the negative marginal log likelihood (+ the prior) for a hyperparameter
configuration theta. Negative because we use scipy minimize for optimization.
Parameters
----------
theta : np.ndarray
            Hyperparameter vector. Note that all hyperparameters are on a log scale.

        Returns
        -------
        lml : float
            The negative log marginal likelihood, including the (log) prior probabilities.
        grad : np.ndarray
            The negative gradient of the log marginal likelihood w.r.t. theta.
        """
self._n_ll_evals += 1
try:
lml, grad = self._gp.log_marginal_likelihood(theta, eval_gradient=True)
except np.linalg.LinAlgError:
return 1e25, np.zeros(theta.shape)
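        # Add the (log) prior probabilities and their gradients; the priors are defined over the
        # log-scaled kernel hyperparameters.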
for dim, priors in enumerate(self._all_priors):
for prior in priors:
lml += prior.get_log_probability(theta[dim])
grad[dim] += prior.get_gradient(theta[dim])
# We add a minus here because scipy is minimizing
if not np.isfinite(lml).all() or not np.all(np.isfinite(grad)):
return 1e25, np.zeros(theta.shape)
else:
return -lml, -grad
def _optimize(self) -> np.ndarray:
"""Optimizes the marginal log likelihood and returns the best found hyperparameter
configuration theta.
Returns
-------
theta : np.ndarray
Hyperparameter vector that maximizes the marginal log likelihood.
"""
log_bounds = [(b[0], b[1]) for b in self._gp.kernel.bounds]
# Start optimization from the previous hyperparameter configuration
p0 = [self._gp.kernel.theta]
if self._n_restarts > 0:
dim_samples = []
prior: list[AbstractPrior] | AbstractPrior | None = None
for dim, hp_bound in enumerate(log_bounds):
prior = self._all_priors[dim]
# Always sample from the first prior
if isinstance(prior, list):
if len(prior) == 0:
prior = None
else:
prior = prior[0]
prior = cast(Optional[AbstractPrior], prior)
if prior is None:
try:
sample = self._rng.uniform(
low=hp_bound[0],
high=hp_bound[1],
size=(self._n_restarts,),
)
except OverflowError:
raise ValueError("OverflowError while sampling from (%f, %f)" % (hp_bound[0], hp_bound[1]))
dim_samples.append(sample.flatten())
else:
dim_samples.append(prior.sample_from_prior(self._n_restarts).flatten())
p0 += list(np.vstack(dim_samples).transpose())
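        # p0 now contains the incumbent theta plus (up to) n_restarts randomly sampled starting points,
        # all in log space.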
theta_star: np.ndarray | None = None
f_opt_star = np.inf
        for start_point in p0:
theta, f_opt, _ = optimize.fmin_l_bfgs_b(self._nll, start_point, bounds=log_bounds)
if f_opt < f_opt_star:
f_opt_star = f_opt
theta_star = theta
if theta_star is None:
            raise RuntimeError("Failed to find a valid hyperparameter configuration while optimizing the marginal log likelihood.")
return theta_star
def _predict(
self,
X: np.ndarray,
covariance_type: str | None = "diagonal",
) -> tuple[np.ndarray, np.ndarray | None]:
if not self._is_trained:
raise Exception("Model has to be trained first!")
X_test = self._impute_inactive(X)
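        # covariance_type controls what is returned next to the mean: None (no variance), "diagonal"
        # (variance per test point), "std" (standard deviation per test point), or "full"
        # (full covariance matrix over the test points).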
if covariance_type is None:
mu = self._gp.predict(X_test)
var = None
if self._normalize_y:
mu = self._untransform_y(mu)
else:
predict_kwargs = {"return_cov": False, "return_std": True}
if covariance_type == "full":
predict_kwargs = {"return_cov": True, "return_std": False}
mu, var = self._gp.predict(X_test, **predict_kwargs)
if covariance_type != "full":
                var = var**2  # predict returns the standard deviation (faster to compute); square it to obtain the variance
            # Clip negative variances to a very small positive number
            var = np.clip(var, VERY_SMALL_NUMBER, np.inf)
if self._normalize_y:
mu, var = self._untransform_y(mu, var)
if covariance_type == "std":
var = np.sqrt(var) # Converting variance to std deviation if specified
return mu, var
    def sample_functions(self, X_test: np.ndarray, n_funcs: int = 1) -> np.ndarray:
"""Samples F function values from the current posterior at the N specified test points.
Parameters
----------
        X_test : np.ndarray [#samples, #hyperparameter + #features]
Input data points.
n_funcs: int
Number of function values that are drawn at each test point.
Returns
-------
        function_samples : np.ndarray
The F function values drawn at the N test points.
"""
if not self._is_trained:
raise Exception("Model has to be trained first.")
X_test = self._impute_inactive(X_test)
funcs = self._gp.sample_y(X_test, n_samples=n_funcs, random_state=self._rng)
if self._normalize_y:
funcs = self._untransform_y(funcs)
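        # Ensure a 2-D array is returned even if only a single function was sampled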
if len(funcs.shape) == 1:
return funcs[None, :]
else:
return funcs