from __future__ import annotations
from abc import abstractmethod
from typing import Any, Callable
from pathlib import Path
import joblib
from ConfigSpace import Configuration
from dask.distributed import Client
from typing_extensions import Literal
import smac
from smac.acquisition.function.abstract_acquisition_function import (
AbstractAcquisitionFunction,
)
from smac.acquisition.maximizer.abstract_acqusition_maximizer import (
AbstractAcquisitionMaximizer,
)
from smac.callback.callback import Callback
from smac.initial_design.abstract_initial_design import AbstractInitialDesign
from smac.intensifier.abstract_intensifier import AbstractIntensifier
from smac.main.config_selector import ConfigSelector
from smac.main.smbo import SMBO
from smac.model.abstract_model import AbstractModel
from smac.multi_objective.abstract_multi_objective_algorithm import (
AbstractMultiObjectiveAlgorithm,
)
from smac.random_design.abstract_random_design import AbstractRandomDesign
from smac.runhistory.dataclasses import TrialInfo, TrialValue
from smac.runhistory.encoder.abstract_encoder import AbstractRunHistoryEncoder
from smac.runhistory.runhistory import RunHistory
from smac.runner.abstract_runner import AbstractRunner
from smac.runner.dask_runner import DaskParallelRunner
from smac.runner.target_function_runner import TargetFunctionRunner
from smac.runner.target_function_script_runner import TargetFunctionScriptRunner
from smac.scenario import Scenario
from smac.utils.logging import get_logger, setup_logging
# Module-level logger, obtained through SMAC's own logging helper.
logger = get_logger(__name__)

# Package metadata.
__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"
[docs]
# NOTE(review): the class declares no ``abc.ABC`` base / ``ABCMeta`` metaclass here, so the ``@abstractmethod``
# markers on the ``get_*`` factories are presumably not enforced at instantiation time; the factories raise
# ``NotImplementedError`` when called instead.
class AbstractFacade:
    """Facade is an abstraction on top of the SMBO backend to organize the components of a Bayesian Optimization loop
    in a configurable and separable manner to suit the various needs of different (hyperparameter) optimization
    pipelines.

    With the exception of ``scenario`` and ``target_function``, which are expected from the user, the parameters
    ``model``, ``acquisition_function``, ``acquisition_maximizer``, ``initial_design``, ``random_design``,
    ``intensifier``, ``multi_objective_algorithm``, ``runhistory_encoder`` and ``config_selector`` can either be
    specified in the subclasses' ``get_*`` methods (defining a specific BO pipeline) or be instantiated by the user
    to overwrite pipeline components explicitly.

    Parameters
    ----------
    scenario : Scenario
        The scenario object, holding all environmental information.
    target_function : Callable | str | AbstractRunner
        This function is called internally to judge a trial's performance. If a string is passed,
        it is assumed to be a script. In this case, ``TargetFunctionScriptRunner`` is used to run the script.
        If an ``AbstractRunner`` is passed, it is used as the runner directly.
    model : AbstractModel | None, defaults to None
        The surrogate model.
    acquisition_function : AbstractAcquisitionFunction | None, defaults to None
        The acquisition function.
    acquisition_maximizer : AbstractAcquisitionMaximizer | None, defaults to None
        The acquisition maximizer, deciding which configuration is most promising based on the surrogate model and
        acquisition function.
    initial_design : AbstractInitialDesign | None, defaults to None
        The sampled configurations from the initial design are evaluated before the Bayesian optimization loop starts.
    random_design : AbstractRandomDesign | None, defaults to None
        The random design is used in the acquisition maximizer, deciding whether the next configuration should be drawn
        from the acquisition function or randomly.
    intensifier : AbstractIntensifier | None, defaults to None
        The intensifier decides which trial (combination of configuration, seed, budget and instance) should be run
        next.
    multi_objective_algorithm : AbstractMultiObjectiveAlgorithm | None, defaults to None
        In case of multiple objectives, the objectives need to be interpreted so that an optimization is possible.
        The multi-objective algorithm takes care of that. If None, a default is only created when the scenario
        reports more than one objective.
    runhistory_encoder : AbstractRunHistoryEncoder | None, defaults to None
        Based on the runhistory, the surrogate model is trained. However, the data first needs to be encoded, which
        is done by the runhistory encoder. For example, inactive hyperparameters need to be encoded or cost values
        can be log transformed.
    config_selector : ConfigSelector | None, defaults to None
        The configuration selector. It receives the initial design, runhistory, runhistory encoder, model,
        acquisition function, acquisition maximizer, random design and callbacks, and is attached to the
        intensifier. If None, ``get_config_selector`` is used.
    logging_level : int | Path | Literal[False] | None, defaults to None
        The level of logging (the lowest level 0 indicates the debug level). If a path is passed, a yaml file is
        expected with the logging configuration. If nothing is passed, the default logging.yml from SMAC is used.
        If False is passed, SMAC will not do any customization of the logging setup and the responsibility is left
        to the user.
    callbacks : list[Callback] | None, defaults to None
        Callbacks, which are incorporated into the optimization loop. ``None`` is treated as an empty list.
    overwrite : bool, defaults to False
        When True, overwrites the run results if a previous run is found that is
        consistent in the meta data with the current setup. When False and a previous run is found that is
        consistent in the meta data, the run is continued. When False and a previous run is found that is
        not consistent in the meta data, the user is asked for the exact behaviour (overwrite completely
        or rename old run first).
    dask_client : Client | None, defaults to None
        User-created dask client, which can be used to start a dask cluster and then attach SMAC to it. This will not
        be closed automatically and will have to be closed manually if provided explicitly. If none is provided
        (default), a local one will be created for you and closed upon completion.
    """
