# Source code for smac.acquisition.function.expected_improvement

from __future__ import annotations

from typing import Any

import numpy as np
from scipy.stats import norm

from smac.acquisition.function.abstract_acquisition_function import (
    AbstractAcquisitionFunction,
)
from smac.utils.logging import get_logger

__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"

logger = get_logger(__name__)


class EI(AbstractAcquisitionFunction):
    r"""Expected Improvement (with or without function values in log space) acquisition function

    :math:`EI(X) := \mathbb{E}\left[ \max\{0, f(\mathbf{X^+}) - f_{t+1}(\mathbf{X}) - \xi \} \right]`,
    with :math:`f(X^+)` as the best location.

    Parameters
    ----------
    xi : float, defaults to 0.0
        Controls the balance between exploration and exploitation of the acquisition function.
    log : bool, defaults to False
        Whether the function values are in log-space.

    Attributes
    ----------
    _xi : float
        Exploration-exploitation trade-off parameter.
    _log : bool
        Function values in log-space or not.
    _eta : float
        Current incumbent function value (best value observed so far).
    """

    def __init__(
        self,
        xi: float = 0.0,
        log: bool = False,
    ) -> None:
        super(EI, self).__init__()

        self._xi: float = xi
        self._log: bool = log
        self._eta: float | None = None

    @property
    def name(self) -> str:  # noqa: D102
        return "Expected Improvement"

    @property
    def meta(self) -> dict[str, Any]:  # noqa: D102
        meta = super().meta
        meta.update(
            {
                "xi": self._xi,
                "log": self._log,
            }
        )

        return meta

    def _update(self, **kwargs: Any) -> None:
        """Update acquisition function attributes

        Parameters
        ----------
        eta : float
            Function value of current incumbent.
        xi : float, optional
            Exploration-exploitation trade-off parameter
        """
        assert "eta" in kwargs
        self._eta = kwargs["eta"]

        if "xi" in kwargs and kwargs["xi"] is not None:
            self._xi = kwargs["xi"]

    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Compute EI acquisition value

        Parameters
        ----------
        X : np.ndarray [N, D]
            The input points where the acquisition function should be evaluated. The dimensionality of X is (N, D),
            with N as the number of points to evaluate at and D is the number of dimensions of one X.

        Returns
        -------
        np.ndarray [N, 1]
            Acquisition function values wrt X.

        Raises
        ------
        ValueError
            If `update` has not been called before (current incumbent value `eta` unspecified).
        ValueError
            If EI is < 0 for at least one sample (normal function value space).
        ValueError
            If EI is < 0 for at least one sample (log function value space).
        """
        assert self._model is not None
        assert self._xi is not None

        if self._eta is None:
            raise ValueError(
                "No current best specified. Call update("
                "eta=<int>) to inform the acquisition function "
                "about the current best value."
            )

        # Promote a single point (D,) to a batch of one (1, D). This was previously
        # duplicated in both the log and non-log branches; hoisted here once.
        if len(X.shape) == 1:
            X = X[:, np.newaxis]

        if not self._log:
            m, v = self._model.predict_marginalized(X)
            s = np.sqrt(v)

            def calculate_f() -> np.ndarray:
                # Closed-form EI: (eta - mu - xi) * Phi(z) + sigma * phi(z)
                z = (self._eta - m - self._xi) / s
                return (self._eta - m - self._xi) * norm.cdf(z) + s * norm.pdf(z)

            if np.any(s == 0.0):
                # if std is zero, we have observed x on all instances
                # using a RF, std should be never exactly 0.0
                # Avoid zero division by setting all zeros in s to one.
                # Consider the corresponding results in f to be zero.
                logger.warning("Predicted std is 0.0 for at least one sample.")
                s_copy = np.copy(s)
                s[s_copy == 0.0] = 1.0
                f = calculate_f()
                f[s_copy == 0.0] = 0.0
            else:
                f = calculate_f()

            if (f < 0).any():
                raise ValueError("Expected Improvement is smaller than 0 for at least one " "sample.")

            return f
        else:
            m, var_ = self._model.predict_marginalized(X)
            std = np.sqrt(var_)

            def calculate_log_ei() -> np.ndarray:
                # we expect that f_min is in log-space
                assert self._eta is not None
                assert self._xi is not None

                f_min = self._eta - self._xi
                v = (f_min - m) / std

                return (np.exp(f_min) * norm.cdf(v)) - (np.exp(0.5 * var_ + m) * norm.cdf(v - std))

            if np.any(std == 0.0):
                # if std is zero, we have observed x on all instances
                # using a RF, std should be never exactly 0.0
                # Avoid zero division by setting all zeros in s to one.
                # Consider the corresponding results in f to be zero.
                logger.warning("Predicted std is 0.0 for at least one sample.")
                std_copy = np.copy(std)
                std[std_copy == 0.0] = 1.0
                log_ei = calculate_log_ei()
                log_ei[std_copy == 0.0] = 0.0
            else:
                log_ei = calculate_log_ei()

            if (log_ei < 0).any():
                raise ValueError("Expected Improvement is smaller than 0 for at least one sample.")

            return log_ei.reshape((-1, 1))
class EIPS(EI):
    r"""Expected Improvement per Second acquisition function

    :math:`EI(X) := \frac{\mathbb{E}\left[\max\{0,f(\mathbf{X^+})-f_{t+1}(\mathbf{X})-\xi\right]\}]}{np.log(r(x))}`,
    with :math:`f(X^+)` as the best location and :math:`r(x)` as runtime.

    Parameters
    ----------
    xi : float, defaults to 0.0
        Controls the balance between exploration and exploitation of the acquisition function.
    """

    def __init__(self, xi: float = 0.0) -> None:
        super(EIPS, self).__init__(xi=xi)

    @property
    def name(self) -> str:  # noqa: D102
        return "Expected Improvement per Second"

    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Compute EI per second acquisition value

        Parameters
        ----------
        X : np.ndarray [N, D]
            The input points where the acquisition function should be evaluated. The dimensionality of X is (N, D),
            with N as the number of points to evaluate at and D is the number of dimensions of one X.

        Returns
        -------
        np.ndarray [N, 1]
            Acquisition function values wrt X.

        Raises
        ------
        ValueError
            If the mean has the wrong shape, should have shape (-1, 2).
        ValueError
            If the variance has the wrong shape, should have shape (-1, 2).
        ValueError
            If `update` has not been called before (current incumbent value `eta` unspecified).
        ValueError
            If EIPS is < 0 for at least one sample.
        """
        assert self._model is not None
        assert self._xi is not None

        # Fail fast: validate the incumbent before querying the model (consistent with
        # EI._compute, which checks eta before predicting). Previously this check ran
        # only after predict_marginalized and the shape checks.
        if self._eta is None:
            raise ValueError(
                "No current best specified. Call update("
                "eta=<int>) to inform the acquisition function "
                "about the current best value."
            )

        # Promote a single point (D,) to a batch of one (1, D).
        if len(X.shape) == 1:
            X = X[:, np.newaxis]

        # Column 0 is the cost objective, column 1 the runtime objective.
        m, v = self._model.predict_marginalized(X)
        if m.shape[1] != 2:
            raise ValueError(f"m has wrong shape: {m.shape} != (-1, 2)")

        if v.shape[1] != 2:
            raise ValueError(f"v has wrong shape: {v.shape} != (-1, 2)")

        m_cost = m[:, 0]
        v_cost = v[:, 0]

        # The model already predicts log(runtime)
        m_runtime = m[:, 1]
        s = np.sqrt(v_cost)

        def calculate_f() -> np.ndarray:
            # Standard EI on the cost objective, divided by the predicted log-runtime.
            z = (self._eta - m_cost - self._xi) / s
            f = (self._eta - m_cost - self._xi) * norm.cdf(z) + s * norm.pdf(z)
            f = f / m_runtime

            return f

        if np.any(s == 0.0):
            # if std is zero, we have observed x on all instances
            # using a RF, std should be never exactly 0.0
            # Avoid zero division by setting all zeros in s to one.
            # Consider the corresponding results in f to be zero.
            logger.warning("Predicted std is 0.0 for at least one sample.")
            s_copy = np.copy(s)
            s[s_copy == 0.0] = 1.0
            f = calculate_f()
            f[s_copy == 0.0] = 0.0
        else:
            f = calculate_f()

        if (f < 0).any():
            raise ValueError("Expected Improvement per Second is smaller than 0 " "for at least one sample.")

        return f.reshape((-1, 1))