Source code for smac.epm.random_forest.rf_with_instances

from typing import List, Optional, Tuple

import numpy as np
from pyrfr import regression

from smac.configspace import ConfigurationSpace
from smac.epm.random_forest import BaseModel
from smac.utils.constants import N_TREES, VERY_SMALL_NUMBER

__author__ = "Aaron Klein"
__copyright__ = "Copyright 2015, ML4AAD"
__license__ = "3-clause BSD"
__maintainer__ = "Aaron Klein"
__email__ = "kleinaa@cs.uni-freiburg.de"
__version__ = "0.0.1"


class RandomForestWithInstances(BaseModel):
    """Random forest that takes instance features into account.

    Parameters
    ----------
    types : List[int]
        Specifies the number of categorical values of an input dimension where
        the i-th entry corresponds to the i-th input dimension. For example, if
        we have two dimensions, where the first consists of 3 different
        categorical choices and the second is continuous, we have to pass
        [3, 0]. Note that we count starting from 0.
    bounds : List[Tuple[float, float]]
        Bounds of input dimensions: (lower, upper) for continuous dims;
        (n_cat, np.nan) for categorical dims.
    seed : int
        The seed that is passed to the random_forest_run library.
    log_y : bool
        y values (passed to this RF) are expected to be log(y) transformed;
        this will be considered during predicting.
    num_trees : int
        The number of trees in the random forest.
    do_bootstrapping : bool
        Turns on / off bootstrapping in the random forest.
    n_points_per_tree : int
        Number of points per tree. If <= 0, X.shape[0] will be used in
        _train(X, y) instead.
    ratio_features : float
        The ratio of features that are considered for splitting.
    min_samples_split : int
        The minimum number of data points to perform a split.
    min_samples_leaf : int
        The minimum number of data points in a leaf.
    max_depth : int
        The maximum depth of a single tree.
    eps_purity : float
        The minimum difference between two target values to be considered
        different.
    max_num_nodes : int
        The maximum total number of nodes in a tree.
    instance_features : np.ndarray (I, K)
        Contains the K dimensional instance features of the I different
        instances.
    pca_components : Optional[int]
        Number of components to keep when using PCA to reduce dimensionality
        of instance features. Requires n_feats (> pca_dims) to be set.

    Attributes
    ----------
    rf_opts : regression.rf_opts
        Random forest hyperparameters.
    n_points_per_tree : int
    rf : regression.binary_rss_forest
        Only available after training.
    hypers : list
        List of random forest hyperparameters.
    log_y : bool
    seed : int
    types : np.ndarray
    bounds : list
    rng : np.random.RandomState
    logger : logging.logger
    """

    def __init__(
        self,
        configspace: ConfigurationSpace,
        types: List[int],
        bounds: List[Tuple[float, float]],
        seed: int,
        log_y: bool = False,
        num_trees: int = N_TREES,
        do_bootstrapping: bool = True,
        n_points_per_tree: int = -1,
        ratio_features: float = 5.0 / 6.0,
        min_samples_split: int = 3,
        min_samples_leaf: int = 3,
        max_depth: int = 2**20,
        eps_purity: float = 1e-8,
        max_num_nodes: int = 2**20,
        instance_features: Optional[np.ndarray] = None,
        pca_components: Optional[int] = None,
    ) -> None:
        super().__init__(
            configspace=configspace,
            types=types,
            bounds=bounds,
            seed=seed,
            instance_features=instance_features,
            pca_components=pca_components,
        )

        self.log_y = log_y
        self.rng = regression.default_random_engine(seed)

        self.rf_opts = regression.forest_opts()
        self.rf_opts.num_trees = num_trees
        self.rf_opts.do_bootstrapping = do_bootstrapping
        max_features = 0 if ratio_features > 1.0 else max(1, int(len(types) * ratio_features))
        self.rf_opts.tree_opts.max_features = max_features
        self.rf_opts.tree_opts.min_samples_to_split = min_samples_split
        self.rf_opts.tree_opts.min_samples_in_leaf = min_samples_leaf
        self.rf_opts.tree_opts.max_depth = max_depth
        self.rf_opts.tree_opts.epsilon_purity = eps_purity
        self.rf_opts.tree_opts.max_num_nodes = max_num_nodes
        self.rf_opts.compute_law_of_total_variance = False

        self.n_points_per_tree = n_points_per_tree
        self.rf = None  # type: regression.binary_rss_forest

        # This list will be read out by save_iteration() in the solver
        self.hypers = [
            num_trees,
            max_num_nodes,
            do_bootstrapping,
            n_points_per_tree,
            ratio_features,
            min_samples_split,
            min_samples_leaf,
            max_depth,
            eps_purity,
            self.seed,
        ]
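
    # Example encoding (cf. the docstring above): a space with one 3-valued
    # categorical and one continuous dimension in [0, 1] would be passed as
    #     types  = [3, 0]
    #     bounds = [(3, np.nan), (0.0, 1.0)]
    # i.e. categoricals carry their cardinality in both lists (marked by
    # np.nan as upper bound), while continuous dims use 0 and (lower, upper).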

    def _train(self, X: np.ndarray, y: np.ndarray) -> "RandomForestWithInstances":
        """Trains the random forest on X and y.

        Parameters
        ----------
        X : np.ndarray [n_samples, n_features (config + instance features)]
            Input data points.
        y : np.ndarray [n_samples, ]
            The corresponding target values.

        Returns
        -------
        self
        """
        X = self._impute_inactive(X)
        self.X = X
        self.y = y.flatten()

        if self.n_points_per_tree <= 0:
            self.rf_opts.num_data_points_per_tree = self.X.shape[0]
        else:
            self.rf_opts.num_data_points_per_tree = self.n_points_per_tree

        self.rf = regression.binary_rss_forest()
        self.rf.options = self.rf_opts

        data = self._init_data_container(self.X, self.y)
        self.rf.fit(data, rng=self.rng)
        return self

    def _init_data_container(self, X: np.ndarray, y: np.ndarray) -> regression.default_data_container:
        """Fills a pyrfr default data container so that the forest knows
        categoricals and bounds for continuous data.

        Parameters
        ----------
        X : np.ndarray [n_samples, n_features]
            Input data points
        y : np.ndarray [n_samples, ]
            Corresponding target values

        Returns
        -------
        data : regression.default_data_container
            The filled data container that pyrfr can interpret
        """
        # retrieve the types and the bounds from the ConfigSpace
        data = regression.default_data_container(X.shape[1])

        for i, (mn, mx) in enumerate(self.bounds):
            if np.isnan(mx):
                # categorical feature: mn holds the number of categories
                data.set_type_of_feature(i, mn)
            else:
                data.set_bounds_of_feature(i, mn, mx)

        for row_X, row_y in zip(X, y):
            data.add_data_point(row_X, row_y)
        return data

    def _predict(self, X: np.ndarray, cov_return_type: Optional[str] = "diagonal_cov") -> Tuple[np.ndarray, np.ndarray]:
        """Predict means and variances for given X.

        Parameters
        ----------
        X : np.ndarray of shape = [n_samples, n_features (config + instance features)]
        cov_return_type : Optional[str]
            Specifies what to return along with the mean. Refer to
            ``predict()`` for more information.

        Returns
        -------
        means : np.ndarray of shape = [n_samples, 1]
            Predictive mean
        vars : np.ndarray of shape = [n_samples, 1]
            Predictive variance
        """
        if len(X.shape) != 2:
            raise ValueError("Expected 2d array, got %dd array!" % len(X.shape))
        if X.shape[1] != len(self.types):
            raise ValueError("Rows in X should have %d entries but have %d!" % (len(self.types), X.shape[1]))
        if cov_return_type != "diagonal_cov":
            raise ValueError("'cov_return_type' can only take 'diagonal_cov' for this model")

        X = self._impute_inactive(X)
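
        # y is expected in log-space when log_y is set, so averaging has to be
        # done in the original space: exponentiate the leaf values, average
        # per tree, and map the per-tree means back with log before taking
        # mean and variance across trees.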
        if self.log_y:
            all_preds = []
            third_dimension = 0

            # Gather data in a list of 2d arrays and get statistics about the
            # required size of the 3d array
            for row_X in X:
                preds_per_tree = self.rf.all_leaf_values(row_X)
                all_preds.append(preds_per_tree)
                max_num_leaf_data = max(map(len, preds_per_tree))
                third_dimension = max(max_num_leaf_data, third_dimension)

            # Transform list of 2d arrays into a 3d array
            preds_as_array = np.zeros((X.shape[0], self.rf_opts.num_trees, third_dimension)) * np.NaN
            for i, preds_per_tree in enumerate(all_preds):
                for j, pred in enumerate(preds_per_tree):
                    preds_as_array[i, j, : len(pred)] = pred

            # Do all necessary computation with vectorized functions
            preds_as_array = np.log(np.nanmean(np.exp(preds_as_array), axis=2) + VERY_SMALL_NUMBER)

            # Compute the mean and the variance across the different trees
            means = preds_as_array.mean(axis=1)
            vars_ = preds_as_array.var(axis=1)
        else:
            means, vars_ = [], []
            for row_X in X:
                mean_, var = self.rf.predict_mean_var(row_X)
                means.append(mean_)
                vars_.append(var)
            means = np.array(means)
            vars_ = np.array(vars_)

        return means.reshape((-1, 1)), vars_.reshape((-1, 1))
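
    # For reference, step 2 of the marginalization below computes, per tree t
    # and configuration x_i, over instance features f_1, ..., f_I:
    #     dat_[i, t] = log((1 / I) * sum_j exp(pred_t(x_i, f_j)))
    # in the log_y case, and the plain mean of pred_t(x_i, f_j) otherwise;
    # step 3 then takes mean and variance of dat_ across trees.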
    def predict_marginalized_over_instances(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Predict mean and variance marginalized over all instances.

        Returns the predictive mean and variance marginalized over all
        instances for a set of configurations.

        Note
        ----
        This method overrides the same method of
        ~smac.epm.base_epm.AbstractEPM; the following method is random forest
        specific and follows the SMAC2 implementation; it requires no
        distribution assumption to marginalize the uncertainty estimates.

        Parameters
        ----------
        X : np.ndarray
            [n_samples, n_features (config)]

        Returns
        -------
        means : np.ndarray of shape = [n_samples, 1]
            Predictive mean
        vars : np.ndarray of shape = [n_samples, 1]
            Predictive variance
        """
        if self.instance_features is None or len(self.instance_features) == 0:
            mean_, var = self.predict(X)
            assert var is not None  # please mypy

            var[var < self.var_threshold] = self.var_threshold
            var[np.isnan(var)] = self.var_threshold
            return mean_, var

        if len(X.shape) != 2:
            raise ValueError("Expected 2d array, got %dd array!" % len(X.shape))
        if X.shape[1] != len(self.bounds):
            raise ValueError("Rows in X should have %d entries but have %d!" % (len(self.bounds), X.shape[1]))

        X = self._impute_inactive(X)

        # marginalized predictions for each tree
        dat_ = np.zeros((X.shape[0], self.rf_opts.num_trees))
        for i, x in enumerate(X):
            # marginalize over instances
            # 1. get all leaf values for each tree
            preds_trees = [[] for i in range(self.rf_opts.num_trees)]  # type: List[List[float]]

            for feat in self.instance_features:
                x_ = np.concatenate([x, feat])
                preds_per_tree = self.rf.all_leaf_values(x_)
                for tree_id, preds in enumerate(preds_per_tree):
                    preds_trees[tree_id] += preds

            # 2. average in each tree
            if self.log_y:
                for tree_id in range(self.rf_opts.num_trees):
                    dat_[i, tree_id] = np.log(np.exp(np.array(preds_trees[tree_id])).mean())
            else:
                for tree_id in range(self.rf_opts.num_trees):
                    dat_[i, tree_id] = np.array(preds_trees[tree_id]).mean()

        # 3. compute statistics across trees
        mean_ = dat_.mean(axis=1)
        var = dat_.var(axis=1)

        if var is None:
            raise RuntimeError("The variance must not be None.")

        var[var < self.var_threshold] = self.var_threshold

        if len(mean_.shape) == 1:
            mean_ = mean_.reshape((-1, 1))
        if len(var.shape) == 1:
            var = var.reshape((-1, 1))

        return mean_, var
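
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only; the hyperparameter name "x" and the
# toy data are made up). `train()` and `predict()` are the public wrappers
# around `_train()` / `_predict()` provided by the base EPM class.
#
#     import numpy as np
#     from ConfigSpace import ConfigurationSpace
#     from ConfigSpace.hyperparameters import UniformFloatHyperparameter
#
#     cs = ConfigurationSpace()
#     cs.add_hyperparameter(UniformFloatHyperparameter("x", 0.0, 1.0))
#
#     model = RandomForestWithInstances(
#         configspace=cs, types=[0], bounds=[(0.0, 1.0)], seed=1
#     )
#     X = np.random.rand(20, 1)
#     y = np.sin(5 * X).flatten()
#     model.train(X, y)
#     mean, var = model.predict(np.array([[0.5]]))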