def __init__(
    self,
    scenario: Scenario,
    target_function: Callable | str | AbstractRunner,
    *,
    model: AbstractModel | None = None,
    acquisition_function: AbstractAcquisitionFunction | None = None,
    acquisition_maximizer: AbstractAcquisitionMaximizer | None = None,
    initial_design: AbstractInitialDesign | None = None,
    random_design: AbstractRandomDesign | None = None,
    intensifier: AbstractIntensifier | None = None,
    multi_objective_algorithm: AbstractMultiObjectiveAlgorithm | None = None,
    runhistory_encoder: AbstractRunHistoryEncoder | None = None,
    config_selector: ConfigSelector | None = None,
    logging_level: int | Path | Literal[False] | None = None,
    callbacks: list[Callback] | None = None,
    overwrite: bool = False,
    dask_client: Client | None = None,
):
    """Assemble all components of the optimization pipeline and create the optimizer.

    Every component that is passed as ``None`` is created through the matching ``get_*`` factory of the
    facade. Afterwards the runner is built from ``target_function`` (optionally wrapped for Dask), the
    components are wired together, the scenario meta data is updated, the composition is validated and the
    optimizer is created with all callbacks registered.

    The parameters are described in the class docstring. ``config_selector`` defaults to the result of
    ``get_config_selector`` and ``callbacks`` defaults to an empty list.

    Raises
    ------
    ValueError
        Raised by ``_validate`` if a Dask runner is combined with ``scenario.trial_walltime_limit`` or
        ``scenario.trial_memory_limit``.
    """
    setup_logging(logging_level)

    # ``None`` instead of ``[]`` as default, so no list object is shared between facade instances
    if callbacks is None:
        callbacks = []

    # Fall back to the facade-specific defaults for every component the user did not provide
    if model is None:
        model = self.get_model(scenario)

    if acquisition_function is None:
        acquisition_function = self.get_acquisition_function(scenario)

    if acquisition_maximizer is None:
        acquisition_maximizer = self.get_acquisition_maximizer(scenario)

    if initial_design is None:
        initial_design = self.get_initial_design(scenario)

    if random_design is None:
        random_design = self.get_random_design(scenario)

    if intensifier is None:
        intensifier = self.get_intensifier(scenario)

    # A multi-objective algorithm is only created by default if there is more than one objective
    if multi_objective_algorithm is None and scenario.count_objectives() > 1:
        multi_objective_algorithm = self.get_multi_objective_algorithm(scenario=scenario)

    if runhistory_encoder is None:
        runhistory_encoder = self.get_runhistory_encoder(scenario)

    if config_selector is None:
        config_selector = self.get_config_selector(scenario)

    # Initialize an empty runhistory object
    runhistory = RunHistory(multi_objective_algorithm=multi_objective_algorithm)

    # Set the seed for configuration space
    scenario.configspace.seed(scenario.seed)

    # Set variables globally
    self._scenario = scenario
    self._model = model
    self._acquisition_function = acquisition_function
    self._acquisition_maximizer = acquisition_maximizer
    self._initial_design = initial_design
    self._random_design = random_design
    self._intensifier = intensifier
    self._multi_objective_algorithm = multi_objective_algorithm
    self._runhistory = runhistory
    self._runhistory_encoder = runhistory_encoder
    self._config_selector = config_selector
    self._callbacks = callbacks
    self._overwrite = overwrite

    # Prepare the algorithm executer. ``_get_signature_arguments`` reads ``self._intensifier``, which is why
    # the attributes above have to be set first.
    runner: AbstractRunner
    if isinstance(target_function, AbstractRunner):
        runner = target_function
    elif isinstance(target_function, str):
        runner = TargetFunctionScriptRunner(
            scenario=scenario,
            target_function=target_function,
            required_arguments=self._get_signature_arguments(),
        )
    else:
        runner = TargetFunctionRunner(
            scenario=scenario,
            target_function=target_function,
            required_arguments=self._get_signature_arguments(),
        )

    # In case of multiple jobs, we need to wrap the runner again using DaskParallelRunner
    if (n_workers := scenario.n_workers) > 1 or dask_client is not None:
        if dask_client is not None and n_workers > 1:
            logger.warning(
                "Provided `dask_client`. Ignore `scenario.n_workers`, directly set `n_workers` in `dask_client`."
            )
        else:
            available_workers = joblib.cpu_count()
            if n_workers > available_workers:
                # Cap first and log afterwards, so that the message reports the reduced number of workers
                # (previously the unreduced number was logged).
                n_workers = available_workers
                logger.info(f"Workers are reduced to {n_workers}.")
                # NOTE(review): the capped value is not passed on to ``DaskParallelRunner`` here; presumably
                # the runner derives the worker count itself — confirm.

        # We use a dask runner for parallelization
        runner = DaskParallelRunner(single_worker=runner, dask_client=dask_client)

    # Set the runner to access it globally
    self._runner = runner

    # Adding dependencies of the components
    self._update_dependencies()

    # We have to update our meta data (basically arguments of the components)
    self._scenario._set_meta(self.meta)

    # We have to validate if the object compositions are correct and actually make sense
    self._validate()

    # Finally we configure our optimizer
    self._optimizer = self._get_optimizer()
    assert self._optimizer

    # Register callbacks here
    for callback in callbacks:
        self._optimizer.register_callback(callback)

    # Additionally, we register the runhistory callback from the intensifier to efficiently update our incumbent
    # every time new information is available. ``index=0`` is passed so it is registered at the first position.
    self._optimizer.register_callback(self._intensifier.get_callback(), index=0)
