Source code for smac.epm.gaussian_process.augmented

from typing import Dict, List, Optional, Tuple, Union

from collections import OrderedDict

import gpytorch
import numpy as np
import torch
from botorch.optim.numpy_converter import module_to_array, set_params_with_array
from botorch.optim.utils import _get_extra_mll_args
from gpytorch.constraints.constraints import Interval
from gpytorch.distributions import MultivariateNormal
from gpytorch.kernels import Kernel
from gpytorch.likelihoods import GaussianLikelihood
from gpytorch.means import ZeroMean
from gpytorch.mlls import ExactMarginalLogLikelihood
from gpytorch.models import ExactGP
from gpytorch.utils.errors import NanError
from scipy import optimize
from scipy.stats.qmc import LatinHypercube

from smac.configspace import ConfigurationSpace
from smac.epm.gaussian_process.gpytorch import ExactGPModel, GPyTorchGaussianProcess
from smac.epm.gaussian_process.kernels.boing import FITCKernel, FITCMean
from smac.epm.utils import check_subspace_points

gpytorch.settings.debug.off()


class AugmentedLocalGaussianProcess(ExactGP):
    def __init__(
        self,
        X_in: torch.Tensor,
        y_in: torch.Tensor,
        X_out: torch.Tensor,
        y_out: torch.Tensor,
        likelihood: GaussianLikelihood,
        base_covar_kernel: Kernel,
    ):
        """
        An Augmented Local GP. It is trained with the points inside a subregion, while its prior is augmented by
        the points outside the subregion (global configurations).

        Parameters
        ----------
        X_in: torch.Tensor (N_in, D),
            feature vectors of the points inside the subregion
        y_in: torch.Tensor (N_in, 1),
            observations inside the subregion
        X_out: torch.Tensor (N_out, D),
            feature vectors of the points outside the subregion
        y_out: torch.Tensor (N_out, 1),
            observations outside the subregion
        likelihood: GaussianLikelihood,
            likelihood of the GP (noise)
        base_covar_kernel: Kernel,
            covariance kernel
        """
        X_in = X_in.unsqueeze(-1) if X_in.ndimension() == 1 else X_in
        X_out = X_out.unsqueeze(-1) if X_out.ndimension() == 1 else X_out
        assert X_in.shape[-1] == X_out.shape[-1]
        super(AugmentedLocalGaussianProcess, self).__init__(X_in, y_in, likelihood)

        self._mean_module = ZeroMean()
        self.base_covar = base_covar_kernel

        self.X_out = X_out
        self.y_out = y_out
        self.augmented = False
    def set_augment_module(self, X_inducing: torch.Tensor) -> None:
        """
        Set an augmentation module, which will be used later for inference.

        Parameters
        ----------
        X_inducing: torch.Tensor(N_inducing, D)
            inducing points, they need to have the same number of dimensions as X_in
        """
        X_inducing = X_inducing.unsqueeze(-1) if X_inducing.ndimension() == 1 else X_inducing
        # assert X_inducing.shape[-1] == self.X_out.shape[-1]
        self.covar_module = FITCKernel(
            self.base_covar, X_inducing=X_inducing, X_out=self.X_out, y_out=self.y_out, likelihood=self.likelihood
        )
        self.mean_module = FITCMean(covar_module=self.covar_module)
        self.augmented = True
    def forward(self, x: torch.Tensor) -> MultivariateNormal:
        """
        Compute the prior values. If optimize_kernel_hps is set True in the training phase, this model degenerates
        to a vanilla GP model with ZeroMean and base_covar as the covariance matrix. Otherwise, we apply the
        partial sparse GP mean and kernel here.
        """
        if not self.augmented:
            # we only optimize for kernel hyperparameters
            covar_x = self.base_covar(x)
            mean_x = self._mean_module(x)
        else:
            covar_x = self.covar_module(x)
            mean_x = self.mean_module(x)
        return MultivariateNormal(mean_x, covar_x)
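The augmented local GP is used in two phases: first as a plain exact GP over the points inside the subregion (to fit the kernel hyperparameters), then with the FITC approximation of the outer points attached via set_augment_module. A minimal sketch of that flow is given below; the toy tensors, the Matérn base kernel, and the inducing points are made-up placeholders, not values produced by SMAC itself.

# Illustrative sketch only (not part of the module); shapes and kernel choice are assumptions.
X_in = torch.rand(20, 3).double()    # observations inside the subregion
y_in = torch.rand(20).double()
X_out = torch.rand(50, 3).double()   # observations outside the subregion
y_out = torch.rand(50).double()
likelihood = GaussianLikelihood().double()
base_kernel = gpytorch.kernels.MaternKernel(nu=2.5)

gp = AugmentedLocalGaussianProcess(X_in, y_in, X_out, y_out, likelihood, base_kernel).double()
# Phase 1: gp.augmented is False, so forward() behaves like a vanilla zero-mean GP on X_in
# and the kernel hyperparameters can be fitted with ExactMarginalLogLikelihood as usual.
# Phase 2: attach the FITC approximation of the outer points; subsequent inference uses
# the augmented prior.
X_inducing = torch.rand(2, 3).double()   # e.g. drawn from a Latin hypercube design
gp.set_augment_module(X_inducing)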
class VariationalGaussianProcess(gpytorch.models.ApproximateGP):
    """
    A variational GP to compute the position of the inducing points.
    We only optimize the position of the continuous dimensions and keep the categorical dimensions constant.
    """

    def __init__(self, kernel: Kernel, X_inducing: torch.Tensor):
        """
        Initialize a Variational GP.
        We set the lower and upper bounds of the inducing points for numerical hyperparameters to 0 and 1,
        that is, we constrain the inducing points to lie inside the subregion.

        Parameters
        ----------
        kernel: Kernel
            kernel of the variational GP; its hyperparameters need to be fixed when it is used by LGPGA
        X_inducing: torch.tensor (N_inducing, D)
            inducing points
        """
        variational_distribution = gpytorch.variational.TrilNaturalVariationalDistribution(X_inducing.size(0))
        variational_strategy = gpytorch.variational.VariationalStrategy(
            self, X_inducing, variational_distribution, learn_inducing_locations=True
        )
        super(VariationalGaussianProcess, self).__init__(variational_strategy)
        self.mean_module = gpytorch.means.ZeroMean()
        self.covar_module = kernel

        shape_X_inducing = X_inducing.shape
        lower_X_inducing = torch.zeros([shape_X_inducing[-1]]).repeat(shape_X_inducing[0])
        upper_X_inducing = torch.ones([shape_X_inducing[-1]]).repeat(shape_X_inducing[0])

        self.variational_strategy.register_constraint(
            param_name="inducing_points",
            constraint=Interval(lower_X_inducing, upper_X_inducing, transform=None),
        )
        self.double()

        for p_name, t in self.named_hyperparameters():
            if p_name != "variational_strategy.inducing_points":
                t.requires_grad = False
    def forward(self, x: torch.Tensor) -> MultivariateNormal:
        """
        Pass the posterior mean and variance given input X.

        Parameters
        ----------
        x: torch.Tensor
            Input data

        Returns
        -------
        MultivariateNormal
            the distribution at x, built from the zero mean and the continuous-only covariance
        """
        mean_x = self.mean_module(x)
        covar_x = self.covar_module(x, cont_only=True)
        return MultivariateNormal(mean_x, covar_x)
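As a quick check of the behaviour described above, the following sketch (with a made-up kernel and inducing points) constructs the variational GP and verifies that only the inducing-point locations remain trainable:

# Illustrative sketch only; kernel and inducing points are placeholders.
kernel = gpytorch.kernels.MaternKernel(nu=2.5).double()
X_inducing = torch.rand(2, 3).double()
var_gp = VariationalGaussianProcess(kernel, X_inducing=X_inducing)
for name, t in var_gp.named_hyperparameters():
    # only "variational_strategy.inducing_points" should report requires_grad=True
    print(name, t.requires_grad)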
class GloballyAugmentedLocalGaussianProcess(GPyTorchGaussianProcess):
    def __init__(
        self,
        configspace: ConfigurationSpace,
        types: List[int],
        bounds: List[Tuple[float, float]],
        bounds_cont: np.ndarray,
        bounds_cat: List[Tuple],
        seed: int,
        kernel: Kernel,
        num_inducing_points: int = 2,
        likelihood: Optional[GaussianLikelihood] = None,
        normalize_y: bool = True,
        n_opt_restarts: int = 10,
        instance_features: Optional[np.ndarray] = None,
        pca_components: Optional[int] = None,
    ):
        """
        The GP hyperparameters are obtained by optimizing the marginal log-likelihood with botorch.
        We train an LGPGA in two stages:
        In the first stage, we only train the kernel hyperparameters and thus deactivate the gradients w.r.t. the
        position of the inducing points.
        In the second stage, we use the kernel hyperparameters acquired in the first stage to initialize a new
        variational Gaussian process and only optimize the position of its inducing points with natural gradients.
        Finally, we update the position of the inducing points and use it for evaluation.

        Parameters
        ----------
        bounds_cont: np.ndarray(N_cont, 2),
            bounds of the continuous hyperparameters, stored as [[0, 1] * N_cont]
        bounds_cat: List[Tuple],
            bounds of the categorical hyperparameters
        kernel : gpytorch kernel object
            Specifies the kernel that is used for all Gaussian Processes
        num_inducing_points: int
            Number of inducing points
        likelihood: Optional[GaussianLikelihood]
            Likelihood values
        normalize_y : bool
            Zero mean unit variance normalization of the output values when the model is a partial sparse GP model.
        """
        super(GloballyAugmentedLocalGaussianProcess, self).__init__(
            configspace=configspace,
            types=types,
            bounds=bounds,
            seed=seed,
            kernel=kernel,
            likelihood=likelihood,
            normalize_y=normalize_y,
            n_opt_restarts=n_opt_restarts,
            instance_features=instance_features,
            pca_components=pca_components,
        )
        self.cont_dims = np.where(np.array(types) == 0)[0]
        self.cat_dims = np.where(np.array(types) != 0)[0]
        self.bounds_cont = bounds_cont
        self.bounds_cat = bounds_cat
        self.num_inducing_points = num_inducing_points
    def update_attribute(self, **kwargs: Dict) -> None:
        """We update the class attribute (for instance, number of inducing points)"""
        for key in kwargs:
            if not hasattr(self, key):
                raise AttributeError(f"{self.__class__.__name__} has no attribute named {key}")
            setattr(self, key, kwargs[key])
    def _train(
        self, X: np.ndarray, y: np.ndarray, do_optimize: bool = True
    ) -> Union[AugmentedLocalGaussianProcess, GPyTorchGaussianProcess]:
        """
        Update the hyperparameters of the partial sparse kernel. Depending on the number of inputs inside and
        outside the subregion, we initialize either a PartialSparseGaussianProcess or a GaussianProcessGPyTorch.

        Parameters
        ----------
        X: np.ndarray (N, D)
            Input data points. The dimensionality of X is (N, D), with N as the number of points and D as the
            number of features, N = N_in + N_out
        y: np.ndarray (N,)
            The corresponding target values.
        do_optimize: boolean
            If set to true, the hyperparameters are optimized; otherwise, the default hyperparameters of the
            kernel are used.
        """
        X = self._impute_inactive(X)
        if len(y.shape) == 1:
            self.n_objectives_ = 1
        else:
            self.n_objectives_ = y.shape[1]
        if self.n_objectives_ == 1:
            y = y.flatten()

        ss_data_indices = check_subspace_points(
            X,
            cont_dims=self.cont_dims,
            cat_dims=self.cat_dims,
            bounds_cont=self.bounds_cont,
            bounds_cat=self.bounds_cat,
            expand_bound=True,
        )

        if np.sum(ss_data_indices) > np.shape(y)[0] - self.num_inducing_points:
            # we initialize a vanilla GaussianProcessGPyTorch
            if self.normalize_y:
                y = self._normalize_y(y)
            self.num_points = np.shape(y)[0]
            get_gp_kwargs = {"X_in": X, "y_in": y, "X_out": None, "y_out": None}
        else:
            # we initialize a PartialSparseGaussianProcess object
            X_in = X[ss_data_indices]
            y_in = y[ss_data_indices]
            X_out = X[~ss_data_indices]
            y_out = y[~ss_data_indices]
            self.num_points = np.shape(y_in)[0]
            if self.normalize_y:
                y_in = self._normalize_y(y_in)
                y_out = (y_out - self.mean_y_) / self.std_y_
            get_gp_kwargs = {"X_in": X_in, "y_in": y_in, "X_out": X_out, "y_out": y_out}

        n_tries = 10
        for i in range(n_tries):
            try:
                self.gp = self._get_gp(**get_gp_kwargs)
                break
            except Exception as e:
                if i == n_tries - 1:
                    raise RuntimeError(f"Failed to initialize a GP model, {e}")

        if do_optimize:
            self.hypers = self._optimize()
            self.gp = set_params_with_array(self.gp, self.hypers, self.property_dict)
            if isinstance(self.gp.model, AugmentedLocalGaussianProcess):
                # we optimize the position of the inducing points and thus need to deactivate the gradients of the
                # kernel hyperparameters
                lhd = LatinHypercube(d=X.shape[-1], seed=self.rng.randint(0, 1000000))

                inducing_points = torch.from_numpy(lhd.random(n=self.num_inducing_points))

                kernel = self.gp.model.base_covar
                var_gp = VariationalGaussianProcess(kernel, X_inducing=inducing_points)
                X_out_ = torch.from_numpy(X_out)
                y_out_ = torch.from_numpy(y_out)

                variational_ngd_optimizer = gpytorch.optim.NGD(
                    var_gp.variational_parameters(), num_data=y_out_.size(0), lr=0.1
                )

                var_gp.train()
                likelihood = GaussianLikelihood().double()
                likelihood.train()

                mll_func = gpytorch.mlls.PredictiveLogLikelihood

                var_mll = mll_func(likelihood, var_gp, num_data=y_out_.size(0))

                for t in var_gp.variational_parameters():
                    t.requires_grad = False

                x0, property_dict, bounds = module_to_array(module=var_mll)

                for t in var_gp.variational_parameters():
                    t.requires_grad = True

                bounds = np.asarray(bounds).transpose().tolist()

                start_points = [x0]

                inducing_idx = 0

                inducing_size = X_out.shape[-1] * self.num_inducing_points
                for p_name, attrs in property_dict.items():
                    if p_name != "model.variational_strategy.inducing_points":
                        # Construct the new tensor
                        if len(attrs.shape) == 0:  # deal with scalar tensors
                            inducing_idx = inducing_idx + 1
                        else:
                            inducing_idx = inducing_idx + np.prod(attrs.shape)
                    else:
                        break
                while len(start_points) < 3:
                    new_start_point = np.random.rand(*x0.shape)
                    new_inducing_points = torch.from_numpy(lhd.random(n=self.num_inducing_points)).flatten()
                    new_start_point[inducing_idx : inducing_idx + inducing_size] = new_inducing_points
                    start_points.append(new_start_point)

                def sci_opi_wrapper(
                    x: np.ndarray,
                    mll: gpytorch.module,
                    property_dict: Dict,
                    train_inputs: torch.Tensor,
                    train_targets: torch.Tensor,
                ) -> Tuple[float, np.ndarray]:
                    """
                    A modification of botorch.optim.utils._scipy_objective_and_grad; the key difference is that we
                    do an additional natural gradient update before computing the gradient values.

                    Parameters
                    ----------
                    x: np.ndarray
                        optimizer input
                    mll: gpytorch.module
                        a gpytorch module whose hyperparameters are defined by x
                    property_dict: Dict
                        a dict describing how x is mapped to initialize mll
                    train_inputs: torch.Tensor (N_input, D)
                        input points of the GP model
                    train_targets: torch.Tensor (N_input, 1)
                        target values of the GP model

                    Returns
                    -------
                    loss: np.ndarray
                        loss value
                    grad: np.ndarray
                        gradient w.r.t. the inputs
                    """
                    # A modification of botorch.optim.utils._scipy_objective_and_grad:
                    # https://botorch.org/api/_modules/botorch/optim/utils.html
                    # The key difference is that we do an additional natural gradient update here
                    variational_ngd_optimizer.zero_grad()

                    mll = set_params_with_array(mll, x, property_dict)
                    mll.zero_grad()
                    try:  # catch linear algebra errors in gpytorch
                        output = mll.model(train_inputs)
                        args = [output, train_targets] + _get_extra_mll_args(mll)
                        loss = -mll(*args).sum()
                    except RuntimeError as e:
                        if isinstance(e, NanError) or "singular" in e.args[0]:
                            return float("nan"), np.full_like(x, "nan")
                        else:
                            raise e  # pragma: nocover
                    loss.backward()
                    variational_ngd_optimizer.step()
                    param_dict = OrderedDict(mll.named_parameters())
                    grad = []
                    for p_name in property_dict:
                        t = param_dict[p_name].grad
                        if t is None:
                            # this deals with parameters that do not affect the loss
                            grad.append(np.zeros(property_dict[p_name].shape.numel()))
                        else:
                            grad.append(t.detach().view(-1).cpu().double().clone().numpy())
                    mll.zero_grad()
                    return loss.item(), np.concatenate(grad)

                theta_star = x0
                f_opt_star = np.inf
                for start_point in start_points:
                    try:
                        theta, f_opt, res_dict = optimize.fmin_l_bfgs_b(
                            sci_opi_wrapper,
                            start_point,
                            args=(var_mll, property_dict, X_out_, y_out_),
                            bounds=bounds,
                            maxiter=50,
                        )
                        if f_opt < f_opt_star:
                            f_opt_star = f_opt
                            theta_star = theta
                    except Exception as e:
                        self.logger.warning(f"An exception {e} occurred during the optimization")

                start_idx = 0
                # modification of botorch.optim.numpy_converter.set_params_with_array, as we only need to extract
                # the positions of the inducing points
                for p_name, attrs in property_dict.items():
                    if p_name != "model.variational_strategy.inducing_points":
                        # Construct the new tensor
                        if len(attrs.shape) == 0:  # deal with scalar tensors
                            start_idx = start_idx + 1
                        else:
                            start_idx = start_idx + np.prod(attrs.shape)
                    else:
                        end_idx = start_idx + np.prod(attrs.shape)
                        X_inducing = torch.tensor(
                            theta_star[start_idx:end_idx], dtype=attrs.dtype, device=attrs.device
                        ).view(*attrs.shape)
                        break
                # set inducing points for covariance module here
                self.gp_model.set_augment_module(X_inducing)
        else:
            self.hypers, self.property_dict, _ = module_to_array(module=self.gp)

        self.is_trained = True
        return self

    def _get_gp(
        self,
        X_in: Optional[np.ndarray] = None,
        y_in: Optional[np.ndarray] = None,
        X_out: Optional[np.ndarray] = None,
        y_out: Optional[np.ndarray] = None,
    ) -> Optional[ExactMarginalLogLikelihood]:
        """
        Construct a new GP model based on the inputs.
        If both in and out are None: return an empty model.
        If only in_x and in_y are given: return a vanilla GP model.
        If in_x, in_y, out_x, out_y are given: return a partial sparse GP model.

        Parameters
        ----------
        X_in: Optional[np.ndarray (N_in, D)]
            Input data points inside the subregion. The dimensionality of X_in is (N_in, D), with N_in as the
            number of points inside the subregion and D as the number of features. If it is not given, this
            function will return None to be compatible with the implementation of its parent class.
        y_in: Optional[np.ndarray (N_in,)]
            The corresponding target values inside the subregion.
        X_out: Optional[np.ndarray (N_out, D)]
            Input data points outside the subregion. The dimensionality of X_out is (N_out, D). If it is not
            given, this function will return a vanilla Gaussian Process.
        y_out: Optional[np.ndarray (N_out,)]
            The corresponding target values outside the subregion.

        Returns
        -------
        mll: ExactMarginalLogLikelihood
            a gp module
        """
        if X_in is None:
            return None

        X_in = torch.from_numpy(X_in)
        y_in = torch.from_numpy(y_in)
        if X_out is None:
            self.gp_model = ExactGPModel(
                X_in, y_in, likelihood=self.likelihood, base_covar_kernel=self.kernel
            ).double()
        else:
            X_out = torch.from_numpy(X_out)
            y_out = torch.from_numpy(y_out)

            self.gp_model = AugmentedLocalGaussianProcess(
                X_in, y_in, X_out, y_out, likelihood=self.likelihood, base_covar_kernel=self.kernel  # type:ignore
            ).double()
        mll = ExactMarginalLogLikelihood(self.likelihood, self.gp_model)
        mll.double()
        return mll
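Putting the pieces together, a rough construction sketch might look as follows. The configuration space, subspace bounds, and kernel below are invented for illustration; in SMAC these arguments are assembled by the surrounding optimizer, and the model is then fitted through the train() method of the parent EPM class, which dispatches to _train() above.

# Illustrative sketch only; all values below are assumptions.
from ConfigSpace.hyperparameters import UniformFloatHyperparameter

cs = ConfigurationSpace()
cs.add_hyperparameters(
    [UniformFloatHyperparameter("x0", 0.0, 1.0), UniformFloatHyperparameter("x1", 0.0, 1.0)]
)
model = GloballyAugmentedLocalGaussianProcess(
    configspace=cs,
    types=[0, 0],                                     # two continuous dimensions
    bounds=[(0.0, 1.0), (0.0, 1.0)],
    bounds_cont=np.array([[0.2, 0.6], [0.3, 0.7]]),   # the subregion, one row per continuous dimension
    bounds_cat=[],                                    # no categorical hyperparameters
    seed=0,
    kernel=gpytorch.kernels.MaternKernel(nu=2.5),
    num_inducing_points=2,
)
# Fitting the model then decides, via check_subspace_points, whether to train a vanilla GP
# (nearly all points inside the subregion) or the globally augmented local GP.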