import abc
from typing import Dict, List, Mapping, Optional, Tuple
import logging
import numpy as np
from smac.configspace import convert_configurations_to_array
from smac.epm.base_imputor import BaseImputor
from smac.multi_objective.aggregation_strategy import AggregationStrategy
from smac.multi_objective.utils import normalize_costs
from smac.runhistory.runhistory import RunHistory, RunKey, RunValue
from smac.scenario.scenario import Scenario
from smac.tae import StatusType
from smac.utils import constants
__author__ = "Katharina Eggensperger"
__copyright__ = "Copyright 2015, ML4AAD"
__license__ = "3-clause BSD"
__maintainer__ = "Katharina Eggensperger"
__email__ = "eggenspk@cs.uni-freiburg.de"
__version__ = "0.0.1"
class AbstractRunHistory2EPM(object, metaclass=abc.ABCMeta):
"""Abstract class for preprocessing data in order to train an EPM.
Parameters
----------
scenario: Scenario Object
Algorithm Configuration Scenario
num_params : int
number of parameters in config space
success_states: list, optional
List of states considered as successful (such as StatusType.SUCCESS).
If None, raise TypeError.
impute_censored_data: bool, optional
Whether to impute censored data.
consider_for_higher_budgets_state: list, optional
Additionally consider all runs with these states for budget < current budget
imputor: epm.base_imputor Instance
Object to impute censored data
impute_state: list, optional
List of states that mark censored data (such as StatusType.TIMEOUT)
in combination with runtime < cutoff_time
If None, set to empty list [].
If None and impute_censored_data is True, raise TypeError.
scale_perc: int
Scaled y-transformations use this percentile to estimate the distance to the optimum;
only used by some subclasses of AbstractRunHistory2EPM.
rng : numpy.random.RandomState
Only used for reshuffling data after imputation.
If None, use np.random.RandomState(seed=1).
multi_objective_algorithm: Optional[MultiObjectiveAlgorithm]
Instance performing multi-objective optimization. Receives an objective cost vector as input
and returns a scalar. Is executed before transforming runhistory values.
Attributes
----------
logger
scenario
rng
num_params
success_states
impute_censored_data
impute_state
cutoff_time
imputor
instance_features
n_feats
"""
def __init__(
self,
scenario: Scenario,
num_params: int,
success_states: List[StatusType],
impute_censored_data: bool = False,
impute_state: Optional[List[StatusType]] = None,
consider_for_higher_budgets_state: Optional[List[StatusType]] = None,
imputor: Optional[BaseImputor] = None,
scale_perc: int = 5,
rng: Optional[np.random.RandomState] = None,
multi_objective_algorithm: Optional[AggregationStrategy] = None,
) -> None:
self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)
# General arguments
self.scenario = scenario
self.rng = rng
self.num_params = num_params
self.scale_perc = scale_perc
self.num_obj = 1 # type: int
if scenario.multi_objectives is not None: # type: ignore[attr-defined] # noqa F821
self.num_obj = len(scenario.multi_objectives) # type: ignore[attr-defined] # noqa F821
# Configuration
self.impute_censored_data = impute_censored_data
self.cutoff_time = self.scenario.cutoff # type: ignore[attr-defined] # noqa F821
self.imputor = imputor
if self.num_obj > 1:
self.multi_objective_algorithm = multi_objective_algorithm
else:
self.multi_objective_algorithm = None
# Fill with some default values
if rng is None:
self.rng = np.random.RandomState(seed=1)
if impute_state is None and impute_censored_data:
raise TypeError("impute_state not given")
if impute_state is None:
# please mypy
self.impute_state = [] # type: List[StatusType]
else:
self.impute_state = impute_state
if consider_for_higher_budgets_state is None:
# please mypy
self.consider_for_higher_budgets_state = [] # type: List[StatusType]
else:
self.consider_for_higher_budgets_state = consider_for_higher_budgets_state
if success_states is None:
raise TypeError("success_states not given")
self.success_states = success_states
self.instance_features = scenario.feature_dict
self.n_feats = scenario.n_features
self.num_params = num_params
# Sanity checks
if impute_censored_data and scenario.run_obj != "runtime":
# So far we don't know how to handle censored quality data
self.logger.critical("Cannot impute censored data when not " "optimizing runtime")
raise NotImplementedError("Cannot impute censored data when not " "optimizing runtime")
# Check imputor stuff
if impute_censored_data and self.imputor is None:
self.logger.critical("You want me to impute censored data, but " "I don't know how. Imputor is None")
raise ValueError("impute_censored data, but no imputor given")
elif impute_censored_data and not isinstance(self.imputor, BaseImputor):
raise ValueError(
"Given imputor is not an instance of " "smac.epm.base_imputor.BaseImputor, but %s" % type(self.imputor)
)
# Learned statistics
self.min_y = np.array([np.nan] * self.num_obj)
self.max_y = np.array([np.nan] * self.num_obj)
self.perc = np.array([np.nan] * self.num_obj)
@abc.abstractmethod
def _build_matrix(
self,
run_dict: Mapping[RunKey, RunValue],
runhistory: RunHistory,
return_time_as_y: bool = False,
store_statistics: bool = False,
) -> Tuple[np.ndarray, np.ndarray]:
"""Builds x,y matrixes from selected runs from runhistory.
Parameters
----------
run_dict: dict(RunKey -> RunValue)
dictionary from RunHistory.RunKey to RunHistory.RunValue
runhistory: RunHistory
runhistory object
return_time_as_y: bool
Return the time instead of cost as y value. Necessary to access the raw y values for imputation.
store_statistics: bool
Whether to store statistics about the data (to be used at subsequent calls)
Returns
-------
X: np.ndarray
Y: np.ndarray
"""
raise NotImplementedError()
def _get_s_run_dict(
self,
runhistory: RunHistory,
budget_subset: Optional[List] = None,
) -> Dict[RunKey, RunValue]:
# Get only successfully finished runs
if budget_subset is not None:
if len(budget_subset) != 1:
raise ValueError("Cannot yet handle getting runs from multiple budgets")
s_run_dict = {
run: runhistory.data[run]
for run in runhistory.data.keys()
if run.budget in budget_subset and runhistory.data[run].status in self.success_states
}
# Additionally add these states from lower budgets
add = {
run: runhistory.data[run]
for run in runhistory.data.keys()
if runhistory.data[run].status in self.consider_for_higher_budgets_state
and run.budget < budget_subset[0]
}
s_run_dict.update(add)
else:
s_run_dict = {
run: runhistory.data[run]
for run in runhistory.data.keys()
if runhistory.data[run].status in self.success_states
}
return s_run_dict
def _get_t_run_dict(
self,
runhistory: RunHistory,
budget_subset: Optional[List] = None,
) -> Dict[RunKey, RunValue]:
if budget_subset is not None:
t_run_dict = {
run: runhistory.data[run]
for run in runhistory.data.keys()
if runhistory.data[run].status == StatusType.TIMEOUT
and runhistory.data[run].time >= self.cutoff_time
and run.budget in budget_subset
}
else:
t_run_dict = {
run: runhistory.data[run]
for run in runhistory.data.keys()
if runhistory.data[run].status == StatusType.TIMEOUT and runhistory.data[run].time >= self.cutoff_time
}
return t_run_dict
def get_configurations(
self,
runhistory: RunHistory,
budget_subset: Optional[List] = None,
) -> np.ndarray:
"""Returns vector representation of only the configurations. Instance features are not
appended and cost values are not taken into account.
Parameters
----------
runhistory : smac.runhistory.runhistory.RunHistory
Runhistory containing all evaluated configurations/instances
budget_subset : list of budgets to consider
Returns
-------
numpy.ndarray
"""
s_runs = self._get_s_run_dict(runhistory, budget_subset)
s_config_ids = set(s_run.config_id for s_run in s_runs)
t_runs = self._get_t_run_dict(runhistory, budget_subset)
t_config_ids = set(t_run.config_id for t_run in t_runs)
config_ids = s_config_ids | t_config_ids
configurations = [runhistory.ids_config[config_id] for config_id in config_ids]
configs_array = convert_configurations_to_array(configurations)
return configs_array
def transform(
self,
runhistory: RunHistory,
budget_subset: Optional[List] = None,
) -> Tuple[np.ndarray, np.ndarray]:
"""Returns vector representation of runhistory; if imputation is disabled, censored (TIMEOUT
with time < cutoff) will be skipped.
Parameters
----------
runhistory : smac.runhistory.runhistory.RunHistory
Runhistory containing all evaluated configurations/instances
budget_subset : list of budgets to consider
Returns
-------
X: numpy.ndarray
configuration vector x instance features
Y: numpy.ndarray
cost values
"""
self.logger.debug("Transform runhistory into X,y format")
s_run_dict = self._get_s_run_dict(runhistory, budget_subset)
X, Y = self._build_matrix(run_dict=s_run_dict, runhistory=runhistory, store_statistics=True)
# Get real TIMEOUT runs
t_run_dict = self._get_t_run_dict(runhistory, budget_subset)
# use penalization (e.g. PAR10) for EPM training
store_statistics = bool(np.any(np.isnan(self.min_y)))
tX, tY = self._build_matrix(
run_dict=t_run_dict,
runhistory=runhistory,
store_statistics=store_statistics,
)
# if we don't have successful runs,
# we have to return all timeout runs
if not s_run_dict:
return tX, tY
if self.impute_censored_data:
# Get all censored runs
if budget_subset is not None:
c_run_dict = {
run: runhistory.data[run]
for run in runhistory.data.keys()
if runhistory.data[run].status in self.impute_state
and runhistory.data[run].time < self.cutoff_time
and run.budget in budget_subset
}
else:
c_run_dict = {
run: runhistory.data[run]
for run in runhistory.data.keys()
if runhistory.data[run].status in self.impute_state and runhistory.data[run].time < self.cutoff_time
}
if len(c_run_dict) == 0:
self.logger.debug("No censored data found, skip imputation")
# If we do not impute, we also return TIMEOUT data
X = np.vstack((X, tX))
Y = np.concatenate((Y, tY))
else:
# better empirical results by using PAR1 instead of PAR10
# for censored data imputation
cen_X, cen_Y = self._build_matrix(
run_dict=c_run_dict,
runhistory=runhistory,
return_time_as_y=True,
store_statistics=False,
)
# Also impute TIMEOUTS
tX, tY = self._build_matrix(
run_dict=t_run_dict,
runhistory=runhistory,
return_time_as_y=True,
store_statistics=False,
)
self.logger.debug("%d TIMEOUTS, %d CAPPED, %d SUCC" % (tX.shape[0], cen_X.shape[0], X.shape[0]))
cen_X = np.vstack((cen_X, tX))
cen_Y = np.concatenate((cen_Y, tY))
# return imp_Y in PAR depending on the used threshold in imputor
assert isinstance(self.imputor, BaseImputor) # please mypy
imp_Y = self.imputor.impute(censored_X=cen_X, censored_y=cen_Y, uncensored_X=X, uncensored_y=Y)
# Stack the imputed censored/TIMEOUT data below the uncensored data
X = np.vstack((X, cen_X))
Y = np.concatenate((Y, imp_Y)) # type: ignore
else:
# If we do not impute, we also return TIMEOUT data
X = np.vstack((X, tX))
Y = np.concatenate((Y, tY))
self.logger.debug("Converted %d observations" % (X.shape[0]))
return X, Y
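# Illustrative sketch (not part of the SMAC API): with 3 successful and 2 real
# TIMEOUT runs, 4 hyperparameters, no instance features, and imputation
# disabled, transform() stacks both groups and returns arrays shaped
#
#   X.shape == (5, 4)    # configurations (+ instance features, if any)
#   Y.shape == (5, 1)    # transformed cost values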
@abc.abstractmethod
def transform_response_values(
self,
values: np.ndarray,
) -> np.ndarray:
"""Transform function response values.
Parameters
----------
values : np.ndarray
Response values to be transformed.
Returns
-------
np.ndarray
"""
raise NotImplementedError
def get_X_y(self, runhistory: RunHistory) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Simple interface to obtain all data in runhistory in X, y format.
Note: This function should not be used as it does not consider all available StatusTypes.
Parameters
----------
runhistory : smac.runhistory.runhistory.RunHistory
runhistory of all evaluated configurations x instances
Returns
-------
X: numpy.ndarray
matrix of all configurations (+ instance features)
y: numpy.ndarray
vector of cost values; can include censored runs
cen: numpy.ndarray
vector of bools indicating whether the y-value is censored
"""
self.logger.warning("This function is not tested and might not work as expected!")
X = []
y = []
cen = []
feature_dict = self.scenario.feature_dict
params = self.scenario.cs.get_hyperparameters() # type: ignore[attr-defined] # noqa F821
for k, v in runhistory.data.items():
config = runhistory.ids_config[k.config_id]
x = [config.get(p.name) for p in params]
features = feature_dict.get(k.instance_id)
if features:
x.extend(features)
X.append(x)
y.append(v.cost)
cen.append(v.status != StatusType.SUCCESS)
return np.array(X), np.array(y), np.array(cen)
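# A minimal usage sketch (illustrative only; `scenario` and `runhistory` are
# assumed to be an already constructed Scenario and RunHistory, and the chosen
# success states are just an example):
#
#   rh2epm = RunHistory2EPM4Cost(
#       scenario=scenario,
#       num_params=len(scenario.cs.get_hyperparameters()),
#       success_states=[StatusType.SUCCESS],
#   )
#   X, Y = rh2epm.transform(runhistory)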
class RunHistory2EPM4Cost(AbstractRunHistory2EPM):
"""Converts the runhistory into X, y data, using the raw cost values as targets."""
def _build_matrix(
self,
run_dict: Mapping[RunKey, RunValue],
runhistory: RunHistory,
return_time_as_y: bool = False,
store_statistics: bool = False,
) -> Tuple[np.ndarray, np.ndarray]:
"""Builds X,y matrixes from selected runs from runhistory.
Parameters
----------
run_dict: dict: RunKey -> RunValue
dictionary from RunHistory.RunKey to RunHistory.RunValue
runhistory: RunHistory
runhistory object
return_time_as_y: bool
Return the time instead of cost as y value. Necessary to access the raw y values for imputation.
store_statistics: bool
Whether to store statistics about the data (to be used at subsequent calls)
Returns
-------
X: np.ndarray
Y: np.ndarray
"""
# First build a nan-matrix of size #configs x (#params + #features)
n_rows = len(run_dict)
n_cols = self.num_params
X = np.ones([n_rows, n_cols + self.n_feats]) * np.nan
# For now we keep it as 1
# TODO: Extend for native multi-objective
y = np.ones([n_rows, 1])
# Then populate matrix
for row, (key, run) in enumerate(run_dict.items()):
# Scaling is automatically done in configSpace
conf = runhistory.ids_config[key.config_id]
conf_vector = convert_configurations_to_array([conf])[0]
if self.n_feats:
feats = self.instance_features[key.instance_id]
X[row, :] = np.hstack((conf_vector, feats)) # type: ignore
else:
X[row, :] = conf_vector
# run_array[row, -1] = instances[row]
if self.num_obj > 1:
assert self.multi_objective_algorithm is not None
# Let's normalize y here
# We use the objective_bounds calculated by the runhistory
y_ = normalize_costs(run.cost, runhistory.objective_bounds)
y_agg = self.multi_objective_algorithm(y_)
y[row] = y_agg
else:
if return_time_as_y:
y[row, 0] = run.time
else:
y[row] = run.cost
if y.size > 0:
if store_statistics:
self.perc = np.percentile(y, self.scale_perc, axis=0)
self.min_y = np.min(y, axis=0)
self.max_y = np.max(y, axis=0)
y = self.transform_response_values(values=y)
return X, y
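# Illustrative only: with scale_perc=5 and y containing the costs 1.0, 2.0,
# ..., 100.0, np.percentile(y, 5, axis=0) is roughly 5.95, so self.perc sits
# just above the best observed costs; the scaled subclasses below use it to
# stretch the lower bound min_y away from the observed minimum.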
def transform_response_values(self, values: np.ndarray) -> np.ndarray:
"""Transform function response values. Returns the input values.
Parameters
----------
values : np.ndarray
Response values to be transformed.
Returns
-------
np.ndarray
"""
return values
class RunHistory2EPM4LogCost(RunHistory2EPM4Cost):
"""Converts the runhistory into X, y data, using log-transformed cost values as targets."""
def transform_response_values(self, values: np.ndarray) -> np.ndarray:
"""Transform function response values. Transforms the response values by using a log
transformation.
Parameters
----------
values : np.ndarray
Response values to be transformed.
Returns
-------
np.ndarray
"""
# ensure that minimal value is larger than 0
if np.any(values <= 0):
self.logger.warning(
"Got cost smaller than or equal to 0. Replacing it by %f since we use"
" log cost." % constants.MINIMAL_COST_FOR_LOG
)
values[values < constants.MINIMAL_COST_FOR_LOG] = constants.MINIMAL_COST_FOR_LOG
values = np.log(values)
return values
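# Sketch of the clipping above (constants.MINIMAL_COST_FOR_LOG is assumed to be
# a small positive constant, e.g. 1e-5, for this illustration):
#
#   >>> import numpy as np
#   >>> values = np.array([[0.0], [0.5], [2.0]])
#   >>> values[values < 1e-5] = 1e-5
#   >>> np.log(values)          # approx. [[-11.51], [-0.69], [0.69]]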
class RunHistory2EPM4ScaledCost(RunHistory2EPM4Cost):
"""Converts the runhistory into X, y data, linearly scaling the cost values between zero and one."""
def transform_response_values(self, values: np.ndarray) -> np.ndarray:
"""Transform function response values. Transforms the response values by linearly scaling
them between zero and one.
Parameters
----------
values : np.ndarray
Response values to be transformed.
Returns
-------
np.ndarray
"""
min_y = self.min_y - (self.perc - self.min_y)  # Subtract the difference between the percentile and the minimum
min_y -= constants.VERY_SMALL_NUMBER  # Minimal value to avoid numerical issues in the scaling below
# linear scaling
# prevent dividing by zero
min_y[np.where(min_y == self.max_y)] *= 1 - 10**-10
values = (values - min_y) / (self.max_y - min_y)
return values
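# Worked example of the linear scaling above (illustrative values; the tiny
# VERY_SMALL_NUMBER offset is omitted for readability):
#
#   >>> import numpy as np
#   >>> obs_min, obs_perc, obs_max = np.array([2.0]), np.array([3.0]), np.array([10.0])
#   >>> min_y = obs_min - (obs_perc - obs_min)       # 1.0, stretched below the observed minimum
#   >>> values = np.array([[2.0], [6.0], [10.0]])
#   >>> (values - min_y) / (obs_max - min_y)         # approx. [[0.11], [0.56], [1.0]]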
class RunHistory2EPM4InvScaledCost(RunHistory2EPM4Cost):
"""Converts the runhistory into X, y data, applying an inverse transformation to linearly scaled cost values."""
def __init__(self, **kwargs): # type: ignore[no-untyped-def] # noqa F723
super().__init__(**kwargs)
if self.instance_features is not None:
if len(self.instance_features) > 1:
raise NotImplementedError("Handling more than one instance is not supported for inverse scaled cost.")
def transform_response_values(self, values: np.ndarray) -> np.ndarray:
"""Transform function response values. Transform the response values by linearly scaling
them between zero and one and then using inverse scaling.
Parameters
----------
values : np.ndarray
Response values to be transformed.
Returns
-------
np.ndarray
"""
min_y = self.min_y - (self.perc - self.min_y)  # Subtract the difference between the percentile and the minimum
min_y -= constants.VERY_SMALL_NUMBER  # Minimal value to avoid numerical issues in the scaling below
# linear scaling
# prevent dividing by zero
min_y[np.where(min_y == self.max_y)] *= 1 - 10**-10
values = (values - min_y) / (self.max_y - min_y)
values = 1 - 1 / values
return values
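# Sketch of the inverse transform above: after linear scaling, 1 - 1/v maps
# values in (0, 1] to (-inf, 0], strongly spreading out the best (smallest)
# costs:
#
#   >>> import numpy as np
#   >>> v = np.array([[0.1], [0.5], [1.0]])
#   >>> 1 - 1 / v               # [[-9.0], [-1.0], [0.0]]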
class RunHistory2EPM4SqrtScaledCost(RunHistory2EPM4Cost):
"""Converts the runhistory into X, y data, taking the square root of linearly scaled cost values."""
def __init__(self, **kwargs): # type: ignore[no-untyped-def] # noqa F723
super().__init__(**kwargs)
if self.instance_features is not None:
if len(self.instance_features) > 1:
raise NotImplementedError("Handling more than one instance is not supported for sqrt scaled cost.")
def transform_response_values(self, values: np.ndarray) -> np.ndarray:
"""Transform function response values. Transform the response values by linearly scaling
them between zero and one and then using the square root.
Parameters
----------
values : np.ndarray
Response values to be transformed.
Returns
-------
np.ndarray
"""
min_y = self.min_y - (self.perc - self.min_y)  # Subtract the difference between the percentile and the minimum
min_y -= constants.VERY_SMALL_NUMBER  # Minimal value to avoid numerical issues in the scaling below
# linear scaling
# prevent dividing by zero
min_y[np.where(min_y == self.max_y)] *= 1 - 10**-10
values = (values - min_y) / (self.max_y - min_y)
values = np.sqrt(values)
return values
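# Sketch of the square-root transform above: sqrt expands differences among the
# small (good) scaled costs and compresses them among the large ones, e.g.
#
#   >>> np.sqrt(np.array([[0.04], [0.25], [1.0]]))   # [[0.2], [0.5], [1.0]]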
class RunHistory2EPM4LogScaledCost(RunHistory2EPM4Cost):
"""Converts the runhistory into X, y data, applying a log transformation to linearly scaled cost values."""
def transform_response_values(self, values: np.ndarray) -> np.ndarray:
"""Transform function response values.
Transform the response values by linearly scaling them between zero and one and
then using the log transformation.
Parameters
----------
values : np.ndarray
Response values to be transformed.
Returns
-------
np.ndarray
"""
min_y = self.min_y - (self.perc - self.min_y) # Subtract the difference between the percentile and the minimum
min_y -= constants.VERY_SMALL_NUMBER # Minimal value to avoid numerical issues in the log scaling below
# linear scaling
# prevent dividing by zero
min_y[np.where(min_y == self.max_y)] *= 1 - 10**-10
values = (values - min_y) / (self.max_y - min_y)
values = np.log(values)
return values
class RunHistory2EPM4EIPS(AbstractRunHistory2EPM):
"""Converts the runhistory into X, y data for EI per second (EIPS), where y contains both cost and runtime."""
def _build_matrix(
self,
run_dict: Mapping[RunKey, RunValue],
runhistory: RunHistory,
return_time_as_y: bool = False,
store_statistics: bool = False,
) -> Tuple[np.ndarray, np.ndarray]:
"""TODO."""
if return_time_as_y:
raise NotImplementedError()
if store_statistics:
# store_statistics is currently not necessary
pass
# First build a nan-matrix of size #configs x (#params + #features)
n_rows = len(run_dict)
n_cols = self.num_params
X = np.ones([n_rows, n_cols + self.n_feats]) * np.nan
y = np.ones([n_rows, 2])
# Then populate matrix
for row, (key, run) in enumerate(run_dict.items()):
# Scaling is automatically done in configSpace
conf = runhistory.ids_config[key.config_id]
conf_vector = convert_configurations_to_array([conf])[0]
if self.n_feats:
feats = self.instance_features[key.instance_id]
X[row, :] = np.hstack((conf_vector, feats)) # type: ignore
else:
X[row, :] = conf_vector
if self.num_obj > 1:
assert self.multi_objective_algorithm is not None
# Let's normalize y here
# We use the objective_bounds calculated by the runhistory
y_ = normalize_costs(run.cost, runhistory.objective_bounds)
y_agg = self.multi_objective_algorithm(y_)
y[row, 0] = y_agg
else:
y[row, 0] = run.cost
y[row, 1] = run.time
y_transformed = self.transform_response_values(values=y)
return X, y_transformed
def transform_response_values(self, values: np.ndarray) -> np.ndarray:
"""Transform function response values. Transforms the runtimes (second column) by a log
transformation: log(1 + runtime).
Parameters
----------
values : np.ndarray
Response values to be transformed.
Returns
-------
np.ndarray
"""
# We need to ensure that time remains positive after the log transform.
values[:, 1] = np.log(1 + values[:, 1])
return values
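# Sketch: the runtime transform above is equivalent to np.log1p, which is a
# slightly more numerically robust way to express log(1 + runtime):
#
#   >>> import numpy as np
#   >>> values = np.array([[0.3, 0.0], [0.7, 9.0]])
#   >>> np.log(1 + values[:, 1])          # [0.0, 2.3026], same as np.log1p(values[:, 1])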