@property
def scenario(self) -> Scenario:
    """The scenario object which holds all environment information.

    This is the attribute ``_scenario`` that is set in the constructor.
    """
    return self._scenario
@property
def runhistory(self) -> RunHistory:
    """The runhistory which is filled with all trials during the optimization process.

    The runhistory is read from the optimizer (``self._optimizer._runhistory``), not from the facade's own
    ``_runhistory`` attribute.
    """
    return self._optimizer._runhistory
@property
def optimizer(self) -> SMBO:
    """The optimizer which is responsible for the BO loop. Keeps track of useful information like status.

    This is the ``SMBO`` instance created by ``_get_optimizer`` in the constructor.
    """
    return self._optimizer
@property
def intensifier(self) -> AbstractIntensifier:
    """The intensifier used by this facade, i.e. the component deciding which trial should be run next."""
    return self._intensifier
@property
def meta(self) -> dict[str, Any]:
    """Collects the meta data of all components of the facade in one dictionary.

    The dictionary is handed to the scenario in the constructor. According to the original documentation, it
    is used for the run name or to determine whether a run should be continued or not.

    Returns
    -------
    meta : dict[str, Any]
        Facade name, the ``meta`` of each component and the SMAC version. The entry
        ``multi_objective_algorithm`` is ``None`` if no multi-objective algorithm is set.
    """
    # The multi-objective algorithm is the only optional component
    moa = self._multi_objective_algorithm
    moa_meta = moa.meta if moa is not None else None

    return {
        "facade": {"name": type(self).__name__},
        "runner": self._runner.meta,
        "model": self._model.meta,
        "acquisition_maximizer": self._acquisition_maximizer.meta,
        "acquisition_function": self._acquisition_function.meta,
        "intensifier": self._intensifier.meta,
        "initial_design": self._initial_design.meta,
        "random_design": self._random_design.meta,
        "runhistory_encoder": self._runhistory_encoder.meta,
        "multi_objective_algorithm": moa_meta,
        "config_selector": self._config_selector.meta,
        "version": smac.version,
    }
