Source code for smac.epm.gaussian_process.gp

from typing import List, Optional, Tuple, Union, cast

import logging

import numpy as np
from scipy import optimize
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Kernel

from smac.configspace import ConfigurationSpace
from smac.epm.gaussian_process import BaseModel
from smac.epm.gaussian_process.utils.prior import Prior
from smac.utils.constants import VERY_SMALL_NUMBER

__copyright__ = "Copyright 2021, AutoML.org Freiburg-Hannover"
__license__ = "3-clause BSD"


logger = logging.getLogger(__name__)


[docs]class GaussianProcess(BaseModel): """Gaussian process model. The GP hyperparameterŝ are obtained by optimizing the marginal log likelihood. This code is based on the implementation of RoBO: Klein, A. and Falkner, S. and Mansur, N. and Hutter, F. RoBO: A Flexible and Robust Bayesian Optimization Framework in Python In: NIPS 2017 Bayesian Optimization Workshop Parameters ---------- types : List[int] Specifies the number of categorical values of an input dimension where the i-th entry corresponds to the i-th input dimension. Let's say we have 2 dimension where the first dimension consists of 3 different categorical choices and the second dimension is continuous than we have to pass [3, 0]. Note that we count starting from 0. bounds : List[Tuple[float, float]] bounds of input dimensions: (lower, uppper) for continuous dims; (n_cat, np.nan) for categorical dims seed : int Model seed. kernel : george kernel object Specifies the kernel that is used for all Gaussian Process prior : prior object Defines a prior for the hyperparameters of the GP. Make sure that it implements the Prior interface. normalize_y : bool Zero mean unit variance normalization of the output values n_opt_restart : int Number of restarts for GP hyperparameter optimization instance_features : np.ndarray (I, K) Contains the K dimensional instance features of the I different instances pca_components : float Number of components to keep when using PCA to reduce dimensionality of instance features. Requires to set n_feats (> pca_dims). """ def __init__( self, configspace: ConfigurationSpace, types: List[int], bounds: List[Tuple[float, float]], seed: int, kernel: Kernel, normalize_y: bool = True, n_opt_restarts: int = 10, instance_features: Optional[np.ndarray] = None, pca_components: Optional[int] = None, ): super().__init__( configspace=configspace, types=types, bounds=bounds, seed=seed, kernel=kernel, instance_features=instance_features, pca_components=pca_components, ) self.normalize_y = normalize_y self.n_opt_restarts = n_opt_restarts self.hypers = np.empty((0,)) self.is_trained = False self._n_ll_evals = 0 self._set_has_conditions() def _train(self, X: np.ndarray, y: np.ndarray, do_optimize: bool = True) -> "GaussianProcess": """Computes the Cholesky decomposition of the covariance of X and estimates the GP hyperparameters by optimizing the marginal loglikelihood. The prior mean of the GP is set to the empirical mean of X. Parameters ---------- X: np.ndarray (N, D) Input data points. The dimensionality of X is (N, D), with N as the number of points and D is the number of features. y: np.ndarray (N,) The corresponding target values. do_optimize: boolean If set to true the hyperparameters are optimized otherwise the default hyperparameters of the kernel are used. """ X = self._impute_inactive(X) if self.normalize_y: y = self._normalize_y(y) y = y.flatten() n_tries = 10 for i in range(n_tries): try: self.gp = self._get_gp() self.gp.fit(X, y) break except np.linalg.LinAlgError as e: if i == n_tries: raise e # Assume that the last entry of theta is the noise theta = np.exp(self.kernel.theta) theta[-1] += 1 self.kernel.theta = np.log(theta) if do_optimize: self._all_priors = self._get_all_priors(add_bound_priors=False) self.hypers = self._optimize() self.gp.kernel.theta = self.hypers self.gp.fit(X, y) else: self.hypers = self.gp.kernel.theta self.is_trained = True return self def _get_gp(self) -> GaussianProcessRegressor: return GaussianProcessRegressor( kernel=self.kernel, normalize_y=False, optimizer=None, n_restarts_optimizer=-1, # Do not use scikit-learn's optimization routine alpha=0, # Governed by the kernel random_state=self.rng, ) def _nll(self, theta: np.ndarray) -> Tuple[float, np.ndarray]: """Returns the negative marginal log likelihood (+ the prior) for a hyperparameter configuration theta. (negative because we use scipy minimize for optimization) Parameters ---------- theta : np.ndarray(H) Hyperparameter vector. Note that all hyperparameter are on a log scale. Returns ------- float lnlikelihood + prior """ self._n_ll_evals += 1 try: lml, grad = self.gp.log_marginal_likelihood(theta, eval_gradient=True) except np.linalg.LinAlgError: return 1e25, np.zeros(theta.shape) for dim, priors in enumerate(self._all_priors): for prior in priors: lml += prior.lnprob(theta[dim]) grad[dim] += prior.gradient(theta[dim]) # We add a minus here because scipy is minimizing if not np.isfinite(lml).all() or not np.all(np.isfinite(grad)): return 1e25, np.zeros(theta.shape) else: return -lml, -grad def _optimize(self) -> np.ndarray: """Optimizes the marginal log likelihood and returns the best found hyperparameter configuration theta. Returns ------- theta : np.ndarray(H) Hyperparameter vector that maximizes the marginal log likelihood """ log_bounds = [(b[0], b[1]) for b in self.gp.kernel.bounds] # Start optimization from the previous hyperparameter configuration p0 = [self.gp.kernel.theta] if self.n_opt_restarts > 0: dim_samples = [] prior = None # type: Optional[Union[List[Prior], Prior]] for dim, hp_bound in enumerate(log_bounds): prior = self._all_priors[dim] # Always sample from the first prior if isinstance(prior, list): if len(prior) == 0: prior = None else: prior = prior[0] prior = cast(Optional[Prior], prior) if prior is None: try: sample = self.rng.uniform( low=hp_bound[0], high=hp_bound[1], size=(self.n_opt_restarts,), ) except OverflowError: raise ValueError("OverflowError while sampling from (%f, %f)" % (hp_bound[0], hp_bound[1])) dim_samples.append(sample.flatten()) else: dim_samples.append(prior.sample_from_prior(self.n_opt_restarts).flatten()) p0 += list(np.vstack(dim_samples).transpose()) theta_star: Optional[np.ndarray] = None f_opt_star = np.inf for i, start_point in enumerate(p0): theta, f_opt, _ = optimize.fmin_l_bfgs_b(self._nll, start_point, bounds=log_bounds) if f_opt < f_opt_star: f_opt_star = f_opt theta_star = theta if theta_star is None: raise RuntimeError return theta_star def _predict( self, X_test: np.ndarray, cov_return_type: Optional[str] = "diagonal_cov" ) -> Tuple[np.ndarray, Optional[np.ndarray]]: r""" Returns the predictive mean and variance of the objective function at the given test points. Parameters ---------- X_test: np.ndarray (N, D) Input test points cov_return_type: Optional[str] Specifies what to return along with the mean. Refer ``predict()`` for more information. Returns ------- np.array(N,) predictive mean np.array(N,) or np.array(N, N) or None predictive variance or standard deviation """ if not self.is_trained: raise Exception("Model has to be trained first!") X_test = self._impute_inactive(X_test) if cov_return_type is None: mu = self.gp.predict(X_test) var = None if self.normalize_y: mu = self._untransform_y(mu) else: predict_kwargs = {"return_cov": False, "return_std": True} if cov_return_type == "full_cov": predict_kwargs = {"return_cov": True, "return_std": False} mu, var = self.gp.predict(X_test, **predict_kwargs) if cov_return_type != "full_cov": var = var**2 # since we get standard deviation for faster computation # Clip negative variances and set them to the smallest # positive float value var = np.clip(var, VERY_SMALL_NUMBER, np.inf) if self.normalize_y: mu, var = self._untransform_y(mu, var) if cov_return_type == "diagonal_std": var = np.sqrt(var) # converting variance to std deviation if specified return mu, var
[docs] def sample_functions(self, X_test: np.ndarray, n_funcs: int = 1) -> np.ndarray: """Samples F function values from the current posterior at the N specified test points. Parameters ---------- X_test: np.ndarray (N, D) Input test points n_funcs: int Number of function values that are drawn at each test point. Returns ------- function_samples: np.array(N, F) The F function values drawn at the N test points. """ if not self.is_trained: raise Exception("Model has to be trained first!") X_test = self._impute_inactive(X_test) funcs = self.gp.sample_y(X_test, n_samples=n_funcs, random_state=self.rng) if self.normalize_y: funcs = self._untransform_y(funcs) if len(funcs.shape) == 1: return funcs[None, :] else: return funcs