Source code for smac.model.gaussian_process.gaussian_process

from __future__ import annotations

from typing import Any, Optional, TypeVar, cast

import logging

import numpy as np
from ConfigSpace import ConfigurationSpace
from scipy import optimize
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Kernel

from smac.constants import VERY_SMALL_NUMBER
from smac.model.gaussian_process.abstract_gaussian_process import (
    AbstractGaussianProcess,
)
from smac.model.gaussian_process.priors.abstract_prior import AbstractPrior

__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"


logger = logging.getLogger(__name__)


Self = TypeVar("Self", bound="GaussianProcess")


[docs]class GaussianProcess(AbstractGaussianProcess): """Implementation of Gaussian process model. The Gaussian process hyperparameters are obtained by optimizing the marginal log likelihood. This code is based on the implementation of RoBO: Klein, A. and Falkner, S. and Mansur, N. and Hutter, F. RoBO: A Flexible and Robust Bayesian Optimization Framework in Python In: NIPS 2017 Bayesian Optimization Workshop Parameters ---------- configspace : ConfigurationSpace kernel : Kernel Kernel which is used for the Gaussian process. n_restarts : int, defaults to 10 Number of restarts for the Gaussian process hyperparameter optimization. normalize_y : bool, defaults to True Zero mean unit variance normalization of the output values. instance_features : dict[str, list[int | float]] | None, defaults to None Features (list of int or floats) of the instances (str). The features are incorporated into the X data, on which the model is trained on. pca_components : float, defaults to 7 Number of components to keep when using PCA to reduce dimensionality of instance features. seed : int """ def __init__( self, configspace: ConfigurationSpace, kernel: Kernel, n_restarts: int = 10, normalize_y: bool = True, instance_features: dict[str, list[int | float]] | None = None, pca_components: int | None = 7, seed: int = 0, ): super().__init__( configspace=configspace, seed=seed, kernel=kernel, instance_features=instance_features, pca_components=pca_components, ) self._normalize_y = normalize_y self._n_restarts = n_restarts # Internal variables self._hypers = np.empty((0,)) self._is_trained = False self._n_ll_evals = 0 self._set_has_conditions() @property def meta(self) -> dict[str, Any]: # noqa: D102 meta = super().meta meta.update({"n_restarts": self._n_restarts, "normalize_y": self._normalize_y}) return meta def _train( self: Self, X: np.ndarray, y: np.ndarray, optimize_hyperparameters: bool = True, ) -> Self: """Computes the Cholesky decomposition of the covariance of X and estimates the GP hyperparameters by optimizing the marginal loglikelihood. The prior mean of the GP is set to the empirical mean of X. Parameters ---------- X : np.ndarray [#samples, #hyperparameter + #features] Input data points. Y : np.ndarray [#samples, #objectives] The corresponding target values. optimize_hyperparameters: boolean If set to true the hyperparameters are optimized otherwise the default hyperparameters of the kernel are used. """ if self._normalize_y: y = self._normalize(y) X = self._impute_inactive(X) y = y.flatten() n_tries = 10 for i in range(n_tries): try: self._gp = self._get_gaussian_process() self._gp.fit(X, y) break except np.linalg.LinAlgError as e: if i == n_tries: raise e # Assume that the last entry of theta is the noise theta = np.exp(self._kernel.theta) theta[-1] += 1 self._kernel.theta = np.log(theta) if optimize_hyperparameters: self._all_priors = self._get_all_priors(add_bound_priors=False) self._hypers = self._optimize() self._gp.kernel.theta = self._hypers self._gp.fit(X, y) else: self._hypers = self._gp.kernel.theta # Set the flag self._is_trained = True return self def _get_gaussian_process(self) -> GaussianProcessRegressor: return GaussianProcessRegressor( kernel=self._kernel, normalize_y=False, # We do not use scikit-learn's normalize routine optimizer=None, n_restarts_optimizer=-1, # We do not use scikit-learn's optimization routine alpha=0, # Governed by the kernel random_state=self._rng, ) def _nll(self, theta: np.ndarray) -> tuple[float, np.ndarray]: """Returns the negative marginal log likelihood (+ the prior) for a hyperparameter configuration theta. Negative because we use scipy minimize for optimization. Parameters ---------- theta : np.ndarray Hyperparameter vector. Note that all hyperparameter are on a log scale. """ self._n_ll_evals += 1 try: lml, grad = self._gp.log_marginal_likelihood(theta, eval_gradient=True) except np.linalg.LinAlgError: return 1e25, np.zeros(theta.shape) for dim, priors in enumerate(self._all_priors): for prior in priors: lml += prior.get_log_probability(theta[dim]) grad[dim] += prior.get_gradient(theta[dim]) # We add a minus here because scipy is minimizing if not np.isfinite(lml).all() or not np.all(np.isfinite(grad)): return 1e25, np.zeros(theta.shape) else: return -lml, -grad def _optimize(self) -> np.ndarray: """Optimizes the marginal log likelihood and returns the best found hyperparameter configuration theta. Returns ------- theta : np.ndarray Hyperparameter vector that maximizes the marginal log likelihood. """ log_bounds = [(b[0], b[1]) for b in self._gp.kernel.bounds] # Start optimization from the previous hyperparameter configuration p0 = [self._gp.kernel.theta] if self._n_restarts > 0: dim_samples = [] prior: list[AbstractPrior] | AbstractPrior | None = None for dim, hp_bound in enumerate(log_bounds): prior = self._all_priors[dim] # Always sample from the first prior if isinstance(prior, list): if len(prior) == 0: prior = None else: prior = prior[0] prior = cast(Optional[AbstractPrior], prior) if prior is None: try: sample = self._rng.uniform( low=hp_bound[0], high=hp_bound[1], size=(self._n_restarts,), ) except OverflowError: raise ValueError("OverflowError while sampling from (%f, %f)" % (hp_bound[0], hp_bound[1])) dim_samples.append(sample.flatten()) else: dim_samples.append(prior.sample_from_prior(self._n_restarts).flatten()) p0 += list(np.vstack(dim_samples).transpose()) theta_star: np.ndarray | None = None f_opt_star = np.inf for i, start_point in enumerate(p0): theta, f_opt, _ = optimize.fmin_l_bfgs_b(self._nll, start_point, bounds=log_bounds) if f_opt < f_opt_star: f_opt_star = f_opt theta_star = theta if theta_star is None: raise RuntimeError return theta_star def _predict( self, X: np.ndarray, covariance_type: str | None = "diagonal", ) -> tuple[np.ndarray, np.ndarray | None]: if not self._is_trained: raise Exception("Model has to be trained first!") X_test = self._impute_inactive(X) if covariance_type is None: mu = self._gp.predict(X_test) var = None if self._normalize_y: mu = self._untransform_y(mu) else: predict_kwargs = {"return_cov": False, "return_std": True} if covariance_type == "full": predict_kwargs = {"return_cov": True, "return_std": False} mu, var = self._gp.predict(X_test, **predict_kwargs) if covariance_type != "full": var = var**2 # Since we get standard deviation for faster computation # Clip negative variances and set them to the smallest # positive float value var = np.clip(var, VERY_SMALL_NUMBER, np.inf) if self._normalize_y: mu, var = self._untransform_y(mu, var) if covariance_type == "std": var = np.sqrt(var) # Converting variance to std deviation if specified return mu, var
[docs] def sample_functions(self, X_test: np.ndarray, n_funcs: int = 1) -> np.ndarray: """Samples F function values from the current posterior at the N specified test points. Parameters ---------- X : np.ndarray [#samples, #hyperparameter + #features] Input data points. n_funcs: int Number of function values that are drawn at each test point. Returns ------- function_samples : np.array The F function values drawn at the N test points. """ if not self._is_trained: raise Exception("Model has to be trained first.") X_test = self._impute_inactive(X_test) funcs = self._gp.sample_y(X_test, n_samples=n_funcs, random_state=self._rng) if self._normalize_y: funcs = self._untransform_y(funcs) if len(funcs.shape) == 1: return funcs[None, :] else: return funcs