[docs]
def ask(self) -> TrialInfo:
    """Asks the intensifier for the next trial.

    The call is forwarded to the optimizer's ``ask``.

    Returns
    -------
    info : TrialInfo
        The trial information returned by the optimizer.
    """
    return self._optimizer.ask()
[docs]
def tell(self, info: TrialInfo, value: TrialValue, save: bool = True) -> None:
    """Adds the result of a trial to the runhistory and updates the intensifier.

    The call is forwarded to the optimizer's ``tell``.

    Parameters
    ----------
    info : TrialInfo
        Describes the trial from which to process the results.
    value : TrialValue
        Contains relevant information regarding the execution of a trial.
    save : bool, defaults to True
        Whether the runhistory should be saved.
    """
    return self._optimizer.tell(info, value, save=save)
[docs]
def optimize(self, *, data_to_scatter: dict[str, Any] | None = None) -> Configuration | list[Configuration]:
    """Optimizes the configuration of the algorithm.

    The work is delegated to the optimizer. The optimizer is saved afterwards, regardless of whether the
    optimization finished or raised.

    Parameters
    ----------
    data_to_scatter : dict[str, Any] | None, defaults to None
        Note that this argument is only valid for the dask runner.
        When a user scatters data from their local process to the distributed network,
        this data is distributed in a round-robin fashion grouping by number of cores.
        Roughly speaking, the data can be kept in memory and does not have to be (de-)serialized
        every time a target function with a big dataset is executed.
        This is useful, for example, when a big dataset is shared across all target function calls.

    Returns
    -------
    incumbent : Configuration | list[Configuration]
        The result of the optimizer's ``optimize``, i.e. the best found configuration(s).

    Raises
    ------
    ValueError
        If ``data_to_scatter`` is an empty dict.
    """
    got_empty_dict = isinstance(data_to_scatter, dict) and not len(data_to_scatter)
    if got_empty_dict:
        raise ValueError("data_to_scatter must be None or dict with some elements, but got an empty dict.")

    # ``finally`` makes sure the optimizer is saved, no matter how the run ends
    try:
        return self._optimizer.optimize(data_to_scatter=data_to_scatter)
    finally:
        self._optimizer.save()
