from __future__ import annotations
from typing import Any, Iterator
import copy
import numpy as np
from ConfigSpace import Configuration
from smac.main.base_smbo import BaseSMBO
from smac.runhistory import StatusType, TrialInfo, TrialValue
from smac.runhistory.enumerations import TrialInfoIntent
from smac.runner.exceptions import (
FirstRunCrashedException,
TargetAlgorithmAbortException,
)
from smac.utils.configspace import convert_configurations_to_array
from smac.utils.logging import get_logger
__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"
logger = get_logger(__name__)
class SMBO(BaseSMBO):
"""Implements ``get_next_configurations``, ``ask``, and ``tell``."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._predict_x_best = True
self._min_samples = 1
self._considered_budgets: list[float | None] = [None]
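
    # A minimal ask/tell usage sketch (hypothetical driver code; an ``SMBO`` instance is
    # normally created and run by a facade rather than driven manually):
    #
    #     intent, info = smbo.ask()
    #     if intent == TrialInfoIntent.RUN:
    #         cost = evaluate(info.config, info.instance, info.seed)  # user-defined target function
    #         smbo.tell(info, TrialValue(cost=cost, time=0.0, status=StatusType.SUCCESS))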
    def get_next_configurations(self, n: int | None = None) -> Iterator[Configuration]:  # noqa: D102
for callback in self._callbacks:
callback.on_next_configurations_start(self)
# Cost value of incumbent configuration (required for acquisition function).
# If not given, it will be inferred from runhistory or predicted.
# If not given and runhistory is empty, it will raise a ValueError.
incumbent_value: float | None = None
logger.debug("Search for next configuration...")
X, Y, X_configurations = self._collect_data()
previous_configs = self._runhistory.get_configs()
if X.shape[0] == 0:
            # Only return a single point to avoid an overly high number of random search iterations.
            # Random search was dropped here in favor of simply sampling a configuration from the
            # configspace.
return iter([self._scenario.configspace.sample_configuration(1)])
self._model.train(X, Y)
x_best_array: np.ndarray | None = None
if incumbent_value is not None:
best_observation = incumbent_value
else:
if self._runhistory.empty():
raise ValueError("Runhistory is empty and the cost value of the incumbent is unknown.")
x_best_array, best_observation = self._get_x_best(self._predict_x_best, X_configurations)
self._acquisition_function.update(
model=self._model,
eta=best_observation,
incumbent_array=x_best_array,
num_data=len(self._get_evaluated_configs()),
X=X_configurations,
)
challengers = self._acquisition_maximizer.maximize(
previous_configs,
n_points=n,
random_design=self._random_design,
)
for callback in self._callbacks:
challenger_list = list(copy.deepcopy(challengers))
callback.on_next_configurations_end(self, challenger_list)
return challengers
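
    # ``get_next_configurations`` returns a lazy iterator over challengers; a caller
    # (typically the intensifier) consumes it one configuration at a time, e.g.
    # (hypothetical driver code):
    #
    #     challengers = smbo.get_next_configurations(n=5000)
    #     next_challenger = next(challengers)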
    def ask(self) -> tuple[TrialInfoIntent, TrialInfo]:  # noqa: D102
for callback in self._callbacks:
callback.on_ask_start(self)
intent, trial_info = self._intensifier.get_next_trial(
challengers=self._initial_design_configs,
incumbent=self._incumbent,
get_next_configurations=self.get_next_configurations,
runhistory=self._runhistory,
repeat_configs=self._intensifier.repeat_configs,
n_workers=self._runner.count_available_workers(),
)
if intent == TrialInfoIntent.RUN:
            # There are two criteria the stats object uses to know whether the budget was exhausted:
            # the time budget, which can only be known when the run finishes, and the number of
            # target algorithm executions. Because we submit the job at this point, we count this
            # submission as a run. This prevents using more runner runs than the config allows.
self._stats._submitted += 1
            # Remove config from the initial design challengers so it is not repeated
self._initial_design_configs = [c for c in self._initial_design_configs if c != trial_info.config]
for callback in self._callbacks:
callback.on_ask_end(self, intent, trial_info)
return intent, trial_info
    def tell(
self,
info: TrialInfo,
value: TrialValue,
time_left: float | None = None,
save: bool = True,
) -> None: # noqa: D102
        # First, check whether the seed, budget, and instance are supported by the intensifier
if info.seed not in (seeds := self._intensifier.get_target_function_seeds()):
raise ValueError(f"Seed {info.seed} is not supported by the intensifier. Consider using one of {seeds}.")
elif info.budget not in (budgets := self._intensifier.get_target_function_budgets()):
raise ValueError(
f"Budget {info.budget} is not supported by the intensifier. Consider using one of {budgets}."
)
elif info.instance not in (instances := self._intensifier.get_target_function_instances()):
raise ValueError(
f"Instance {info.instance} is not supported by the intensifier. Consider using one of {instances}."
)
if info.config.origin is None:
info.config.origin = "Custom"
for callback in self._callbacks:
response = callback.on_tell_start(self, info, value)
            # If a callback returns False, the optimization loop should be interrupted;
            # however, the remaining callbacks are still called.
            if response is False:
                logger.info("A callback returned False. Abort is requested.")
self._stop = True
# We expect the first run to always succeed.
if self._stats.finished == 0 and value.status == StatusType.CRASHED:
additional_info = ""
if "traceback" in value.additional_info:
additional_info = "\n\n" + value.additional_info["traceback"]
raise FirstRunCrashedException("The first run crashed. Please check your setup again." + additional_info)
# Update SMAC stats
self._stats._target_function_walltime_used += float(value.time)
self._stats._finished += 1
        logger.debug(
            f"Status: {value.status}, cost: {value.cost}, time: {value.time}, "
            f"Additional: {value.additional_info}"
        )
self._runhistory.add(
config=info.config,
cost=value.cost,
time=value.time,
status=value.status,
instance=info.instance,
seed=info.seed,
budget=info.budget,
starttime=value.starttime,
endtime=value.endtime,
force_update=True,
additional_info=value.additional_info,
)
self._stats._n_configs = len(self._runhistory._config_ids)
if value.status == StatusType.ABORT:
raise TargetAlgorithmAbortException(
"The target function was aborted. The last incumbent can be found in the trajectory file."
)
elif value.status == StatusType.STOP:
logger.debug("Value holds the status stop. Abort is requested.")
self._stop = True
if time_left is None:
time_left = np.inf
# Update the intensifier with the result of the runs
self._incumbent, _ = self._intensifier.process_results(
trial_info=info,
trial_value=value,
incumbent=self._incumbent,
runhistory=self._runhistory,
time_bound=max(self._min_time, time_left),
)
# Gracefully end optimization if termination cost is reached
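        # For example (hypothetical values): with two objectives declared as
        #     Scenario(..., objectives=["error", "runtime"], termination_cost_threshold=[0.05, 10.0])
        # a stop is requested once the config's average costs fall below both thresholds.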
if self._scenario.termination_cost_threshold != np.inf:
cost = self.runhistory.average_cost(info.config)
if not isinstance(cost, list):
cost = [cost]
if not isinstance(self._scenario.termination_cost_threshold, list):
cost_threshold = [self._scenario.termination_cost_threshold]
else:
cost_threshold = self._scenario.termination_cost_threshold
if len(cost) != len(cost_threshold):
raise RuntimeError("You must specify a termination cost threshold for each objective.")
if all(cost[i] < cost_threshold[i] for i in range(len(cost))):
logger.info("Cost threshold was reached. Abort is requested.")
self._stop = True
for callback in self._callbacks:
response = callback.on_tell_end(self, info, value)
            # If a callback returns False, the optimization loop should be interrupted;
            # however, the remaining callbacks are still called.
            if response is False:
                logger.info("A callback returned False. Abort is requested.")
self._stop = True
if save:
self.save()
def _collect_data(self) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Collects the data from the runhistory to train the surrogate model. The data collection strategy if budgets
are used is as follows: Looking from highest to lowest budget, return those observations
that support at least ``self._min_samples`` points.
If no budgets are used, this is equivalent to returning all observations.
"""
# if we use a float value as a budget, we want to train the model only on the highest budget
available_budgets = []
for run_key in self._runhistory:
available_budgets.append(run_key.budget)
        # Sort available budgets from highest to lowest
        available_budgets = sorted(set(available_budgets), reverse=True)  # type: ignore
        # Get the number of points per budget and, if there are enough samples, build a model
for b in available_budgets:
X, Y = self._runhistory_encoder.transform(self._runhistory, budget_subset=[b])
if X.shape[0] >= self._min_samples:
self._considered_budgets = [b]
configs_array = self._runhistory_encoder.get_configurations(
self._runhistory, budget_subset=self._considered_budgets
)
return X, Y, configs_array
        return (
            np.empty(shape=[0, 0]),
            np.empty(shape=[0]),
            np.empty(shape=[0, 0]),
        )
def _get_evaluated_configs(self) -> list[Configuration]:
return self._runhistory.get_configs_per_budget(budget_subset=self._considered_budgets)
def _get_x_best(self, predict: bool, X: np.ndarray) -> tuple[np.ndarray, float]:
"""Get value, configuration, and array representation of the *best* configuration.
The definition of best varies depending on the argument ``predict``. If set to `True`,
this function will return the stats of the best configuration as predicted by the model,
otherwise it will return the stats for the best observed configuration.
Parameters
----------
predict : bool
Whether to use the predicted or observed best.
Returns
-------
float
np.ndarry
Configuration
"""
if predict:
model = self._model
costs = list(
map(
lambda x: (
model.predict_marginalized(x.reshape((1, -1)))[0][0][0], # type: ignore
x,
),
X,
)
)
costs = sorted(costs, key=lambda t: t[0])
x_best_array = costs[0][1]
best_observation = costs[0][0]
            # We won't need log(y) here because the EPM (surrogate model) was already trained on log(y)
else:
all_configs = self._runhistory.get_configs_per_budget(budget_subset=self._considered_budgets)
x_best = self._incumbent
x_best_array = convert_configurations_to_array(all_configs)
best_observation = self._runhistory.get_cost(x_best)
best_observation_as_array = np.array(best_observation).reshape((1, 1))
# It's unclear how to do this for inv scaling and potential future scaling.
# This line should be changed if necessary
best_observation = self._runhistory_encoder.transform_response_values(best_observation_as_array)
best_observation = best_observation[0][0]
return x_best_array, best_observation
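
# A vectorized sketch of the predicted-best selection in ``_get_x_best`` above
# (equivalent up to tie-breaking; ``model`` and ``X`` are assumed to be a trained
# surrogate and the configurations' array representation):
#
#     means, _ = model.predict_marginalized(X)  # means has shape (n_configs, n_objectives)
#     best_idx = int(np.argmin(means[:, 0]))
#     x_best_array, best_observation = X[best_idx], float(means[best_idx, 0])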