Source code for smac.optimizer.acquisition

# encoding=utf8
import abc
from typing import Any, List, Tuple

import copy

import numpy as np
from ConfigSpace.hyperparameters import FloatHyperparameter
from scipy.stats import norm

from smac.configspace import Configuration
from smac.configspace.util import convert_configurations_to_array
from smac.epm.base_epm import BaseEPM
from smac.utils.logging import PickableLoggerAdapter

__author__ = "Aaron Klein, Marius Lindauer"
__copyright__ = "Copyright 2017, ML4AAD"
__license__ = "3-clause BSD"


class AbstractAcquisitionFunction(object, metaclass=abc.ABCMeta):
    """Abstract base class for acquisition functions.

    Parameters
    ----------
    model : BaseEPM
        Models the objective function.

    Attributes
    ----------
    model
    logger
    """

    def __init__(self, model: BaseEPM):
        self.model = model
        self._required_updates = ("model",)  # type: Tuple[str, ...]
        self.logger = PickableLoggerAdapter(self.__module__ + "." + self.__class__.__name__)

    def update(self, **kwargs: Any) -> None:
        """Update the acquisition function attributes required for calculation.

        This method will be called after fitting the model, but before maximizing the
        acquisition function. As an example, EI uses it to update the current fmin.

        The default implementation only updates the attributes of the acquisition function
        which are already present.

        Parameters
        ----------
        kwargs
        """
        for key in self._required_updates:
            if key not in kwargs:
                raise ValueError(
                    "Acquisition function %s needs to be updated with key %s, but only got "
                    "keys %s." % (self.__class__.__name__, key, list(kwargs.keys()))
                )
        for key in kwargs:
            if key in self._required_updates:
                setattr(self, key, kwargs[key])

    def __call__(self, configurations: List[Configuration]) -> np.ndarray:
        """Computes the acquisition value for a given X.

        Parameters
        ----------
        configurations : list
            The configurations where the acquisition function should be evaluated.

        Returns
        -------
        np.ndarray(N, 1)
            Acquisition values for X
        """
        X = convert_configurations_to_array(configurations)
        if len(X.shape) == 1:
            X = X[np.newaxis, :]

        acq = self._compute(X)
        if np.any(np.isnan(acq)):
            idx = np.where(np.isnan(acq))[0]
            acq[idx, :] = -np.finfo(float).max
        return acq

    @abc.abstractmethod
    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Computes the acquisition value for a given point X.

        This function has to be overwritten in a derived class.

        Parameters
        ----------
        X : np.ndarray
            The input points where the acquisition function should be evaluated. The dimensionality
            of X is (N, D), with N as the number of points to evaluate at and D is the number of
            dimensions of one X.

        Returns
        -------
        np.ndarray(N, 1)
            Acquisition function values wrt X
        """
        raise NotImplementedError()
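

# Illustrative usage sketch (not part of the smac module): a minimal concrete subclass
# demonstrating the update-then-compute protocol defined above. ``_MeanModel`` and
# ``_NegativeMean`` are hypothetical names used only for this example; the only model
# interface assumed is ``predict_marginalized_over_instances(X)``.
class _MeanModel:
    def predict_marginalized_over_instances(self, X):
        # Predicted mean: sum of the inputs; predicted variance: constant 1.0.
        return np.sum(X, axis=1, keepdims=True), np.ones((X.shape[0], 1))


class _NegativeMean(AbstractAcquisitionFunction):
    def _compute(self, X: np.ndarray) -> np.ndarray:
        # Maximizing this acquisition value minimizes the predicted cost.
        m, _ = self.model.predict_marginalized_over_instances(X)
        return -m


_acq = _NegativeMean(model=_MeanModel())
_acq.update(model=_MeanModel())                   # "model" is the only required key here
print(_acq._compute(np.random.rand(5, 3)).shape)  # -> (5, 1)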


class IntegratedAcquisitionFunction(AbstractAcquisitionFunction):
    r"""Marginalize over model hyperparameters to compute the integrated acquisition function.

    See "Practical Bayesian Optimization of Machine Learning Algorithms" by Jasper Snoek et al.
    (https://papers.nips.cc/paper/4522-practical-bayesian-optimization-of-machine-learning-algorithms.pdf)
    for further details.
    """

    def __init__(self, model: BaseEPM, acquisition_function: AbstractAcquisitionFunction, **kwargs: Any):
        """Constructor.

        Parameters
        ----------
        model : BaseEPM
            The model needs to implement an additional attribute ``models`` which contains the
            different models to integrate over.
        acquisition_function : AbstractAcquisitionFunction
            The acquisition function to integrate over the models.
        kwargs
            Additional keyword arguments
        """
        super().__init__(model)
        self.long_name = "Integrated Acquisition Function (%s)" % acquisition_function.__class__.__name__
        self.acq = acquisition_function
        self._functions = []  # type: List[AbstractAcquisitionFunction]
        self.eta = None

    def update(self, **kwargs: Any) -> None:
        """Update the acquisition function values.

        This method will be called if the model is updated. E.g., entropy search uses it to
        update its approximation of P(x=x_min), EI uses it to update the current fmin.

        This implementation creates an acquisition function object for each model to integrate
        over and sets the respective attributes for each acquisition function object.

        Parameters
        ----------
        model : BaseEPM
            The model needs to implement an additional attribute ``models`` which contains the
            different models to integrate over.
        kwargs
        """
        model = kwargs["model"]
        del kwargs["model"]
        if not hasattr(model, "models") or len(model.models) == 0:
            raise ValueError("IntegratedAcquisitionFunction requires at least one model to integrate!")
        if len(self._functions) == 0 or len(self._functions) != len(model.models):
            self._functions = [copy.deepcopy(self.acq) for _ in model.models]
        for submodel, func in zip(model.models, self._functions):
            func.update(model=submodel, **kwargs)

    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Computes the integrated acquisition values by averaging over all member models.

        Parameters
        ----------
        X : np.ndarray(N, D)
            The input points where the acquisition function should be evaluated. The dimensionality
            of X is (N, D), with N as the number of points to evaluate at and D is the number of
            dimensions of one X.

        Returns
        -------
        np.ndarray(N, 1)
            Integrated acquisition values of X
        """
        if not self._functions:
            raise ValueError("Need to call update first!")
        return np.array([func._compute(X) for func in self._functions]).mean(axis=0)
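

# Illustrative usage sketch (not part of the smac module): integrating an acquisition function
# over an ensemble of surrogates. ``_FixedModel``, ``_EnsembleModel`` and ``_MeanAcq`` are
# hypothetical stubs; the ensemble only needs a ``models`` attribute and each member only
# needs ``predict_marginalized_over_instances(X)``.
class _FixedModel:
    def __init__(self, mean):
        self._mean = mean

    def predict_marginalized_over_instances(self, X):
        n = X.shape[0]
        return np.full((n, 1), self._mean), np.full((n, 1), 0.04)


class _EnsembleModel:
    def __init__(self):
        self.models = [_FixedModel(0.4), _FixedModel(0.6)]


class _MeanAcq(AbstractAcquisitionFunction):
    def _compute(self, X: np.ndarray) -> np.ndarray:
        m, _ = self.model.predict_marginalized_over_instances(X)
        return -m


_ens = _EnsembleModel()
_iaf = IntegratedAcquisitionFunction(model=_ens, acquisition_function=_MeanAcq(model=_ens))
_iaf.update(model=_ens)
print(_iaf._compute(np.zeros((2, 3))))  # mean of -0.4 and -0.6 -> [[-0.5], [-0.5]]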


class PriorAcquisitionFunction(AbstractAcquisitionFunction):
    r"""Weight the acquisition function with a user-defined prior over the optimum.

    See "PiBO: Augmenting Acquisition Functions with User Beliefs for Bayesian Optimization"
    by Carl Hvarfner et al. (###nolinkyet###) for further details.
    """

    def __init__(
        self,
        model: BaseEPM,
        acquisition_function: AbstractAcquisitionFunction,
        decay_beta: float,
        prior_floor: float = 1e-12,
        discretize: bool = False,
        discrete_bins_factor: float = 10.0,
        **kwargs: Any,
    ):
        """Constructor.

        Parameters
        ----------
        model : BaseEPM
            Models the objective function.
        acquisition_function : AbstractAcquisitionFunction
            The acquisition function to be weighted by the user prior.
        decay_beta : float
            Decay factor on the user prior - defaults to n_iterations / 10 if not specified
            otherwise.
        prior_floor : float
            Lowest possible value of the prior, to ensure non-negativity for all values in the
            search space.
        discretize : bool
            Whether to discretize (bin) the densities for continuous parameters. Triggered for
            Random Forest models and continuous hyperparameters to avoid a pathological case
            where all Random Forest randomness is removed (RF surrogates require piecewise
            constant acquisition functions to be well-behaved).
        discrete_bins_factor : float
            If discretizing, the multiple on the number of allowed bins for each parameter.
        kwargs
            Additional keyword arguments
        """
        super().__init__(model)
        self.long_name = "Prior Acquisition Function (%s)" % acquisition_function.__class__.__name__
        self.acq = acquisition_function
        self._functions = []  # type: List[AbstractAcquisitionFunction]
        self.eta = None
        self.hyperparameters = self.model.get_configspace().get_hyperparameters_dict()
        self.decay_beta = decay_beta
        self.prior_floor = prior_floor
        self.discretize = discretize
        if self.discretize:
            self.discrete_bins_factor = discrete_bins_factor

        # Check if the acquisition function is LCB or TS - then the acquisition function values
        # need to be rescaled to ensure positivity and the correct magnitude.
        if isinstance(self.acq, IntegratedAcquisitionFunction):
            acquisition_type = self.acq.acq
        else:
            acquisition_type = self.acq
        self.rescale_acq = isinstance(acquisition_type, (LCB, TS))
        self.iteration_number = 0

    def update(self, **kwargs: Any) -> None:
        """Update the acquisition function attributes required for calculation.

        Updates the model, the accompanying acquisition function and tracks the iteration number.

        Parameters
        ----------
        kwargs
            Additional keyword arguments
        """
        self.iteration_number += 1
        self.acq.update(**kwargs)
        self.eta = kwargs.get("eta")

    def _compute_prior(self, X: np.ndarray) -> np.ndarray:
        """Computes the user-specified prior values, where the prior on each parameter is later
        raised to a decay factor controlled by the parameter decay_beta and the iteration number.
        Multivariate priors are not supported, for now.

        Parameters
        ----------
        X : np.ndarray(N, D)
            The input points where the user-specified prior should be evaluated. The dimensionality
            of X is (N, D), with N as the number of points to evaluate at and D is the number of
            dimensions of one X.

        Returns
        -------
        np.ndarray(N, 1)
            The user prior over the optimum for values of X
        """
        prior_values = np.ones((len(X), 1))
        # Iterate over the hyperparameters (alphabetically sorted) and the columns, which come
        # in the same order.
        for parameter, X_col in zip(self.hyperparameters.values(), X.T):
            if self.discretize and isinstance(parameter, FloatHyperparameter):
                number_of_bins = int(np.ceil(self.discrete_bins_factor * self.decay_beta / self.iteration_number))
                prior_values *= self._compute_discretized_pdf(parameter, X_col, number_of_bins) + self.prior_floor
            else:
                prior_values *= parameter._pdf(X_col[:, np.newaxis])

        return prior_values

    def _compute_discretized_pdf(
        self, parameter: FloatHyperparameter, X_col: np.ndarray, number_of_bins: int
    ) -> np.ndarray:
        """Discretizes (bins) the prior values of a specific continuous parameter to an
        increasingly coarse discretization determined by the prior decay parameter.

        Parameters
        ----------
        parameter : FloatHyperparameter
            A FloatHyperparameter that, due to using a random forest surrogate, must have its
            prior discretized.
        X_col : np.ndarray(N, )
            The input points where the acquisition function should be evaluated. The dimensionality
            of X_col is (N, ), with N as the number of points to evaluate for the specific
            hyperparameter.
        number_of_bins : int
            The number of unique values allowed on the discretized version of the pdf.

        Returns
        -------
        np.ndarray(N, 1)
            The user prior over the optimum for the parameter at hand.
        """
        # Evaluate the actual pdf on all the relevant points.
        pdf_values = parameter._pdf(X_col[:, np.newaxis])
        # Retrieve the largest value of the pdf in the domain.
        lower, upper = (0, parameter.get_max_density())
        # Create the bins (the possible discrete options of the pdf).
        bin_values = np.linspace(lower, upper, number_of_bins)
        # Generate an index (bin) for each evaluated point.
        bin_indices = np.clip(
            np.round((pdf_values - lower) * number_of_bins / (upper - lower)), 0, number_of_bins - 1
        ).astype(int)
        # Get the actual value for each point.
        prior_values = bin_values[bin_indices]
        return prior_values

    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Computes the prior-weighted acquisition function values, where the prior on each
        parameter is raised to a decay factor controlled by the parameter decay_beta and the
        iteration number. Multivariate priors are not supported, for now.

        Parameters
        ----------
        X : np.ndarray(N, D)
            The input points where the acquisition function should be evaluated. The dimensionality
            of X is (N, D), with N as the number of points to evaluate at and D is the number of
            dimensions of one X.

        Returns
        -------
        np.ndarray(N, 1)
            Prior-weighted acquisition function values of X
        """
        if self.rescale_acq:
            # For TS and LCB, we need to scale the function values to not run into issues of
            # negative values or of varying magnitudes (here, they are both negative by design
            # and just flipping the sign leads to picking the worst point).
            acq_values = np.clip(self.acq._compute(X) + self.eta, 0, np.inf)
        else:
            acq_values = self.acq._compute(X)
        prior_values = self._compute_prior(X) + self.prior_floor
        decayed_prior_values = np.power(prior_values, self.decay_beta / self.iteration_number)

        return acq_values * decayed_prior_values
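

# Illustrative sketch (not part of the smac module): the decay that PriorAcquisitionFunction
# applies to the user prior. With decay_beta = 10, a prior density of 2.0 is raised to the
# power 10 / t, so its influence on the acquisition value fades as the iteration number t
# grows. The numbers below are made up for illustration.
for _t in (1, 5, 10, 50):
    print(_t, np.power(2.0, 10.0 / _t))
# t = 1 -> 1024.0, t = 5 -> 4.0, t = 10 -> 2.0, t = 50 -> ~1.15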


class EI(AbstractAcquisitionFunction):
    r"""Computes for a given x the expected improvement as acquisition value.

    :math:`EI(X) := \mathbb{E}\left[ \max\{0, f(\mathbf{X^+}) - f_{t+1}(\mathbf{X}) - \xi \} \right]`,

    with :math:`f(X^+)` as the best location.
    """

    def __init__(self, model: BaseEPM, par: float = 0.0):
        """Constructor.

        Parameters
        ----------
        model : BaseEPM
            A model that implements at least
                 - predict_marginalized_over_instances(X)
        par : float, default=0.0
            Controls the balance between exploration and exploitation of the acquisition function.
        """
        super(EI, self).__init__(model)
        self.long_name = "Expected Improvement"
        self.par = par
        self.eta = None
        self._required_updates = ("model", "eta")

    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Computes the EI value.

        Parameters
        ----------
        X : np.ndarray(N, D)
            The input points where the acquisition function should be evaluated. The dimensionality
            of X is (N, D), with N as the number of points to evaluate at and D is the number of
            dimensions of one X.

        Returns
        -------
        np.ndarray(N, 1)
            Expected Improvement of X
        """
        if len(X.shape) == 1:
            X = X[:, np.newaxis]

        m, v = self.model.predict_marginalized_over_instances(X)
        s = np.sqrt(v)

        if self.eta is None:
            raise ValueError(
                "No current best specified. Call update("
                "eta=<int>) to inform the acquisition function "
                "about the current best value."
            )

        def calculate_f():
            z = (self.eta - m - self.par) / s
            return (self.eta - m - self.par) * norm.cdf(z) + s * norm.pdf(z)

        if np.any(s == 0.0):
            # If std is zero, we have observed x on all instances.
            # Using a RF, std should never be exactly 0.0.
            # Avoid zero division by setting all zeros in s to one.
            # Consider the corresponding results in f to be zero.
            self.logger.warning("Predicted std is 0.0 for at least one sample.")
            s_copy = np.copy(s)
            s[s_copy == 0.0] = 1.0
            f = calculate_f()
            f[s_copy == 0.0] = 0.0
        else:
            f = calculate_f()
        if (f < 0).any():
            raise ValueError("Expected Improvement is smaller than 0 for at least one sample.")

        return f
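

# Illustrative usage sketch (not part of the smac module): exercising EI with a toy surrogate.
# ``_ConstantModel`` is a hypothetical stub; the only interface it provides is
# ``predict_marginalized_over_instances(X)``.
class _ConstantModel:
    def predict_marginalized_over_instances(self, X):
        # Predict mean 0.5 and variance 0.04 (std 0.2) for every input point.
        n = X.shape[0]
        return np.full((n, 1), 0.5), np.full((n, 1), 0.04)


_ei = EI(model=_ConstantModel())
_ei.update(model=_ConstantModel(), eta=0.4)  # eta is the best (lowest) observed cost so far
# With mean 0.5 above eta = 0.4 and std 0.2, EI is positive but small (~0.04).
print(_ei._compute(np.zeros((3, 4))))        # -> array of shape (3, 1)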


class EIPS(EI):
    def __init__(self, model: BaseEPM, par: float = 0.0):
        r"""Computes for a given x the expected improvement per second as acquisition value.

        :math:`EIPS(X) := \frac{\mathbb{E}\left[ \max\{0, f(\mathbf{X^+}) - f_{t+1}(\mathbf{X}) - \xi \} \right]}{\log(r(x))}`,

        with :math:`f(X^+)` as the best location and :math:`r(x)` as the runtime.

        Parameters
        ----------
        model : BaseEPM
            A model that implements at least
                 - predict_marginalized_over_instances(X) returning a tuple of predicted cost and
                   running time
        par : float, default=0.0
            Controls the balance between exploration and exploitation of the acquisition function.
        """
        super(EIPS, self).__init__(model, par=par)
        self.long_name = "Expected Improvement per Second"

    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Computes the EIPS value.

        Parameters
        ----------
        X : np.ndarray(N, D)
            The input points where the acquisition function should be evaluated. The dimensionality
            of X is (N, D), with N as the number of points to evaluate at and D is the number of
            dimensions of one X.

        Returns
        -------
        np.ndarray(N, 1)
            Expected Improvement per Second of X
        """
        if len(X.shape) == 1:
            X = X[:, np.newaxis]
        m, v = self.model.predict_marginalized_over_instances(X)
        if m.shape[1] != 2:
            raise ValueError("m has wrong shape: %s != (-1, 2)" % str(m.shape))
        if v.shape[1] != 2:
            raise ValueError("v has wrong shape: %s != (-1, 2)" % str(v.shape))

        m_cost = m[:, 0]
        v_cost = v[:, 0]
        # The model already predicts log(runtime)
        m_runtime = m[:, 1]
        s = np.sqrt(v_cost)

        if self.eta is None:
            raise ValueError(
                "No current best specified. Call update("
                "eta=<int>) to inform the acquisition function "
                "about the current best value."
            )

        def calculate_f():
            z = (self.eta - m_cost - self.par) / s
            f = (self.eta - m_cost - self.par) * norm.cdf(z) + s * norm.pdf(z)
            f = f / m_runtime
            return f

        if np.any(s == 0.0):
            # If std is zero, we have observed x on all instances.
            # Using a RF, std should never be exactly 0.0.
            # Avoid zero division by setting all zeros in s to one.
            # Consider the corresponding results in f to be zero.
            self.logger.warning("Predicted std is 0.0 for at least one sample.")
            s_copy = np.copy(s)
            s[s_copy == 0.0] = 1.0
            f = calculate_f()
            f[s_copy == 0.0] = 0.0
        else:
            f = calculate_f()
        if (f < 0).any():
            raise ValueError("Expected Improvement per Second is smaller than 0 " "for at least one sample.")

        return f.reshape((-1, 1))


class LogEI(AbstractAcquisitionFunction):
    def __init__(self, model: BaseEPM, par: float = 0.0):
        r"""Computes for a given x the logarithmic expected improvement as acquisition value.

        Parameters
        ----------
        model : BaseEPM
            A model that implements at least
                 - predict_marginalized_over_instances(X)
        par : float, default=0.0
            Controls the balance between exploration and exploitation of the acquisition function.
        """
        super(LogEI, self).__init__(model)
        self.long_name = "Expected Improvement"
        self.par = par
        self.eta = None
        self._required_updates = ("model", "eta")

    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Computes the LogEI value.

        Parameters
        ----------
        X : np.ndarray(N, D)
            The input points where the acquisition function should be evaluated. The dimensionality
            of X is (N, D), with N as the number of points to evaluate at and D is the number of
            dimensions of one X.

        Returns
        -------
        np.ndarray(N, 1)
            Log Expected Improvement of X
        """
        if self.eta is None:
            raise ValueError(
                "No current best specified. Call update("
                "eta=<int>) to inform the acquisition function "
                "about the current best value."
            )

        if len(X.shape) == 1:
            X = X[:, np.newaxis]
        m, var_ = self.model.predict_marginalized_over_instances(X)
        std = np.sqrt(var_)

        def calculate_log_ei():
            # We expect that f_min is in log-space.
            f_min = self.eta - self.par
            v = (f_min - m) / std
            return (np.exp(f_min) * norm.cdf(v)) - (np.exp(0.5 * var_ + m) * norm.cdf(v - std))

        if np.any(std == 0.0):
            # If std is zero, we have observed x on all instances.
            # Using a RF, std should never be exactly 0.0.
            # Avoid zero division by setting all zeros in std to one.
            # Consider the corresponding results in log_ei to be zero.
            self.logger.warning("Predicted std is 0.0 for at least one sample.")
            std_copy = np.copy(std)
            std[std_copy == 0.0] = 1.0
            log_ei = calculate_log_ei()
            log_ei[std_copy == 0.0] = 0.0
        else:
            log_ei = calculate_log_ei()

        if (log_ei < 0).any():
            raise ValueError("Expected Improvement is smaller than 0 for at least one sample.")

        return log_ei.reshape((-1, 1))


class PI(AbstractAcquisitionFunction):
    def __init__(self, model: BaseEPM, par: float = 0.0):
        r"""Computes the probability of improvement for a given x over the best so far value as
        acquisition value.

        :math:`P(f_{t+1}(\mathbf{X}) \geq f(\mathbf{X^+})) := \Phi\left( \frac{\mu(\mathbf{X}) - f(\mathbf{X^+})}{\sigma(\mathbf{X})} \right)`,

        with :math:`f(X^+)` as the incumbent and :math:`\Phi` the cdf of the standard normal.

        Parameters
        ----------
        model : BaseEPM
            A model that implements at least
                 - predict_marginalized_over_instances(X)
        par : float, default=0.0
            Controls the balance between exploration and exploitation of the acquisition function.
        """
        super(PI, self).__init__(model)
        self.long_name = "Probability of Improvement"
        self.par = par
        self.eta = None
        self._required_updates = ("model", "eta")

    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Computes the PI value.

        Parameters
        ----------
        X : np.ndarray(N, D)
            Points to evaluate PI. N is the number of points and D the dimension for the points.

        Returns
        -------
        np.ndarray(N, 1)
            Probability of Improvement of X
        """
        if self.eta is None:
            raise ValueError(
                "No current best specified. Call update("
                "eta=<float>) to inform the acquisition function "
                "about the current best value."
            )

        if len(X.shape) == 1:
            X = X[:, np.newaxis]
        m, var_ = self.model.predict_marginalized_over_instances(X)
        std = np.sqrt(var_)
        return norm.cdf((self.eta - m - self.par) / std)
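

# Illustrative sketch (not part of the smac module): the quantity PI computes, with made-up
# numbers. For a predicted mean of 0.5, a predicted std of 0.2 and an incumbent value
# eta = 0.4, the probability of improvement is Phi((0.4 - 0.5) / 0.2) ~ 0.31.
print(norm.cdf((0.4 - 0.5) / 0.2))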


class LCB(AbstractAcquisitionFunction):
    def __init__(self, model: BaseEPM, par: float = 1.0):
        r"""Computes the lower confidence bound for a given x over the best so far value as
        acquisition value.

        :math:`LCB(X) = \mu(\mathbf{X}) - \sqrt{\beta_t}\sigma(\mathbf{X})`

        Returns -LCB(X) as the acquisition_function optimizer maximizes the acquisition value.

        Parameters
        ----------
        model : BaseEPM
            A model that implements at least
                 - predict_marginalized_over_instances(X)
        par : float, default=1.0
            Controls the balance between exploration and exploitation of the acquisition function.
        """
        super(LCB, self).__init__(model)
        self.long_name = "Lower Confidence Bound"
        self.par = par
        self.num_data = None
        self._required_updates = ("model", "num_data")

    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Computes the LCB value.

        Parameters
        ----------
        X : np.ndarray(N, D)
            Points to evaluate LCB. N is the number of points and D the dimension for the points.

        Returns
        -------
        np.ndarray(N, 1)
            Negative lower confidence bound of X
        """
        if self.num_data is None:
            raise ValueError(
                "No current number of Datapoints specified. Call update("
                "num_data=<int>) to inform the acquisition function "
                "about the number of datapoints."
            )
        if len(X.shape) == 1:
            X = X[:, np.newaxis]
        m, var_ = self.model.predict_marginalized_over_instances(X)
        std = np.sqrt(var_)
        beta = 2 * np.log((X.shape[1] * self.num_data**2) / self.par)
        return -(m - np.sqrt(beta) * std)
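

# Illustrative sketch (not part of the smac module): how LCB's exploration weight beta grows
# with the number of observations. The values below are made up.
_D, _num_data, _par = 4, 20, 1.0                # input dimension, #observations, trade-off par
_beta = 2 * np.log((_D * _num_data**2) / _par)  # same formula as in _compute above
print(_beta, np.sqrt(_beta))                    # ~14.76 and ~3.84: more data -> wider bound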


class TS(AbstractAcquisitionFunction):
    def __init__(self, model: BaseEPM, par: float = 0.0):
        r"""Perform Thompson sampling for a given x over the best so far value as acquisition value.

        Warning
        -------
        Thompson Sampling can only be used together with
        smac.optimizer.ei_optimization.RandomSearch; please do not use
        smac.optimizer.ei_optimization.LocalAndSortedRandomSearch to optimize the TS
        acquisition function!

        :math:`TS(X) \sim \mathcal{N}(\mu(\mathbf{X}), \sigma(\mathbf{X}))`

        Returns -TS(X) as the acquisition_function optimizer maximizes the acquisition value.

        Parameters
        ----------
        model : BaseEPM
            A model that implements at least
                 - predict_marginalized_over_instances(X)
        par : float, default=0.0
            TS does not require par here; it is only kept to stay consistent with the other
            acquisition functions.
        """
        super(TS, self).__init__(model)
        self.long_name = "Thompson Sampling"
        self.par = par
        self.num_data = None
        self._required_updates = ("model",)

    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Sample a new value from a Gaussian distribution whose mean and covariance values are
        given by the model.

        Parameters
        ----------
        X : np.ndarray(N, D)
            Points to be evaluated where we could sample a value. N is the number of points and
            D the dimension for the points.

        Returns
        -------
        np.ndarray(N, 1)
            Negative sample value of X
        """
        if len(X.shape) == 1:
            X = X[:, np.newaxis]
        sample_function = getattr(self.model, "sample_functions", None)
        if callable(sample_function):
            return -sample_function(X, n_funcs=1)

        m, var_ = self.model.predict_marginalized_over_instances(X)
        rng = getattr(self.model, "rng", np.random.RandomState(self.model.seed))
        m = m.flatten()
        var_ = np.diag(var_.flatten())
        return -rng.multivariate_normal(m, var_, 1).T
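

# Illustrative usage sketch (not part of the smac module): Thompson sampling via the fallback
# path above (a model without ``sample_functions``). ``_SeededModel`` is a hypothetical stub
# exposing only ``predict_marginalized_over_instances(X)`` and ``seed``.
class _SeededModel:
    seed = 1

    def predict_marginalized_over_instances(self, X):
        n = X.shape[0]
        return np.zeros((n, 1)), np.full((n, 1), 0.01)


_ts = TS(model=_SeededModel())
_ts.update(model=_SeededModel())
print(_ts._compute(np.zeros((4, 2))).shape)  # -> (4, 1): one joint draw over the 4 points, negated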