[docs]
def validate(
    self,
    config: Configuration,
    *,
    seed: int | None = None,
) -> float | list[float]:
    """Validates a configuration on seeds different from the ones used in the optimization process and on the
    highest budget (if budget type is real-valued).

    The call is forwarded to the optimizer's ``validate``.

    Parameters
    ----------
    config : Configuration
        Configuration to validate.
    seed : int | None, defaults to None
        If None, the seed from the scenario is used.

    Returns
    -------
    cost : float | list[float]
        The averaged cost of the configuration. In case of multi-fidelity, the cost of each objective is
        averaged. NOTE(review): "multi-fidelity" presumably means "multi-objective" here (list return) — confirm.
    """
    return self._optimizer.validate(config, seed=seed)
[docs]
@staticmethod
@abstractmethod
def get_model(scenario: Scenario) -> AbstractModel:
    """Returns the surrogate cost model instance used in the BO loop.

    Used by the constructor if no ``model`` is passed. This base implementation always raises
    ``NotImplementedError``.

    Parameters
    ----------
    scenario : Scenario
        The scenario the component is created for.
    """
    raise NotImplementedError
[docs]
@staticmethod
@abstractmethod
def get_acquisition_function(scenario: Scenario) -> AbstractAcquisitionFunction:
    """Returns the acquisition function instance used in the BO loop,
    defining the exploration/exploitation trade-off.

    Used by the constructor if no ``acquisition_function`` is passed. This base implementation always raises
    ``NotImplementedError``.

    Parameters
    ----------
    scenario : Scenario
        The scenario the component is created for.
    """
    raise NotImplementedError
[docs]
@staticmethod
@abstractmethod
def get_acquisition_maximizer(scenario: Scenario) -> AbstractAcquisitionMaximizer:
    """Returns the acquisition maximizer instance to be used in the BO loop,
    specifying how the acquisition function instance is optimized.

    Used by the constructor if no ``acquisition_maximizer`` is passed. This base implementation always raises
    ``NotImplementedError``.

    Parameters
    ----------
    scenario : Scenario
        The scenario the component is created for.
    """
    raise NotImplementedError
[docs]
@staticmethod
@abstractmethod
def get_intensifier(scenario: Scenario) -> AbstractIntensifier:
    """Returns the intensifier instance to be used in the BO loop,
    specifying how to challenge the incumbent configuration on other problem instances.

    Used by the constructor if no ``intensifier`` is passed. This base implementation always raises
    ``NotImplementedError``.

    Parameters
    ----------
    scenario : Scenario
        The scenario the component is created for.
    """
    raise NotImplementedError
[docs]
@staticmethod
@abstractmethod
def get_initial_design(scenario: Scenario) -> AbstractInitialDesign:
    """Returns an instance of the initial design class to be used in the BO loop,
    specifying how the configurations the BO loop is 'warm-started' with are selected.

    Used by the constructor if no ``initial_design`` is passed. This base implementation always raises
    ``NotImplementedError``.

    Parameters
    ----------
    scenario : Scenario
        The scenario the component is created for.
    """
    raise NotImplementedError
[docs]
@staticmethod
@abstractmethod
def get_random_design(scenario: Scenario) -> AbstractRandomDesign:
    """Returns an instance of the random design class to be used in the BO loop,
    specifying how to interleave the BO iterations with randomly selected configurations.

    Used by the constructor if no ``random_design`` is passed. This base implementation always raises
    ``NotImplementedError``.

    Parameters
    ----------
    scenario : Scenario
        The scenario the component is created for.
    """
    raise NotImplementedError
[docs]
@staticmethod
@abstractmethod
def get_runhistory_encoder(scenario: Scenario) -> AbstractRunHistoryEncoder:
    """Returns an instance of the runhistory encoder class to be used in the BO loop,
    specifying how the runhistory is to be prepared for the next surrogate model.

    Used by the constructor if no ``runhistory_encoder`` is passed. This base implementation always raises
    ``NotImplementedError``.

    Parameters
    ----------
    scenario : Scenario
        The scenario the component is created for.
    """
    raise NotImplementedError
