# Copyright 2021-2024 The DeepCAVE Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# noqa: D400
"""
# RandomForest
This module can be used for training and using a Random Forest Regression model.
A pyrfr wrapper is used for simplification.
## Classes
- RandomForest: A random forest wrapper for pyrfr.
## Constants
VERY_SMALL_NUMBER : float
A small constant used to stabilize variances and logarithms.
PYRFR_MAPPING : Dict[str, str]
Mapping from wrapper keyword arguments to (nested) pyrfr option attributes.
"""
from typing import Any, Dict, List, Optional, Tuple, Union
import functools
import warnings
import numpy as np
import pyrfr.regression as regression
from ConfigSpace import ConfigurationSpace
from ConfigSpace.hyperparameters import (
CategoricalHyperparameter,
Constant,
UniformFloatHyperparameter,
UniformIntegerHyperparameter,
)
from sklearn.decomposition import PCA
from sklearn.exceptions import NotFittedError
from sklearn.preprocessing import MinMaxScaler
from deepcave.evaluators.epm.utils import get_types
VERY_SMALL_NUMBER = 1e-10
PYRFR_MAPPING = {
"n_trees": "num_trees",
"bootstrapping": "do_bootstrapping",
"max_features": "tree_opts.max_features",
"min_samples_split": "tree_opts.min_samples_to_split",
"min_samples_leaf": "tree_opts.min_samples_in_leaf",
"max_depth": "tree_opts.max_depth",
"eps_purity": "tree_opts.epsilon_purity",
"max_nodes": "tree_opts.max_num_nodes",
}
class RandomForest:
"""
A random forest wrapper for pyrfr.
This is handy because only the configuration space needs to be passed to get a
working version, without specifying e.g. types and bounds.
Note
----
This wrapper also supports instances.
Properties
----------
cs : ConfigurationSpace
The configuration space.
log_y : bool
Whether y should be treated as logarithmically transformed.
seed : int
The seed. By default 0.
types : List[int]
The types of the Hyperparameters.
bounds : List[Tuple[float, float]]
The bounds of the Hyperparameters.
n_params : int
The number of Hyperparameters in the configuration space.
n_features : int
The number of features.
pca_components : int
The number of components to keep for the principal component analysis (PCA).
pca : PCA
The principal component analysis (PCA) object.
scaler : MinMaxScaler
A MinMaxScaler to scale the features.
instance_features : ndarray
The instance features.
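Example
-------
A minimal usage sketch (illustrative only; assumes `pyrfr`, `ConfigSpace` and
`numpy` are available and uses a hypothetical two-dimensional space):
>>> import numpy as np
>>> from ConfigSpace import ConfigurationSpace
>>> cs = ConfigurationSpace({"alpha": (0.0, 1.0), "beta": (1, 10)})
>>> model = RandomForest(cs, seed=0)
>>> configs = cs.sample_configuration(20)
>>> X = np.array([config.get_array() for config in configs])
>>> Y = np.random.rand(20, 1)
>>> model.train(X, Y)
>>> means, vars_ = model.predict(X)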
"""
def __init__(
self,
configspace: ConfigurationSpace,
n_trees: int = 16,
ratio_features: float = 5.0 / 6.0,
min_samples_split: int = 3,
min_samples_leaf: int = 3,
max_depth: int = 2**20,
max_nodes: int = 2**20,
eps_purity: float = 1e-8,
bootstrapping: bool = True,
instance_features: Optional[np.ndarray] = None,
pca_components: Optional[int] = 2,
log_y: bool = False,
seed: Optional[int] = 0,
):
self.cs = configspace
self.log_y = log_y
self.seed = seed
# Set types and bounds automatically
types, self.bounds = get_types(configspace, instance_features)
self.types = np.array(types)
# Prepare everything for PCA
self.n_params = len(list(configspace.values()))
self.n_features = 0
if instance_features is not None:
self.n_features = instance_features.shape[1]
self._pca_applied = False
self.pca_components = pca_components
self.pca = PCA(n_components=self.pca_components)
self.scaler = MinMaxScaler()
self.instance_features = instance_features
# Calculate the maximum number of features (0 lets pyrfr use all features)
max_features = max(1, int(len(self.types) * ratio_features)) if ratio_features <= 1.0 else 0
# Prepare the model
self._model = self._get_model()
self._model.options = self._get_model_options(
n_trees=n_trees,
max_features=max_features,
min_samples_split=min_samples_split,
min_samples_leaf=min_samples_leaf,
max_depth=max_depth,
max_nodes=max_nodes,
eps_purity=eps_purity,
bootstrapping=bootstrapping,
)
def _get_model(self) -> regression.base_tree:
"""
Return the internal model.
Returns
-------
model : regression.base_tree
Model which is used internally.
"""
return regression.binary_rss_forest()
def _get_model_options(self, **kwargs: Union[int, float, bool]) -> regression.forest_opts:
"""
Get model options from kwargs.
The mapping `PYRFR_MAPPING` is used in combination with
a recursive attribute setter to set the options for the pyrfr model.
Parameters
----------
**kwargs : Dict[str, Any]
The key word arguments for the model options.
Returns
-------
options : regression.forest_opts
Random forest options.
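Example
-------
Illustrative only: each keyword is translated via `PYRFR_MAPPING` into a
(possibly nested) pyrfr attribute, e.g. `max_depth` maps to
`tree_opts.max_depth` on the options object.
>>> PYRFR_MAPPING["max_depth"]
'tree_opts.max_depth'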
"""
# Create the options object and set its attributes recursively
options = regression.forest_opts()
def rgetattr(obj: object, attr: str, *args: Any) -> Any:
def _getattr(obj: object, attr: object) -> Any:
attr = str(attr)
return getattr(obj, attr, *args)
return functools.reduce(_getattr, [obj] + attr.split("."))
def rsetattr(obj: object, attr: str, val: Any) -> None:
pre, _, post = attr.rpartition(".")
return setattr(rgetattr(obj, pre) if pre else obj, post, val)
for k, v in kwargs.items():
new_k = PYRFR_MAPPING[k]
rsetattr(options, new_k, v)
return options
def _impute_inactive(self, X: np.ndarray) -> np.ndarray:
"""
Impute inactive values in X.
Parameters
----------
X : np.ndarray
Data points.
Returns
-------
np.ndarray
Imputed data points.
Raises
------
ValueError
If Hyperparameter is not supported.
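Note
----
Illustrative example of the imputation values used here: non-finite entries in
the column of a conditional categorical hyperparameter with three choices are
replaced by 3, entries of numerical hyperparameters by -1, and entries of
constants by 1.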
"""
conditional: Dict[int, bool] = {}
impute_values: Dict[int, float] = {}
X = X.copy()
for idx, hp in enumerate(list(self.cs.values())):
if idx not in conditional:
parents = self.cs.parents_of[hp.name]
if len(parents) == 0:
conditional[idx] = False
else:
conditional[idx] = True
if isinstance(hp, CategoricalHyperparameter):
impute_values[idx] = len(hp.choices)
elif isinstance(hp, (UniformFloatHyperparameter, UniformIntegerHyperparameter)):
impute_values[idx] = -1
elif isinstance(hp, Constant):
impute_values[idx] = 1
else:
raise ValueError(f"Hyperparameter {hp.name} of type {type(hp)} is not supported.")
if conditional[idx] is True:
nonfinite_mask = ~np.isfinite(X[:, idx])
X[nonfinite_mask, idx] = impute_values[idx]
return X
def _check_dimensions(self, X: np.ndarray, Y: Optional[np.ndarray] = None) -> None:
"""
Check if the dimensions of X and Y are correct with respect to features.
Parameters
----------
X : np.ndarray
Input data points.
Y : Optional[np.ndarray], optional
Target values. By default None.
Raises
------
ValueError
If any dimension of X or Y is incorrect or unsuitable.
"""
if len(X.shape) != 2:
raise ValueError(f"Expected 2d array, got {len(X.shape)}d array.")
if X.shape[1] != self.n_params + self.n_features:
raise ValueError(
f"Feature mismatch: X should have {self.n_params + self.n_features} features, "
f"but has {X.shape[1]}."
)
if Y is not None:
if X.shape[0] != Y.shape[0]:
raise ValueError(f"X.shape[0] ({X.shape[0]}) != y.shape[0] ({Y.shape[0]})")
def _get_data_container(
self, X: np.ndarray, y: np.ndarray
) -> regression.default_data_container:
"""
Fill a pyrfr default data container.
The goal is that the forest knows the categorical features and the bounds of the continuous features.
Parameters
----------
X : np.ndarray [n_samples, n_features]
Input data points.
y : np.ndarray [n_samples, ]
Target values.
Returns
-------
data : regression.default_data_container
The filled data container that pyrfr can interpret.
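Note
----
A sketch of the bound encoding assumed here: a bound of `(3, NaN)` marks a
categorical feature with three possible values, while `(0.0, 1.0)` marks a
continuous feature constrained to these bounds.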
"""
# retrieve the types and the bounds from the ConfigSpace
data = regression.default_data_container(X.shape[1])
for i, (mn, mx) in enumerate(self.bounds):
if np.isnan(mx):
# Categorical feature: `mn` holds the number of possible values
data.set_type_of_feature(i, mn)
else:
# Continuous feature: set its lower and upper bound
data.set_bounds_of_feature(i, mn, mx)
for row_X, row_y in zip(X, y):
data.add_data_point(row_X, row_y)
return data
def train(self, X: np.ndarray, Y: np.ndarray) -> None:
"""
Train the random forest on X and Y.
Transform X if principal component analysis (PCA) is applied.
Afterwards, `_train` is called.
Parameters
----------
X : np.ndarray [n_samples, n_features (config + instance features)]
Input data points.
Y : np.ndarray [n_samples, 1]
Target values. The targets are flattened before training, so a single
objective is expected.
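Note
----
Illustrative example: with ten instance features and `pca_components=2`, the
instance-feature columns of X are scaled and reduced to two principal
components before `_train` is called, provided there are more samples than
components.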
"""
self._check_dimensions(X, Y)
# Reduce the dimensionality of the instance features via PCA if applicable
self._pca_applied = False
if (
self.pca_components
and X.shape[0] > self.pca.n_components
and self.n_features >= self.pca_components
):
X_features = X[:, -self.n_features :]
# Scale features
X_features = self.scaler.fit_transform(X_features)
X_features = np.nan_to_num(X_features) # if features with max == min
# PCA
X_features = self.pca.fit_transform(X_features)
X = np.hstack((X[:, : self.n_params], X_features))
# Adapt types to the reduced feature dimensions
self.types = np.array(
np.hstack((self.types[: self.n_params], np.zeros((X_features.shape[1])))),
dtype=np.uint,
)
self._pca_applied = True
self._train(X, Y)
def _train(self, X: np.ndarray, Y: np.ndarray) -> None:
"""
Train the random forest on X and Y.
Parameters
----------
X : np.ndarray
Input data points.
Y : np.ndarray
Target values.
"""
# Prepare the data for pyrfr
data = self._get_data_container(X, Y.flatten())
seed = self.seed
rng = regression.default_random_engine(seed)
# Set more specific model options and finally fit it
self._model.options.num_data_points_per_tree = X.shape[0]
self._model.fit(data, rng=rng)
def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Predict means and variances for a given X.
Parameters
----------
X : np.ndarray [n_samples, n_features (config + instance features)]
Input data points.
Returns
-------
means : np.ndarray [n_samples, n_objectives]
Predictive mean.
vars : np.ndarray [n_samples, n_objectives]
Predictive variance.
"""
self._check_dimensions(X)
if self._pca_applied:
try:
X_features = X[:, -self.n_features :]
X_features = self.scaler.transform(X_features)
X_features = self.pca.transform(X_features)
X = np.hstack((X[:, : self.n_params], X_features))
except NotFittedError:
pass # PCA not fitted if only one training sample
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", "Predicted variances are smaller than 0. Setting those variances to 0."
)
mean, var = self._predict(X)
if len(mean.shape) == 1:
mean = mean.reshape((-1, 1))
if var is not None and len(var.shape) == 1:
var = var.reshape((-1, 1))
return mean, var
def _predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Predict means and variances for a given X.
Parameters
----------
X : np.ndarray
[n_samples, n_features (config + instance features)]
Returns
-------
means : np.ndarray [n_samples, 1]
Predictive mean.
vars : np.ndarray [n_samples, 1]
Predictive variance.
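Note
----
Sketch of the `log_y` case: the leaf values of each tree are first averaged in
the original space and transformed back, roughly
`pred_tree = log(mean(exp(leaf_values)) + VERY_SMALL_NUMBER)`, and the mean
and variance are then computed across trees.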
"""
self._check_dimensions(X)
X = self._impute_inactive(X)
if self.log_y:
all_preds = []
third_dimension = 0
# Gather data in a list of 2d arrays and get statistics about the required size of the
# 3d array
for row_X in X:
preds_per_tree = self._model.all_leaf_values(row_X)
all_preds.append(preds_per_tree)
max_num_leaf_data = max(map(len, preds_per_tree))
third_dimension = max(max_num_leaf_data, third_dimension)
# Transform list of 2d arrays into a 3d array
num_trees = self._model.options.num_trees
shape = (X.shape[0], num_trees, third_dimension)
preds_as_array = np.full(shape, np.nan)
for i, preds_per_tree in enumerate(all_preds):
for j, pred in enumerate(preds_per_tree):
preds_as_array[i, j, : len(pred)] = pred
# Do all necessary computation with vectorized functions
preds_as_array = np.log(np.nanmean(np.exp(preds_as_array), axis=2) + VERY_SMALL_NUMBER)
# Compute the mean and the variance across the different trees
means = preds_as_array.mean(axis=1)
vars_ = preds_as_array.var(axis=1)
else:
means, vars_ = [], []
for row_X in X:
mean_, var = self._model.predict_mean_var(row_X)
means.append(mean_)
vars_.append(var)
means = np.array(means)
vars_ = np.array(vars_)
return means.reshape((-1, 1)), vars_.reshape((-1, 1))
def predict_marginalized(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Predict mean and variance marginalized over all instances.
Return the predictive mean and variance marginalized over all
instances for a set of configurations.
Parameters
----------
X : np.ndarray
[n_samples, n_features (config)]
Returns
-------
means : np.ndarray of shape = [n_samples, 1]
Predictive mean
vars : np.ndarray of shape = [n_samples, 1]
Predictive variance
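Note
----
Sketch of the marginalization done here: each configuration is concatenated
with every instance-feature vector, the leaf values are pooled per tree,
averaged within each tree (in the original space if `log_y` is set), and the
mean and variance are finally computed across trees.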
"""
self._check_dimensions(X)
if self.instance_features is None or len(self.instance_features) == 0:
mean_, var = self.predict(X)
assert var is not None # please mypy
var[var < VERY_SMALL_NUMBER] = VERY_SMALL_NUMBER
var[np.isnan(var)] = VERY_SMALL_NUMBER
return mean_, var
X = self._impute_inactive(X)
# marginalized predictions for each tree
dat_ = np.zeros((X.shape[0], self._model.options.num_trees))
for i, x in enumerate(X):
# marginalize over instances
# 1. get all leaf values for each tree
preds_trees: List[List[float]] = [[] for i in range(self._model.options.num_trees)]
for feat in self.instance_features:
x_ = np.concatenate([x, feat])
preds_per_tree = self._model.all_leaf_values(x_)
for tree_id, preds in enumerate(preds_per_tree):
preds_trees[tree_id] += preds
# 2. average in each tree
if self.log_y:
for tree_id in range(self._model.options.num_trees):
dat_[i, tree_id] = np.log(np.exp(np.array(preds_trees[tree_id])).mean())
else:
for tree_id in range(self._model.options.num_trees):
dat_[i, tree_id] = np.array(preds_trees[tree_id]).mean()
# 3. compute statistics across trees
mean_ = dat_.mean(axis=1)
var = dat_.var(axis=1)
var[var < VERY_SMALL_NUMBER] = VERY_SMALL_NUMBER
if len(mean_.shape) == 1:
mean_ = mean_.reshape((-1, 1))
if len(var.shape) == 1:
var = var.reshape((-1, 1))
return mean_, var
def get_leaf_values(self, x: np.ndarray) -> regression.binary_rss_forest:
"""
Get the leaf values of the model.
Parameters
----------
x : np.ndarray
Input data array.
Returns
-------
regression.binary_rss_forest
The leaf values of the model for the given input (one list of leaf values per tree).
"""
return self._model.all_leaf_values(x)