from __future__ import annotations
from typing import Any
import numpy as np
from scipy.stats import norm
from smac.acquisition.function.abstract_acquisition_function import (
    AbstractAcquisitionFunction,
)
from smac.utils.logging import get_logger
__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"
logger = get_logger(__name__)
class EI(AbstractAcquisitionFunction):
    r"""Expected Improvement (with or without function values in log space) acquisition function
    :math:`EI(X) := \mathbb{E}\left[ \max\{0, f(\mathbf{X^+}) - f_{t+1}(\mathbf{X}) - \xi \} \right]`,
    with :math:`f(X^+)` as the best location.
    Parameters
    ----------
    xi : float, defaults to 0.0
        Controls the balance between exploration and exploitation of the
        acquisition function.
    log : bool, defaults to False
        Whether the function values are in log-space.
    Attributes
    ----------
    _xi : float
        Exploration-exploitation trade-off parameter.
    _log : bool
        Whether the function values are in log-space.
    _eta : float
        Current incumbent function value (best value observed so far).
    """
    def __init__(
        self,
        xi: float = 0.0,
        log: bool = False,
    ) -> None:
        super().__init__()
        self._xi: float = xi
        self._log: bool = log
        self._eta: float | None = None
    @property
    def name(self) -> str:  # noqa: D102
        return "Expected Improvement"
    @property
    def meta(self) -> dict[str, Any]:  # noqa: D102
        meta = super().meta
        meta.update(
            {
                "xi": self._xi,
                "log": self._log,
            }
        )
        return meta
    def _update(self, **kwargs: Any) -> None:
        """Update acsquisition function attributes
        Parameters
        ----------
        eta : float
            Function value of current incumbent.
        xi : float, optional
            Exploration-exploitation trade-off parameter
        """
        assert "eta" in kwargs
        self._eta = kwargs["eta"]
        if "xi" in kwargs and kwargs["xi"] is not None:
            self._xi = kwargs["xi"]
    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Compute EI acquisition value
        Parameters
        ----------
        X : np.ndarray [N, D]
            The input points where the acquisition function should be evaluated. The dimensionality of X is (N, D),
            with N as the number of points to evaluate and D as the number of dimensions of each point.
        Returns
        -------
        np.ndarray [N,1]
            Acquisition function values wrt X.
        Raises
        ------
        ValueError
            If `update` has not been called before (current incumbent value `eta` unspecified).
        ValueError
            If EI is < 0 for at least one sample (normal function value space).
        ValueError
            If EI is < 0 for at least one sample (log function value space).
        """
        assert self._model is not None
        assert self._xi is not None
        if self._eta is None:
            raise ValueError(
                "No current best specified. Call update("
                "eta=<int>) to inform the acquisition function "
                "about the current best value."
            )
        if len(X.shape) == 1:
            X = X[:, np.newaxis]
        if not self._log:
            m, v = self._model.predict_marginalized(X)
            s = np.sqrt(v)
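            # Closed-form EI for minimization with incumbent value eta and
            # z = (eta - m - xi) / s:
            #   EI(x) = (eta - m - xi) * Phi(z) + s * phi(z),
            # where Phi/phi denote the standard normal CDF/PDF.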
            def calculate_f() -> np.ndarray:
                z = (self._eta - m - self._xi) / s
                return (self._eta - m - self._xi) * norm.cdf(z) + s * norm.pdf(z)
            if np.any(s == 0.0):
                # If std is zero, we have observed x on all instances.
                # When using an RF, the predicted std should never be exactly 0.0.
                # Avoid division by zero by setting all zeros in s to one,
                # and consider the corresponding results in f to be zero.
                logger.warning("Predicted std is 0.0 for at least one sample.")
                s_copy = np.copy(s)
                s[s_copy == 0.0] = 1.0
                f = calculate_f()
                f[s_copy == 0.0] = 0.0
            else:
                f = calculate_f()
            if (f < 0).any():
                raise ValueError("Expected Improvement is smaller than 0 for at least one " "sample.")
            return f
        else:
            m, var_ = self._model.predict_marginalized(X)
            std = np.sqrt(var_)
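            # With the model operating on log-costs, f ~ N(m, var_) and EI on the
            # original scale is E[max{0, e^{f_min} - e^{f}}]. The log-normal partial
            # expectation E[e^{f}; f < a] = e^{m + var_/2} * Phi((a - m)/std - std)
            # yields the closed form evaluated below.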
            def calculate_log_ei() -> np.ndarray:
                # we expect that f_min is in log-space
                assert self._eta is not None
                assert self._xi is not None
                f_min = self._eta - self._xi
                v = (f_min - m) / std
                return (np.exp(f_min) * norm.cdf(v)) - (np.exp(0.5 * var_ + m) * norm.cdf(v - std))
            if np.any(std == 0.0):
                # If std is zero, we have observed x on all instances.
                # When using an RF, the predicted std should never be exactly 0.0.
                # Avoid division by zero by setting all zeros in std to one,
                # and consider the corresponding results in log_ei to be zero.
                logger.warning("Predicted std is 0.0 for at least one sample.")
                std_copy = np.copy(std)
                std[std_copy == 0.0] = 1.0
                log_ei = calculate_log_ei()
                log_ei[std_copy == 0.0] = 0.0
            else:
                log_ei = calculate_log_ei()
            if (log_ei < 0).any():
                raise ValueError("Expected Improvement is smaller than 0 for at least one sample.")
            return log_ei.reshape((-1, 1)) 
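# A minimal standalone sketch (illustrative only, not part of the SMAC API) of the
# closed-form EI evaluated by `EI._compute`, assuming std > 0 everywhere; `mean`
# and `std` are hypothetical stand-ins for `predict_marginalized` output.
def _ei_sketch(mean: np.ndarray, std: np.ndarray, eta: float, xi: float = 0.0) -> np.ndarray:
    z = (eta - mean - xi) / std
    return (eta - mean - xi) * norm.cdf(z) + std * norm.pdf(z)
# For example, _ei_sketch(np.array([[0.5], [0.2]]), np.array([[0.1], [0.3]]), eta=0.3)
# assigns a much higher value to the second point, whose predicted mean beats the
# incumbent.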
class EIPS(EI):
    r"""Expected Improvement per Second acquisition function
    :math:`EI(X) := \frac{\mathbb{E}\left[\max\{0,f(\mathbf{X^+})-f_{t+1}(\mathbf{X})-\xi\right]\}]}{np.log(r(x))}`,
    with :math:`f(X^+)` as the best location and :math:`r(x)` as runtime.
    Parameters
    ----------
    xi : float, defaults to 0.0
        Controls the balance between exploration and exploitation of the acquisition function.
    """
    def __init__(self, xi: float = 0.0) -> None:
        super().__init__(xi=xi)
    @property
    def name(self) -> str:  # noqa: D102
        return "Expected Improvement per Second"
    def _compute(self, X: np.ndarray) -> np.ndarray:
        """Compute EI per second acquisition value
        Parameters
        ----------
        X : np.ndarray [N, D]
            The input points where the acquisition function should be evaluated. The dimensionality of X is (N, D),
            with N as the number of points to evaluate and D as the number of dimensions of each point.
        Returns
        -------
        np.ndarray [N,1]
            Acquisition function values wrt X.
        Raises
        ------
        ValueError
            If the mean has the wrong shape, should have shape (-1, 2).
        ValueError
            If the variance has the wrong shape, should have shape (-1, 2).
        ValueError
            If `update` has not been called before (current incumbent value `eta` unspecified).
        ValueError
            If EIPS is < 0 for at least one sample.
        """
        assert self._model is not None
        if len(X.shape) == 1:
            X = X[:, np.newaxis]
        m, v = self._model.predict_marginalized(X)
        if m.shape[1] != 2:
            raise ValueError(f"m has wrong shape: {m.shape} != (-1, 2)")
        if v.shape[1] != 2:
            raise ValueError(f"v has wrong shape: {v.shape} != (-1, 2)")
        m_cost = m[:, 0]
        v_cost = v[:, 0]
        # The model already predicts log(runtime)
        m_runtime = m[:, 1]
        s = np.sqrt(v_cost)
        if self._eta is None:
            raise ValueError(
                "No current best specified. Call update("
                "eta=<int>) to inform the acquisition function "
                "about the current best value."
            )
        def calculate_f() -> np.ndarray:
            z = (self._eta - m_cost - self._xi) / s
            f = (self._eta - m_cost - self._xi) * norm.cdf(z) + s * norm.pdf(z)
            f = f / m_runtime
            return f
        if np.any(s == 0.0):
            # If std is zero, we have observed x on all instances.
            # When using an RF, the predicted std should never be exactly 0.0.
            # Avoid division by zero by setting all zeros in s to one,
            # and consider the corresponding results in f to be zero.
            logger.warning("Predicted std is 0.0 for at least one sample.")
            s_copy = np.copy(s)
            s[s_copy == 0.0] = 1.0
            f = calculate_f()
            f[s_copy == 0.0] = 0.0
        else:
            f = calculate_f()
        if (f < 0).any():
            raise ValueError("Expected Improvement per Second is smaller than 0 " "for at least one sample.")
        return f.reshape((-1, 1))
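# A standalone sketch (illustrative names, not part of the SMAC API) of the EIPS
# combination above: column 0 of the marginalized prediction is cost, column 1 is
# log-runtime, and the plain EI on cost is divided by the predicted log-runtime.
def _eips_sketch(m: np.ndarray, v: np.ndarray, eta: float, xi: float = 0.0) -> np.ndarray:
    m_cost, m_runtime = m[:, 0], m[:, 1]  # runtime column is already in log-space
    s = np.sqrt(v[:, 0])  # only the cost variance enters the EI term
    z = (eta - m_cost - xi) / s
    ei = (eta - m_cost - xi) * norm.cdf(z) + s * norm.pdf(z)
    return (ei / m_runtime).reshape((-1, 1))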