from __future__ import annotations

from typing import Any

import numpy as np
from scipy.stats import norm

from smac.acquisition.function.abstract_acquisition_function import (
    AbstractAcquisitionFunction,
)
from smac.utils.logging import get_logger

__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"

logger = get_logger(__name__)


class EI(AbstractAcquisitionFunction):
    r"""Expected Improvement (with or without function values in log space) acquisition function

    :math:`EI(X) := \mathbb{E}\left[ \max\{0, f(\mathbf{X^+}) - f_{t+1}(\mathbf{X}) - \xi \} \right]`,
    with :math:`f(\mathbf{X^+})` as the function value of the incumbent (best configuration found so far).

    Parameters
    ----------
    xi : float, defaults to 0.0
        Controls the balance between exploration and exploitation of the
        acquisition function.
    log : bool, defaults to False
        Whether the function values are in log-space.

    Attributes
    ----------
    _xi : float
        Exploration-exploitation trade-off parameter.
    _log : bool
        Whether the function values are in log-space.
    _eta : float
        Current incumbent function value (best value observed so far).
    """

    def __init__(
        self,
        xi: float = 0.0,
        log: bool = False,
    ) -> None:
        super(EI, self).__init__()

        self._xi: float = xi
        self._log: bool = log
        self._eta: float | None = None

    @property
    def name(self) -> str:  # noqa: D102
        return "Expected Improvement"

    @property
    def meta(self) -> dict[str, Any]:  # noqa: D102
        meta = super().meta
        meta.update(
            {
                "xi": self._xi,
                "log": self._log,
            }
        )

        return meta

    def _update(self, **kwargs: Any) -> None:
        """Update the acquisition function attributes.

        Parameters
        ----------
        eta : float
            Function value of the current incumbent.
        xi : float, optional
            Exploration-exploitation trade-off parameter.
        """
        assert "eta" in kwargs
        self._eta = kwargs["eta"]

        if "xi" in kwargs and kwargs["xi"] is not None:
            self._xi = kwargs["xi"]

    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Compute the EI acquisition value.

        Parameters
        ----------
        X : np.ndarray [N, D]
            The input points where the acquisition function should be evaluated. The dimensionality of X is (N, D),
            with N as the number of points to evaluate and D as the number of dimensions of one X.

        Returns
        -------
        np.ndarray [N, 1]
            Acquisition function values wrt X.

        Raises
        ------
        ValueError
            If `update` has not been called before (current incumbent value `eta` unspecified).
        ValueError
            If EI is < 0 for at least one sample (normal function value space).
        ValueError
            If EI is < 0 for at least one sample (log function value space).
        """
        assert self._model is not None
        assert self._xi is not None

        if self._eta is None:
            raise ValueError(
                "No current best specified. Call update(eta=<float>) to inform "
                "the acquisition function about the current best value."
            )

        if not self._log:
            if len(X.shape) == 1:
                X = X[:, np.newaxis]

            m, v = self._model.predict_marginalized(X)
            s = np.sqrt(v)

            def calculate_f() -> np.ndarray:
                z = (self._eta - m - self._xi) / s
                return (self._eta - m - self._xi) * norm.cdf(z) + s * norm.pdf(z)

            if np.any(s == 0.0):
                # If std is zero, we have observed x on all instances.
                # When using a RF, std should never be exactly 0.0.
                # Avoid division by zero by setting all zeros in s to one
                # and consider the corresponding results in f to be zero.
                logger.warning("Predicted std is 0.0 for at least one sample.")

                s_copy = np.copy(s)
                s[s_copy == 0.0] = 1.0
                f = calculate_f()
                f[s_copy == 0.0] = 0.0
            else:
                f = calculate_f()

            if (f < 0).any():
                raise ValueError("Expected Improvement is smaller than 0 for at least one sample.")

            return f
        else:
            if len(X.shape) == 1:
                X = X[:, np.newaxis]

            m, var_ = self._model.predict_marginalized(X)
            std = np.sqrt(var_)

            def calculate_log_ei() -> np.ndarray:
                # We expect that f_min (the incumbent) is already in log-space.
                assert self._eta is not None
                assert self._xi is not None

                f_min = self._eta - self._xi
                v = (f_min - m) / std

                return (np.exp(f_min) * norm.cdf(v)) - (np.exp(0.5 * var_ + m) * norm.cdf(v - std))

            if np.any(std == 0.0):
                # If std is zero, we have observed x on all instances.
                # When using a RF, std should never be exactly 0.0.
                # Avoid division by zero by setting all zeros in std to one
                # and consider the corresponding results in log_ei to be zero.
                logger.warning("Predicted std is 0.0 for at least one sample.")

                std_copy = np.copy(std)
                std[std_copy == 0.0] = 1.0
                log_ei = calculate_log_ei()
                log_ei[std_copy == 0.0] = 0.0
            else:
                log_ei = calculate_log_ei()

            if (log_ei < 0).any():
                raise ValueError("Expected Improvement is smaller than 0 for at least one sample.")

            return log_ei.reshape((-1, 1))
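

# The function below is an illustrative sketch only, not part of the SMAC API: it
# evaluates the closed-form EI term used in ``calculate_f`` above on hand-made
# posterior predictions, to make the formula in the class docstring concrete.
# The name ``_ei_closed_form_example`` and all numbers are made up for this sketch.
def _ei_closed_form_example() -> np.ndarray:  # pragma: no cover
    m = np.array([[0.4], [0.9], [1.3]])  # posterior means for three candidate points
    s = np.array([[0.2], [0.1], [0.5]])  # posterior standard deviations
    eta, xi = 1.0, 0.0  # incumbent function value and trade-off parameter

    z = (eta - m - xi) / s

    # EI(x) = (eta - m - xi) * Phi(z) + s * phi(z); the result is non-negative, and
    # points with lower predicted mean or higher uncertainty receive larger values.
    return (eta - m - xi) * norm.cdf(z) + s * norm.pdf(z)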


class EIPS(EI):
    r"""Expected Improvement per Second acquisition function

    :math:`EIPS(X) := \frac{\mathbb{E}\left[ \max\{0, f(\mathbf{X^+}) - f_{t+1}(\mathbf{X}) - \xi \} \right]}{\log(r(\mathbf{X}))}`,
    with :math:`f(\mathbf{X^+})` as the function value of the incumbent and :math:`r(\mathbf{X})` as the
    predicted runtime (the model predicts the runtime in log-space).

    Parameters
    ----------
    xi : float, defaults to 0.0
        Controls the balance between exploration and exploitation of the acquisition function.
    """

    def __init__(self, xi: float = 0.0) -> None:
        super(EIPS, self).__init__(xi=xi)

    @property
    def name(self) -> str:  # noqa: D102
        return "Expected Improvement per Second"

    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Compute the EI per second acquisition value.

        Parameters
        ----------
        X : np.ndarray [N, D]
            The input points where the acquisition function should be evaluated. The dimensionality of X is (N, D),
            with N as the number of points to evaluate and D as the number of dimensions of one X.

        Returns
        -------
        np.ndarray [N, 1]
            Acquisition function values wrt X.

        Raises
        ------
        ValueError
            If the mean has the wrong shape; it should have shape (-1, 2).
        ValueError
            If the variance has the wrong shape; it should have shape (-1, 2).
        ValueError
            If `update` has not been called before (current incumbent value `eta` unspecified).
        ValueError
            If EIPS is < 0 for at least one sample.
        """
        assert self._model is not None

        if len(X.shape) == 1:
            X = X[:, np.newaxis]

        m, v = self._model.predict_marginalized(X)

        if m.shape[1] != 2:
            raise ValueError(f"m has wrong shape: {m.shape} != (-1, 2)")

        if v.shape[1] != 2:
            raise ValueError(f"v has wrong shape: {v.shape} != (-1, 2)")

        m_cost = m[:, 0]
        v_cost = v[:, 0]

        # The model already predicts log(runtime).
        m_runtime = m[:, 1]
        s = np.sqrt(v_cost)

        if self._eta is None:
            raise ValueError(
                "No current best specified. Call update(eta=<float>) to inform "
                "the acquisition function about the current best value."
            )

        def calculate_f() -> np.ndarray:
            z = (self._eta - m_cost - self._xi) / s
            f = (self._eta - m_cost - self._xi) * norm.cdf(z) + s * norm.pdf(z)
            f = f / m_runtime

            return f

        if np.any(s == 0.0):
            # If std is zero, we have observed x on all instances.
            # When using a RF, std should never be exactly 0.0.
            # Avoid division by zero by setting all zeros in s to one
            # and consider the corresponding results in f to be zero.
            logger.warning("Predicted std is 0.0 for at least one sample.")

            s_copy = np.copy(s)
            s[s_copy == 0.0] = 1.0
            f = calculate_f()
            f[s_copy == 0.0] = 0.0
        else:
            f = calculate_f()

        if (f < 0).any():
            raise ValueError("Expected Improvement per Second is smaller than 0 for at least one sample.")

        return f.reshape((-1, 1))
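

# The function below is an illustrative sketch only, not part of the SMAC API: it
# mirrors ``EIPS._compute`` above on hand-made predictions to show how the two
# model outputs are combined. As in the shape checks above, the surrogate behind
# EIPS is assumed to predict an (N, 2) array with the cost in column 0 and the
# runtime (in log-space) in column 1. The name ``_eips_closed_form_example`` and
# all numbers are made up for this sketch.
def _eips_closed_form_example() -> np.ndarray:  # pragma: no cover
    m = np.array([[0.4, 1.2], [0.9, 0.7]])  # column 0: cost mean, column 1: log-runtime mean
    v = np.array([[0.04, 0.10], [0.01, 0.20]])  # matching variances
    eta, xi = 1.0, 0.0  # incumbent cost and trade-off parameter

    m_cost, m_runtime = m[:, 0], m[:, 1]
    s = np.sqrt(v[:, 0])

    z = (eta - m_cost - xi) / s
    ei = (eta - m_cost - xi) * norm.cdf(z) + s * norm.pdf(z)

    # Expected improvement divided by the predicted (log) runtime, reshaped to (N, 1).
    return (ei / m_runtime).reshape((-1, 1))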