[docs]
@staticmethod
@abstractmethod
def get_multi_objective_algorithm(scenario: Scenario) -> AbstractMultiObjectiveAlgorithm:
    """Returns the multi-objective algorithm instance to be used in the BO loop,
    specifying the scalarization strategy for multiple objectives' costs.

    Used by the constructor if no ``multi_objective_algorithm`` is passed and the scenario has more than one
    objective. This base implementation always raises ``NotImplementedError``.

    Parameters
    ----------
    scenario : Scenario
        The scenario the component is created for.
    """
    raise NotImplementedError
[docs]
@staticmethod
def get_config_selector(
    scenario: Scenario,
    *,
    retrain_after: int = 8,
    retries: int = 16,
) -> ConfigSelector:
    """Returns the default configuration selector.

    Unlike the other ``get_*`` factories, this one is implemented in the base class. Used by the constructor
    if no ``config_selector`` is passed.

    Parameters
    ----------
    scenario : Scenario
        The scenario passed on to ``ConfigSelector``.
    retrain_after : int, defaults to 8
        Passed on unchanged to ``ConfigSelector``.
    retries : int, defaults to 16
        Passed on unchanged to ``ConfigSelector``.
    """
    return ConfigSelector(scenario, retrain_after=retrain_after, retries=retries)
def _get_optimizer(self) -> SMBO:
    """Fills the SMBO with all the pre-initialized components.

    The scenario, runner, runhistory, intensifier and the overwrite flag stored on the facade are passed on.
    All of them have to be set before this method is called.
    """
    return SMBO(
        scenario=self._scenario,
        runner=self._runner,
        runhistory=self._runhistory,
        intensifier=self._intensifier,
        overwrite=self._overwrite,
    )
def _update_dependencies(self) -> None:
    """Convenience method to add some more dependencies and to ensure separable instantiation of
    the components. This is the easiest way to incorporate dependencies, although
    it might be a bit hacky.

    Only attributes of the components stored on the facade are set; nothing is returned.
    """
    # Hand all components needed for selecting configurations to the config selector
    self._config_selector._set_components(
        initial_design=self._initial_design,
        runhistory=self._runhistory,
        runhistory_encoder=self._runhistory_encoder,
        model=self._model,
        acquisition_function=self._acquisition_function,
        acquisition_maximizer=self._acquisition_maximizer,
        random_design=self._random_design,
        callbacks=self._callbacks,
    )

    # The encoder gets the (possibly None) multi-objective algorithm and the runhistory
    self._runhistory_encoder.multi_objective_algorithm = self._multi_objective_algorithm
    self._runhistory_encoder.runhistory = self._runhistory

    # Acquisition function -> model, acquisition maximizer -> acquisition function
    self._acquisition_function.model = self._model
    self._acquisition_maximizer.acquisition_function = self._acquisition_function

    # The intensifier gets the config selector and the runhistory
    self._intensifier.config_selector = self._config_selector
    self._intensifier.runhistory = self._runhistory
def _validate(self) -> None:
"""Checks if the composition is correct if there are dependencies."""
# Make sure the same acquisition function is used
assert self._acquisition_function == self._acquisition_maximizer._acquisition_function
if isinstance(self._runner, DaskParallelRunner) and (
self.scenario.trial_walltime_limit is not None or self.scenario.trial_memory_limit is not None
):
# This is probably due to pickling dask jobs
raise ValueError(
"Parallelization via Dask cannot be used in combination with limiting "
"the resources "
"of the target function via `scenario.trial_walltime_limit` or "
"`scenario.trial_memory_limit`. Set those to `None` if you want "
"parallelization. "
)
def _get_signature_arguments(self) -> list[str]:
"""Returns signature arguments, which are required by the intensifier."""
arguments = []
if self._intensifier.uses_seeds:
arguments += ["seed"]
if self._intensifier.uses_budgets:
arguments += ["budget"]
if self._intensifier.uses_instances:
arguments += ["instance"]
return arguments