from __future__ import annotations
from abc import abstractmethod
from typing import Any, TypeVar
import copy
import warnings
import numpy as np
from ConfigSpace import ConfigurationSpace
from sklearn.decomposition import PCA
from sklearn.exceptions import NotFittedError
from sklearn.preprocessing import MinMaxScaler
from smac.constants import VERY_SMALL_NUMBER
from smac.utils.configspace import get_types
from smac.utils.logging import get_logger
# Package metadata.
__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"
# Module-level logger, named after this module.
logger = get_logger(__name__)
# Type variable bound to ``AbstractModel``; used to annotate methods such as ``train`` whose return type is the
# type of the instance they are called on.
Self = TypeVar("Self", bound="AbstractModel")
class AbstractModel:
    """Abstract implementation of the surrogate model.

    Note
    ----
    The input dimensionality of Y for training and the output dimensions of all predictions depend on the concrete
    implementation of this abstract class.

    Parameters
    ----------
    configspace : ConfigurationSpace
    instance_features : dict[str, list[int | float]] | None, defaults to None
        Features (list of int or floats) of the instances (str). The features are incorporated into the X data,
        on which the model is trained on.
    pca_components : int | None, defaults to 7
        Number of components to keep when using PCA to reduce dimensionality of instance features. If None, PCA is
        never applied.
    seed : int
    """

    def __init__(
        self,
        configspace: ConfigurationSpace,
        instance_features: dict[str, list[int | float]] | None = None,
        pca_components: int | None = 7,
        seed: int = 0,
    ) -> None:
        self._configspace = configspace
        self._seed = seed
        self._rng = np.random.RandomState(self._seed)
        self._instance_features = instance_features
        self._pca_components = pca_components

        self._n_features = self._count_instance_features(instance_features)
        self._n_hps = len(self._configspace.get_hyperparameters())

        self._pca = PCA(n_components=self._pca_components)
        self._scaler = MinMaxScaler()
        # Set by `train`; tells `predict` whether the feature columns have to be scaled and projected first.
        self._apply_pca = False

        # Never use a lower variance than this.
        # If estimated variance < var_threshold, set to var_threshold
        self._var_threshold = VERY_SMALL_NUMBER

        self._types, self._bounds = get_types(configspace, instance_features)

        # Initial types array which is used to reset the type array at every call to `self.train()`
        self._initial_types = copy.deepcopy(self._types)

    @staticmethod
    def _count_instance_features(instance_features: dict[str, list[int | float]] | None) -> int:
        """Returns the number of features per instance and checks that it is the same for every instance.

        Parameters
        ----------
        instance_features : dict[str, list[int | float]] | None
            Mapping from instance name to its feature list.

        Returns
        -------
        n_features : int
            Length of the feature lists; 0 if `instance_features` is None or empty.

        Raises
        ------
        RuntimeError
            If the feature lists do not all have the same length.
        """
        if not instance_features:
            return 0

        # Compare all lengths at once. Using 0 as a "not yet seen" sentinel would let an empty first feature list
        # hide a mismatch with the following instances.
        lengths = {len(features) for features in instance_features.values()}
        if len(lengths) > 1:
            raise RuntimeError("Instances must have the same number of features.")

        return lengths.pop()

    @property
    def meta(self) -> dict[str, Any]:
        """Returns the meta data of the created object."""
        return {
            "name": self.__class__.__name__,
            "types": self._types,
            "bounds": self._bounds,
            "pca_components": self._pca_components,
        }

    def train(self: Self, X: np.ndarray, Y: np.ndarray) -> Self:
        """Trains the model on X and Y. Internally, calls the method `_train`.

        If PCA is enabled, there are more samples than PCA components and at least as many instance features as
        PCA components, the feature columns are min-max scaled and reduced with PCA before `_train` is called.

        Parameters
        ----------
        X : np.ndarray [#samples, #hyperparameters + #features]
            Input data points.
        Y : np.ndarray [#samples, #objectives]
            The corresponding target values.

        Returns
        -------
        self : AbstractModel

        Raises
        ------
        ValueError
            If X is not two-dimensional, if its number of columns is not #hyperparameters + #features, or if X and
            Y do not have the same number of rows.
        """
        if len(X.shape) != 2:
            raise ValueError(f"Expected 2d array, got {len(X.shape)}d array!")

        if X.shape[1] != self._n_hps + self._n_features:
            raise ValueError(
                f"Feature mismatch: X should have {self._n_hps} hyperparameters + {self._n_features} features, "
                f"but has {X.shape[1]} in total."
            )

        if X.shape[0] != Y.shape[0]:
            raise ValueError(f"X.shape[0] ({X.shape[0]}) != y.shape[0] ({Y.shape[0]})")

        # Reduce dimensionality of the instance features if there are at least `pca_components` of them
        if (
            self._pca_components is not None
            and X.shape[0] > self._pca.n_components
            and self._n_features >= self._pca_components
        ):
            X_feats = X[:, -self._n_features :]

            # Scale features
            X_feats = self._scaler.fit_transform(X_feats)
            X_feats = np.nan_to_num(X_feats)  # if features with max == min

            # PCA
            X_feats = self._pca.fit_transform(X_feats)
            X = np.hstack((X[:, : self._n_hps], X_feats))

            if hasattr(self, "_types"):
                # For RF, adapt types list: the hyperparameter types are kept and every PCA component gets type 0
                self._types = np.array(
                    np.hstack((self._types[: self._n_hps], np.zeros(X_feats.shape[1]))),
                    dtype=np.uint,
                )  # type: ignore

            self._apply_pca = True
        else:
            self._apply_pca = False

            if hasattr(self, "_types"):
                # Undo a type adaptation made by an earlier call that did apply PCA
                self._types = copy.deepcopy(self._initial_types)

        return self._train(X, Y)

    @abstractmethod
    def _train(self: Self, X: np.ndarray, Y: np.ndarray) -> Self:
        """Trains the model on X and Y. Has to be implemented by the concrete model.

        Parameters
        ----------
        X : np.ndarray [#samples, #hyperparameters + #features]
            Input data points. If `train` applied PCA, the feature columns are already replaced by the PCA
            components.
        Y : np.ndarray [#samples, #objectives]
            The corresponding target values.

        Returns
        -------
        self : AbstractModel
        """
        raise NotImplementedError()

    def predict(
        self,
        X: np.ndarray,
        covariance_type: str | None = "diagonal",
    ) -> tuple[np.ndarray, np.ndarray | None]:
        """Predicts mean and variance for a given X. Internally, calls the method `_predict`.

        Parameters
        ----------
        X : np.ndarray [#samples, #hyperparameters + #features]
            Input data points.
        covariance_type: str | None, defaults to "diagonal"
            Specifies what to return along with the mean. Applied only to Gaussian Processes.
            Takes four valid inputs:
            * None: Only the mean is returned.
            * "std": Standard deviation at test points is returned.
            * "diagonal": Diagonal of the covariance matrix is returned.
            * "full": Whole covariance matrix between the test points is returned.

        Returns
        -------
        means : np.ndarray [#samples, #objectives]
            The predictive mean.
        vars : np.ndarray [#samples, #objectives] or [#samples, #samples] | None
            Predictive variance or standard deviation.

        Raises
        ------
        ValueError
            If X is not two-dimensional, if its number of columns is not #hyperparameters + #features, or if the
            (possibly PCA-reduced) number of columns does not match the length of the types array.
        """
        if len(X.shape) != 2:
            raise ValueError(f"Expected 2d array, got {len(X.shape)}d array!")

        if X.shape[1] != self._n_hps + self._n_features:
            raise ValueError(
                f"Feature mismatch: X should have {self._n_hps} hyperparameters + {self._n_features} features, "
                f"but has {X.shape[1]} in total."
            )

        if self._apply_pca:
            try:
                X_feats = X[:, -self._n_features :]
                X_feats = self._scaler.transform(X_feats)
                # Same clean-up as in `train`, so that prediction inputs are preprocessed exactly like training
                # inputs before they reach the PCA.
                X_feats = np.nan_to_num(X_feats)
                X_feats = self._pca.transform(X_feats)
                X = np.hstack((X[:, : self._n_hps], X_feats))
            except NotFittedError:
                # PCA not fitted if only one training sample
                pass

        if X.shape[1] != len(self._types):
            raise ValueError(f"Rows in X should have {len(self._types)} entries but have {X.shape[1]}!")

        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "Predicted variances smaller than 0. Setting those variances to 0.")
            mean, var = self._predict(X, covariance_type)

        # Always hand back column vectors, even if the concrete model returns flat arrays
        if len(mean.shape) == 1:
            mean = mean.reshape((-1, 1))

        if var is not None and len(var.shape) == 1:
            var = var.reshape((-1, 1))

        return mean, var

    def _predict(
        self,
        X: np.ndarray,
        covariance_type: str | None = "diagonal",
    ) -> tuple[np.ndarray, np.ndarray | None]:
        """Predicts mean and variance for a given X. Has to be implemented by the concrete model.

        Parameters
        ----------
        X : np.ndarray [#samples, #hyperparameters + #features]
            Input data points. If PCA was applied during training, the feature columns are already replaced by the
            PCA components.
        covariance_type : str | None, defaults to "diagonal"
            Specifies what to return along with the mean. Applied only to Gaussian Processes.
            Takes four valid inputs:
            * None: Only the mean is returned.
            * "std": Standard deviation at test points is returned.
            * "diagonal": Diagonal of the covariance matrix is returned.
            * "full": Whole covariance matrix between the test points is returned.

        Returns
        -------
        means : np.ndarray [#samples, #objectives]
            The predictive mean.
        vars : np.ndarray [#samples, #objectives] or [#samples, #samples] | None
            Predictive variance or standard deviation.
        """
        raise NotImplementedError()

    def predict_marginalized(self, X: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        """Predicts mean and variance marginalized over all instances.

        Warning
        -------
        The input data must not include any features.

        Parameters
        ----------
        X : np.ndarray [#samples, #hyperparameters]
            Input data points.

        Returns
        -------
        means : np.ndarray [#samples, 1]
            The predictive mean.
        vars : np.ndarray [#samples, 1]
            The predictive variance. Values below the variance threshold and NaN values are replaced by the
            threshold.

        Raises
        ------
        ValueError
            If X is not two-dimensional or does not have exactly #hyperparameters columns.
        """
        if len(X.shape) != 2:
            raise ValueError(f"Expected 2d array, got {len(X.shape)}d array!")

        if X.shape[1] != self._n_hps:
            raise ValueError(
                f"Feature mismatch: X should have {self._n_hps} hyperparameters (and no features) for this method, "
                f"but has {X.shape[1]} in total."
            )

        if self._instance_features is None:
            # Nothing to marginalize over: predict directly and clamp the variances
            mean, var = self.predict(X)
            assert var is not None

            var[var < self._var_threshold] = self._var_threshold
            var[np.isnan(var)] = self._var_threshold

            return mean, var

        # The feature matrix is the same for every configuration, so build it only once
        features = np.array(list(self._instance_features.values()))
        n_instances = len(self._instance_features)

        mean = np.zeros(X.shape[0])
        var = np.zeros(X.shape[0])
        for i, x in enumerate(X):
            # Pair the configuration with the features of every instance
            X_ = np.hstack((np.tile(x, (n_instances, 1)), features))
            instance_means, instance_vars = self.predict(X_)
            assert instance_vars is not None

            # VAR[1/n (X_1 + ... + X_n)] =
            # 1/n^2 * ( VAR(X_1) + ... + VAR(X_n))
            # for independent X_1 ... X_n
            var_x = np.sum(instance_vars) / (len(instance_vars) ** 2)

            # NaN compares False against the threshold, so it is checked explicitly; this matches the clamping of
            # the branch without instance features.
            if np.isnan(var_x) or var_x < self._var_threshold:
                var_x = self._var_threshold

            var[i] = var_x
            mean[i] = np.mean(instance_means)

        return mean.reshape((-1, 1)), var.reshape((-1, 1))