Source code for smac.epm.random_forest.rfr_imputator

from typing import Optional

import logging
import warnings

import numpy as np
from scipy.stats import truncnorm

import smac.epm.base_imputor
from smac.epm.base_epm import BaseEPM

__author__ = "Katharina Eggensperger"
__copyright__ = "Copyright 2015, ML4AAD"
__license__ = "3-clause BSD"
__maintainer__ = "Katharina Eggensperger"
__email__ = "eggenspk@cs.uni-freiburg.de"
__version__ = "0.0.1"


class RFRImputator(smac.epm.base_imputor.BaseImputor):
    """Imputor using pyrfr's Random Forest regressor.

    **Note:** Sets var_threshold as the lower bound on the variance for the
    predictions of the random forest.

    Parameters
    ----------
    rng : np.random.RandomState
        Will be used to draw a seed (currently not used)
    cutoff : float
        Cutoff value for this scenario (upper running time limit)
    threshold : float
        Highest possible value (e.g. cutoff * parX).
    model : BaseEPM
        Predictive model (i.e. RandomForestWithInstances)
    change_threshold : float
        Stop imputation if the mean relative change between iterations falls
        below this value.
    max_iter : int
        Maximum number of imputation iterations.

    Attributes
    ----------
    logger : logging.Logger
    max_iter : int
    change_threshold : float
    cutoff : float
    threshold : float
    seed : int
        Created by drawing a random int from rng
    model : BaseEPM
        Predictive model (i.e. RandomForestWithInstances)
    var_threshold : float
    """

    def __init__(
        self,
        rng: np.random.RandomState,
        cutoff: float,
        threshold: float,
        model: BaseEPM,
        change_threshold: float = 0.01,
        max_iter: int = 2,
    ):
        super(RFRImputator, self).__init__()
        self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)
        self.max_iter = max_iter
        self.change_threshold = change_threshold
        self.cutoff = cutoff
        self.threshold = threshold
        self.seed = rng.randint(low=0, high=1000)
        self.model = model

        # Never use a lower variance than this
        self.var_threshold = 10**-2
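    # The ``impute`` method below alternates between (i) replacing each
    # censored value with the mean of the model's predictive distribution
    # truncated to [captime, threshold] and (ii) refitting the model on the
    # completed data, until the imputed values stabilize (change below
    # ``change_threshold``) or ``max_iter`` iterations are reached.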
    def impute(
        self,
        censored_X: np.ndarray,
        censored_y: np.ndarray,
        uncensored_X: np.ndarray,
        uncensored_y: np.ndarray,
    ) -> Optional[np.ndarray]:
        """Imputes censored runs and returns new y values.

        Parameters
        ----------
        censored_X : np.ndarray [N, M]
            Feature array of all censored runs.
        censored_y : np.ndarray [N, 1]
            Target values for all censored runs.
        uncensored_X : np.ndarray [N, M]
            Feature array of all non-censored runs.
        uncensored_y : np.ndarray [N, 1]
            Target values for all non-censored runs.

        Returns
        -------
        imputed_y : np.ndarray
            Same shape as censored_y [N, 1]
        """
        if censored_X.shape[0] == 0:
            self.logger.critical("Nothing to impute, return None")
            return None

        censored_y = censored_y.flatten()
        uncensored_y = uncensored_y.flatten()

        # First learn model without censored data
        self.model.train(uncensored_X, uncensored_y)

        self.logger.debug("Going to impute %d y-values with %s" % (censored_X.shape[0], str(self.model)))

        imputed_y = None  # define this in case imputation fails

        # This only defines the type; the actual value is not used before it
        # is overwritten at the end of the first iteration.
        y = np.empty((0,))

        it = 1
        change = 0

        while True:
            self.logger.debug("Iteration %d of %d" % (it, self.max_iter))

            # Predict censored y values
            y_mean, y_var = self.model.predict(censored_X)
            assert y_var is not None  # please mypy
            y_var[y_var < self.var_threshold] = self.var_threshold
            y_stdev = np.sqrt(y_var)[:, 0]
            y_mean = y_mean[:, 0]

            # Ignore the warnings of truncnorm.stats, since the resulting
            # non-finite values are handled appropriately below
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", r"invalid value encountered in (subtract|true_divide|power).*")
                warnings.filterwarnings("ignore", r"divide by zero encountered in (true_divide|log).*")
                imputed_y = truncnorm.stats(
                    a=(censored_y - y_mean) / y_stdev,
                    b=(self.threshold - y_mean) / y_stdev,
                    loc=y_mean,
                    scale=y_stdev,
                    moments="m",
                )

            imputed_y = np.array(imputed_y)

            nans = ~np.isfinite(imputed_y)
            n_nans = np.sum(nans)
            if n_nans > 0:
                # Replace all nans with the maximum of the predicted mean and
                # the censored value; this happens if the prediction is far
                # smaller than the censored data point
                self.logger.debug("Going to replace %d nan-value(s) with max(captime, predicted mean)" % n_nans)
                imputed_y[nans] = np.max([censored_y[nans], y_mean[nans]], axis=0)

            if it > 1:
                # Calculate the mean relative difference between the imputed
                # values of this and the last iteration; imputed values are
                # always concatenated after the uncensored values
                change = np.mean(np.abs(imputed_y - y[uncensored_y.shape[0]:]) / y[uncensored_y.shape[0]:])

            # Lower all values that are higher than the threshold
            # (should probably never happen)
            imputed_y[imputed_y >= self.threshold] = self.threshold

            self.logger.debug("Change: %f" % change)

            X = np.concatenate((uncensored_X, censored_X))
            y = np.concatenate((uncensored_y, imputed_y))

            if change > self.change_threshold or it == 1:
                self.model.train(X, y)
            else:
                break

            it += 1
            if it > self.max_iter:
                break

        self.logger.debug("Imputation used %d/%d iterations, last_change=%f" % (it - 1, self.max_iter, change))

        # Replace all y >= cutoff with PAR10-style values (i.e., threshold)
        imputed_y = np.array(imputed_y, dtype=float)
        imputed_y[imputed_y >= self.cutoff] = self.threshold

        if not np.isfinite(imputed_y).all():
            self.logger.critical("Imputed values are not finite, %s" % str(imputed_y))
        return np.reshape(imputed_y, [imputed_y.shape[0], 1])
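To make the truncated-normal step concrete, here is a minimal standalone sketch of the core computation in ``impute``: each censored runtime is treated as a lower bound and replaced by the mean of the predicted normal distribution truncated to [captime, threshold]. It reuses only this module's imports; all values and ``toy_*`` names are made up for illustration and are not part of the SMAC API.

# Example: the core imputation step in isolation (illustrative only).
if __name__ == "__main__":
    toy_mean = np.array([4.0, 8.0, 12.0])   # hypothetical predicted means
    toy_std = np.array([2.0, 2.0, 2.0])     # hypothetical predicted stdevs
    captime = np.array([5.0, 5.0, 5.0])     # runs were censored at this time
    toy_threshold = 50.0                    # e.g. cutoff * parX

    # E[Y | captime <= Y <= toy_threshold] for Y ~ N(toy_mean, toy_std**2),
    # i.e. the same truncnorm.stats call used inside RFRImputator.impute
    toy_imputed = truncnorm.stats(
        a=(captime - toy_mean) / toy_std,
        b=(toy_threshold - toy_mean) / toy_std,
        loc=toy_mean,
        scale=toy_std,
        moments="m",
    )
    # Each imputed value lies in [captime, toy_threshold] and is at least as
    # large as the censored observation, as required for right-censored runs
    print(np.asarray(toy_imputed))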