Source code for smac.main.smbo

from __future__ import annotations

from typing import Any

import json
import time
from pathlib import Path

import numpy as np
from ConfigSpace import Configuration
from numpy import ndarray

from smac.acquisition.function.abstract_acquisition_function import (
    AbstractAcquisitionFunction,
)
from smac.callback.callback import Callback
from smac.intensifier.abstract_intensifier import AbstractIntensifier
from smac.model.abstract_model import AbstractModel
from smac.runhistory import StatusType, TrialInfo, TrialValue
from smac.runhistory.runhistory import RunHistory
from smac.runner import FirstRunCrashedException
from smac.runner.abstract_runner import AbstractRunner
from smac.runner.dask_runner import DaskParallelRunner
from smac.scenario import Scenario
from smac.utils.data_structures import recursively_compare_dicts
from smac.utils.logging import get_logger

__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"


logger = get_logger(__name__)


[docs]class SMBO: """Implementation that contains the main Bayesian optimization loop. Parameters ---------- scenario : Scenario The scenario object, holding all environmental information. runner : AbstractRunner The runner (containing the target function) is called internally to judge a trial's performance. runhistory : Runhistory The runhistory stores all trials. intensifier : AbstractIntensifier The intensifier decides which trial (combination of configuration, seed, budget and instance) should be run next. overwrite: bool, defaults to False When True, overwrites the run results if a previous run is found that is inconsistent in the meta data with the current setup. If ``overwrite`` is set to False, the user is asked for the exact behaviour (overwrite completely, save old run, or use old results). Warning ------- This model should be initialized by a facade only. """ def __init__( self, scenario: Scenario, runner: AbstractRunner, runhistory: RunHistory, intensifier: AbstractIntensifier, overwrite: bool = False, ): self._scenario = scenario self._configspace = scenario.configspace self._runhistory = runhistory self._intensifier = intensifier self._trial_generator = iter(intensifier) self._runner = runner self._overwrite = overwrite # Internal variables self._finished = False self._stop = False # Gracefully stop SMAC self._callbacks: list[Callback] = [] # Stats variables self._start_time: float | None = None self._used_target_function_walltime = 0.0 # Set walltime used method for intensifier self._intensifier.used_walltime = lambda: self.used_walltime # type: ignore # We initialize the state based on previous data. # If no previous data is found then we take care of the initial design. self._initialize_state() @property def runhistory(self) -> RunHistory: """The run history, which is filled with all information during the optimization process.""" return self._runhistory @property def intensifier(self) -> AbstractIntensifier: """The run history, which is filled with all information during the optimization process.""" return self._intensifier @property def remaining_walltime(self) -> float: """Subtracts the runtime configuration budget with the used wallclock time.""" assert self._start_time is not None return self._scenario.walltime_limit - (time.time() - self._start_time) @property def remaining_cputime(self) -> float: """Subtracts the target function running budget with the used time.""" return self._scenario.cputime_limit - self._used_target_function_walltime @property def remaining_trials(self) -> int: """Subtract the target function runs in the scenario with the used ta runs.""" return self._scenario.n_trials - self.runhistory.submitted @property def budget_exhausted(self) -> bool: """Checks whether the the remaining walltime, cputime or trials was exceeded.""" A = self.remaining_walltime <= 0 B = self.remaining_cputime <= 0 C = self.remaining_trials <= 0 return A or B or C @property def used_walltime(self) -> float: """Returns used wallclock time.""" if self._start_time is None: return 0.0 return time.time() - self._start_time @property def used_target_function_walltime(self) -> float: """Returns how much walltime the target function spend so far.""" return self._used_target_function_walltime
[docs] def ask(self) -> TrialInfo: """Asks the intensifier for the next trial. Returns ------- info : TrialInfo Information about the trial (config, instance, seed, budget). """ logger.debug("Calling ask...") for callback in self._callbacks: callback.on_ask_start(self) # Now we use our generator to get the next trial info trial_info = next(self._trial_generator) # Track the fact that the trial was returned # This is really important because otherwise the intensifier would most likly sample the same trial again self._runhistory.add_running_trial(trial_info) for callback in self._callbacks: callback.on_ask_end(self, trial_info) logger.debug("...and received a new trial.") return trial_info
[docs] def tell( self, info: TrialInfo, value: TrialValue, save: bool = True, ) -> None: """Adds the result of a trial to the runhistory and updates the stats object. Parameters ---------- info : TrialInfo Describes the trial from which to process the results. value : TrialValue Contains relevant information regarding the execution of a trial. save : bool, optional to True Whether the runhistory should be saved. """ if info.config.origin is None: info.config.origin = "Custom" for callback in self._callbacks: response = callback.on_tell_start(self, info, value) # If a callback returns False, the optimization loop should be interrupted # the other callbacks are still being called. if response is False: logger.info("A callback returned False. Abort is requested.") self._stop = True # Some sanity checks here if self._intensifier.uses_instances and info.instance is None: raise ValueError("Passed instance is None but intensifier requires instances.") if self._intensifier.uses_budgets and info.budget is None: raise ValueError("Passed budget is None but intensifier requires budgets.") self._runhistory.add( config=info.config, cost=value.cost, time=value.time, status=value.status, instance=info.instance, seed=info.seed, budget=info.budget, starttime=value.starttime, endtime=value.endtime, additional_info=value.additional_info, force_update=True, # Important to overwrite the status RUNNING ) logger.debug(f"Tell method was called with cost {value.cost} ({StatusType(value.status).name}).") for callback in self._callbacks: response = callback.on_tell_end(self, info, value) # If a callback returns False, the optimization loop should be interrupted # the other callbacks are still being called. if response is False: logger.info("A callback returned False. Abort is requested.") self._stop = True if save: self.save()
[docs] def update_model(self, model: AbstractModel) -> None: """Updates the model and updates the acquisition function.""" if (config_selector := self._intensifier._config_selector) is not None: config_selector._model = model assert config_selector._acquisition_function is not None config_selector._acquisition_function.model = model
[docs] def update_acquisition_function(self, acquisition_function: AbstractAcquisitionFunction) -> None: """Updates the acquisition function including the associated model and the acquisition optimizer. """ if (config_selector := self._intensifier._config_selector) is not None: config_selector._acquisition_function = acquisition_function config_selector._acquisition_function.model = config_selector._model assert config_selector._acquisition_maximizer is not None config_selector._acquisition_maximizer.acquisition_function = acquisition_function
[docs] def optimize(self, *, data_to_scatter: dict[str, Any] | None = None) -> Configuration | list[Configuration]: """Runs the Bayesian optimization loop. Parameters ---------- data_to_scatter: dict[str, Any] | None When a user scatters data from their local process to the distributed network, this data is distributed in a round-robin fashion grouping by number of cores. Roughly speaking, we can keep this data in memory and then we do not have to (de-)serialize the data every time we would like to execute a target function with a big dataset. For example, when your target function has a big dataset shared across all the target function, this argument is very useful. Returns ------- incumbent : Configuration The best found configuration. """ # We return the incumbent if we already finished the a process (we don't want to allow to call # optimize more than once). if self._finished: logger.info("Optimization process was already finished. Returning incumbent...") if self._scenario.count_objectives() == 1: return self.intensifier.get_incumbent() else: return self.intensifier.get_incumbents() # Start the timer before we do anything # If we continue the optimization, the starting time is set by the load method if self._start_time is None: self._start_time = time.time() for callback in self._callbacks: callback.on_start(self) dask_data_to_scatter = {} if isinstance(self._runner, DaskParallelRunner) and data_to_scatter is not None: dask_data_to_scatter = dict(data_to_scatter=self._runner._client.scatter(data_to_scatter, broadcast=True)) elif data_to_scatter is not None: raise ValueError( "data_to_scatter is valid only for DaskParallelRunner, " f"but {dask_data_to_scatter} was provided for {self._runner.__class__.__name__}" ) # Main BO loop while True: for callback in self._callbacks: callback.on_iteration_start(self) try: # Sample next trial from the intensification trial_info = self.ask() # We submit the trial to the runner # In multi-worker mode, SMAC waits till a new worker is available here self._runner.submit_trial(trial_info=trial_info, **dask_data_to_scatter) except StopIteration: self._stop = True # We add results from the runner if results are available self._add_results() # Some statistics logger.debug( f"Remaining wallclock time: {self.remaining_walltime}; " f"Remaining cpu time: {self.remaining_cputime}; " f"Remaining trials: {self.remaining_trials}" ) if self.runhistory.finished % 50 == 0: logger.info(f"Finished {self.runhistory.finished} trials.") for callback in self._callbacks: callback.on_iteration_end(self) # Now we check whether we have to stop the optimization if self.budget_exhausted or self._stop: if self.budget_exhausted: logger.info("Configuration budget is exhausted:") logger.info(f"--- Remaining wallclock time: {self.remaining_walltime}") logger.info(f"--- Remaining cpu time: {self.remaining_cputime}") logger.info(f"--- Remaining trials: {self.remaining_trials}") else: logger.info("Shutting down because the stop flag was set.") # Wait for the trials to finish while self._runner.is_running(): self._runner.wait() self._add_results() # Break from the intensification loop, as there are no more resources break for callback in self._callbacks: callback.on_end(self) # We only set the finished flag if the budget really was exhausted if self.budget_exhausted: self._finished = True if self._scenario.count_objectives() == 1: return self.intensifier.get_incumbent() else: return self.intensifier.get_incumbents()
[docs] def reset(self) -> None: """Resets the internal variables of the optimizer, intensifier, and runhistory.""" self._used_target_function_walltime = 0 self._finished = False # We also reset runhistory and intensifier here self._runhistory.reset() self._intensifier.reset()
[docs] def exists(self, filename: str | Path) -> bool: """Checks if the files associated with the run already exist. Checks all files that are created by the optimizer. Parameters ---------- filename : str | Path The name of the folder of the SMAC run. """ if isinstance(filename, str): filename = Path(filename) optimization_fn = filename / "optimization.json" runhistory_fn = filename / "runhistory.json" intensifier_fn = filename / "intensifier.json" if optimization_fn.exists() and runhistory_fn.exists() and intensifier_fn.exists(): return True return False
[docs] def load(self) -> None: """Loads the optimizer, intensifier, and runhistory from the output directory specified in the scenario.""" filename = self._scenario.output_directory optimization_fn = filename / "optimization.json" runhistory_fn = filename / "runhistory.json" intensifier_fn = filename / "intensifier.json" if filename is not None: with open(optimization_fn) as fp: data = json.load(fp) self._runhistory.load(runhistory_fn, configspace=self._scenario.configspace) self._intensifier.load(intensifier_fn) self._used_target_function_walltime = data["used_target_function_walltime"] self._finished = data["finished"] self._start_time = time.time() - data["used_walltime"]
[docs] def save(self) -> None: """Saves the current stats, runhistory, and intensifier.""" path = self._scenario.output_directory if path is not None: data = { "used_walltime": self.used_walltime, "used_target_function_walltime": self.used_target_function_walltime, "last_update": time.time(), "finished": self._finished, } # Save optimization data with open(str(path / "optimization.json"), "w") as file: json.dump(data, file, indent=2) # And save runhistory and intensifier self._runhistory.save(path / "runhistory.json") self._intensifier.save(path / "intensifier.json")
def _add_results(self) -> None: """Adds results from the runner to the runhistory. Although most of the functionality could be written in the tell method, we separate it here to make it accessible for the automatic optimization procedure only. """ # Check if there is any result for trial_info, trial_value in self._runner.iter_results(): # Add the results of the run to the run history self.tell(trial_info, trial_value) # We expect the first run to always succeed. if self.runhistory.finished == 0 and trial_value.status == StatusType.CRASHED: additional_info = "" if "traceback" in trial_value.additional_info: additional_info = "\n\n" + trial_value.additional_info["traceback"] raise FirstRunCrashedException( "The first run crashed. Please check your setup again." + additional_info ) # Update SMAC stats self._used_target_function_walltime += float(trial_value.time) # Gracefully end optimization if termination cost is reached if self._scenario.termination_cost_threshold != np.inf: cost = self.runhistory.average_cost(trial_info.config) if not isinstance(cost, list): cost = [cost] if not isinstance(self._scenario.termination_cost_threshold, list): cost_threshold = [self._scenario.termination_cost_threshold] else: cost_threshold = self._scenario.termination_cost_threshold if len(cost) != len(cost_threshold): raise RuntimeError("You must specify a termination cost threshold for each objective.") if all(cost[i] < cost_threshold[i] for i in range(len(cost))): logger.info("Cost threshold was reached. Abort is requested.") self._stop = True
[docs] def register_callback(self, callback: Callback, index: int | None = None) -> None: """ Registers a callback to be called before, in between, and after the Bayesian optimization loop. Callback is appended to the list by default. Parameters ---------- callback : Callback The callback to be registered. index : int, optional The index at which the callback should be registered. The default is None. If it is None, append the callback to the list. """ if index is None: index = len(self._callbacks) self._callbacks.insert(index, callback)
def _initialize_state(self) -> None: """Detects whether the optimization is restored from a previous state.""" # Here we actually check whether the run should be continued or not. # More precisely, we update our smbo/runhistory/intensifier object if all component arguments # and scenario object are the same. For doing so, we create a specific hash. # The SMBO object recognizes that stats (based on runhistory) is not empty and hence does not the run initial # design anymore. # Since the runhistory is already updated, the model uses previous data directly. if not self._overwrite: old_output_directory = self._scenario.output_directory if self.exists(old_output_directory): old_scenario = Scenario.load(old_output_directory) if self._scenario == old_scenario: logger.info("Continuing from previous run.") # First we reset everything and then we load the old states self.reset() self.load() # If the last run was not successful, we reset everything again if self._runhistory.submitted <= 1 and self._runhistory.finished == 0: logger.info("Since the previous run was not successful, SMAC will start from scratch again.") self.reset() else: # Here, we run into different scenarios diff = recursively_compare_dicts( Scenario.make_serializable(self._scenario), Scenario.make_serializable(old_scenario), level="scenario", ) logger.info( f"Found old run in `{self._scenario.output_directory}` but it is not the same as the current " f"one:\n{diff}" ) feedback = input( "\nPress one of the following numbers to continue or any other key to abort:\n" "(1) Overwrite old run completely and start a new run.\n" "(2) Rename the old run (append an '-old') and start a new run.\n" ) if feedback == "1": # We don't have to do anything here, since we work with a clean runhistory and stats object pass elif feedback == "2": # Rename old run new_dir = str(old_scenario.output_directory.parent) while True: new_dir += "-old" try: old_scenario.output_directory.parent.rename(new_dir) break except OSError: pass else: raise RuntimeError("SMAC run was stopped by the user.") # And now we save everything self._scenario.save() self.save()
[docs] def validate( self, config: Configuration, *, seed: int | None = None, ) -> float | ndarray[float]: """Validates a configuration on other seeds than the ones used in the optimization process and on the highest budget (if budget type is real-valued). Does not exceed the maximum number of config calls or seeds as defined in the scenario. Parameters ---------- config : Configuration Configuration to validate In case that the budget type is real-valued budget, this argument is ignored. seed : int | None, defaults to None If None, the seed from the scenario is used. Returns ------- cost : float | ndarray[float] The averaged cost of the configuration. In case of multi-fidelity, the cost of each objective is averaged. """ if seed is None: seed = self._scenario.seed costs = [] for trial in self._intensifier.get_trials_of_interest(config, validate=True, seed=seed): kwargs: dict[str, Any] = {} if trial.seed is not None: kwargs["seed"] = trial.seed if trial.budget is not None: kwargs["budget"] = trial.budget if trial.instance is not None: kwargs["instance"] = trial.instance # TODO: Use submit run for faster evaluation # self._runner.submit_trial(trial_info=trial) _, cost, _, _ = self._runner.run(config, **kwargs) costs += [cost] np_costs = np.array(costs) return np.mean(np_costs, axis=0)
[docs] def print_stats(self) -> None: """Prints all statistics.""" logger.info( "\n" f"--- STATISTICS -------------------------------------\n" f"--- Incumbent changed: {self.intensifier.incumbents_changed}\n" f"--- Submitted trials: {self.runhistory.submitted} / {self._scenario.n_trials}\n" f"--- Finished trials: {self.runhistory.finished} / {self._scenario.n_trials}\n" f"--- Configurations: {self.runhistory._n_id}\n" f"--- Used wallclock time: {round(self.used_walltime)} / {self._scenario.walltime_limit} sec\n" "--- Used target function runtime: " f"{round(self.used_target_function_walltime, 2)} / {self._scenario.cputime_limit} sec\n" f"----------------------------------------------------" )