Source code for smac.optimizer.smbo

from typing import Callable, Dict, List, Optional, Type, Union

import logging
import os
import time

import numpy as np

from smac.callbacks import IncorporateRunResultCallback
from smac.configspace import Configuration
from smac.epm.base_epm import BaseEPM
from smac.initial_design.initial_design import InitialDesign
from smac.intensification.abstract_racer import AbstractRacer, RunInfoIntent
from smac.optimizer import pSMAC
from smac.optimizer.acquisition import AbstractAcquisitionFunction
from smac.optimizer.acquisition.maximizer import AcquisitionFunctionMaximizer
from smac.optimizer.configuration_chooser.epm_chooser import EPMChooser
from smac.optimizer.configuration_chooser.random_chooser import (
    ChooserNoCoolDown,
    RandomChooser,
)
from smac.runhistory.runhistory import RunHistory, RunInfo, RunValue
from smac.runhistory.runhistory2epm import AbstractRunHistory2EPM
from smac.scenario.scenario import Scenario
from smac.stats.stats import Stats
from smac.tae import FirstRunCrashedException, StatusType, TAEAbortException
from smac.tae.base import BaseRunner
from smac.utils.constants import MAXINT
from smac.utils.io.traj_logging import TrajLogger
from smac.utils.validate import Validator

__author__ = "Aaron Klein, Marius Lindauer, Matthias Feurer"
__copyright__ = "Copyright 2015, ML4AAD"
__license__ = "3-clause BSD"


[docs]class SMBO(object): """Interface that contains the main Bayesian optimization loop. Parameters ---------- scenario: smac.scenario.scenario.Scenario Scenario object stats: Stats statistics object with configuration budgets initial_design: InitialDesign initial sampling design runhistory: RunHistory runhistory with all runs so far runhistory2epm : AbstractRunHistory2EPM Object that implements the AbstractRunHistory2EPM to convert runhistory data into EPM data intensifier: Intensifier intensification of new challengers against incumbent configuration (probably with some kind of racing on the instances) num_run: int id of this run (used for pSMAC) model: BaseEPM empirical performance model acq_optimizer: AcquisitionFunctionMaximizer Optimizer of acquisition function. acquisition_func : AcquisitionFunction Object that implements the AbstractAcquisitionFunction (i.e., infill criterion for acq_optimizer) restore_incumbent: Configuration incumbent to be used from the start. ONLY used to restore states. rng: np.random.RandomState Random number generator tae_runner : smac.tae.base.BaseRunner Object target algorithm run executor random_configuration_chooser Chooser for random configuration -- one of * ChooserNoCoolDown(modulus) * ChooserLinearCoolDown(start_modulus, modulus_increment, end_modulus) predict_x_best: bool Choose x_best for computing the acquisition function via the model instead of via the observations. min_samples_model: int Minimum number of samples to build a model. epm_chooser_kwargs: typing.Optional[typing.Dict] Additional arguments passed to epmchooser Attributes ---------- logger incumbent scenario config_space stats initial_design runhistory intensifier num_run rng initial_design_configs epm_chooser tae_runner """ def __init__( self, scenario: Scenario, stats: Stats, initial_design: InitialDesign, runhistory: RunHistory, runhistory2epm: AbstractRunHistory2EPM, intensifier: AbstractRacer, num_run: int, model: BaseEPM, acq_optimizer: AcquisitionFunctionMaximizer, acquisition_func: AbstractAcquisitionFunction, rng: np.random.RandomState, tae_runner: BaseRunner, restore_incumbent: Configuration = None, random_configuration_chooser: RandomChooser = ChooserNoCoolDown(modulus=2.0), predict_x_best: bool = True, min_samples_model: int = 1, epm_chooser: Type[EPMChooser] = EPMChooser, epm_chooser_kwargs: Optional[Dict] = None, ): self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__) self.incumbent = restore_incumbent self.scenario = scenario self.config_space = scenario.cs # type: ignore[attr-defined] # noqa F821 self.stats = stats self.initial_design = initial_design self.runhistory = runhistory self.intensifier = intensifier self.num_run = num_run self.rng = rng self._min_time = 10**-5 self.tae_runner = tae_runner self.initial_design_configs = [] # type: List[Configuration] if epm_chooser_kwargs is None: epm_chooser_kwargs = {} # TODO: consider if we need an additional EPMChooser for multi-objective optimization self.epm_chooser = epm_chooser( scenario=scenario, stats=stats, runhistory=runhistory, runhistory2epm=runhistory2epm, model=model, # type: ignore acq_optimizer=acq_optimizer, acquisition_func=acquisition_func, rng=rng, restore_incumbent=restore_incumbent, random_configuration_chooser=random_configuration_chooser, predict_x_best=predict_x_best, min_samples_model=min_samples_model, **epm_chooser_kwargs, ) # Internal variable - if this is set to True it will gracefully stop SMAC self._stop = False # Callbacks. All known callbacks have a key. If something does not have a key here, there is # no callback available. self._callbacks = {"_incorporate_run_results": list()} # type: Dict[str, List[Callable]] self._callback_to_key = { IncorporateRunResultCallback: "_incorporate_run_results", } # type: Dict[Type, str]
[docs] def start(self) -> None: """Starts the Bayesian Optimization loop. Detects whether the optimization is restored from a previous state. """ self.stats.start_timing() # Initialization, depends on input if self.stats.submitted_ta_runs == 0 and self.incumbent is None: self.logger.info("Running initial design") # Intensifier initialization self.initial_design_configs = self.initial_design.select_configurations() # to be on the safe side, never return an empty list of initial configs if not self.initial_design_configs: self.initial_design_configs = [self.config_space.get_default_configuration()] elif self.stats.submitted_ta_runs > 0 and self.incumbent is None: raise ValueError( "According to stats there have been runs started, " "but the optimizer cannot detect an incumbent. Did " "you set the incumbent (e.g. after restoring state)?" ) elif self.stats.submitted_ta_runs == 0 and self.incumbent is not None: raise ValueError( "An incumbent is specified, but there are no runs " "recorded as started in the Stats-object. If you're " "restoring a state, please provide the Stats-object." ) else: # Restoring state! self.logger.info( "State Restored! Starting optimization with " "incumbent %s", self.incumbent, ) self.logger.info("State restored with following budget:") self.stats.print_stats()
[docs] def run(self) -> Configuration: """Runs the Bayesian optimization loop. Returns ------- incumbent: np.array(1, H) The best found configuration. """ self.start() num_obj = len(self.scenario.multi_objectives) # type: ignore[attr-defined] # noqa F821 # Main BO loop while True: if self.scenario.shared_model: # type: ignore[attr-defined] # noqa F821 pSMAC.read( run_history=self.runhistory, output_dirs=self.scenario.input_psmac_dirs, # type: ignore[attr-defined] # noqa F821 configuration_space=self.config_space, logger=self.logger, ) start_time = time.time() # sample next configuration for intensification # Initial design runs are also included in the BO loop now. intent, run_info = self.intensifier.get_next_run( challengers=self.initial_design_configs, incumbent=self.incumbent, chooser=self.epm_chooser, run_history=self.runhistory, repeat_configs=self.intensifier.repeat_configs, num_workers=self.tae_runner.num_workers(), ) # remove config from initial design challengers to not repeat it again self.initial_design_configs = [c for c in self.initial_design_configs if c != run_info.config] # update timebound only if a 'new' configuration is sampled as the challenger if self.intensifier.num_run == 0: time_spent = time.time() - start_time time_left = self._get_timebound_for_intensification(time_spent, update=False) self.logger.debug("New intensification time bound: %f", time_left) else: old_time_left = time_left time_spent = time_spent + (time.time() - start_time) time_left = self._get_timebound_for_intensification(time_spent, update=True) self.logger.debug( "Updated intensification time bound from %f to %f", old_time_left, time_left, ) # Skip starting new runs if the budget is now exhausted if self.stats.is_budget_exhausted(): intent = RunInfoIntent.SKIP # Skip the run if there was a request to do so. # For example, during intensifier intensification, we # don't want to rerun a config that was previously ran if intent == RunInfoIntent.RUN: # Track the fact that a run was launched in the run # history. It's status is tagged as RUNNING, and once # completed and processed, it will be updated accordingly self.runhistory.add( config=run_info.config, cost=float(MAXINT) if num_obj == 1 else np.full(num_obj, float(MAXINT)), time=0.0, status=StatusType.RUNNING, instance_id=run_info.instance, seed=run_info.seed, budget=run_info.budget, ) run_info.config.config_id = self.runhistory.config_ids[run_info.config] self.tae_runner.submit_run(run_info=run_info) # There are 2 criteria that the stats object uses to know # if the budged was exhausted. # The budget time, which can only be known when the run finishes, # And the number of ta executions. Because we submit the job at this point, # we count this submission as a run. This prevent for using more # runner runs than what the scenario allows self.stats.submitted_ta_runs += 1 elif intent == RunInfoIntent.SKIP: # No launch is required # This marks a transition request from the intensifier # To a new iteration pass elif intent == RunInfoIntent.WAIT: # In any other case, we wait for resources # This likely indicates that no further decision # can be taken by the intensifier until more data is # available self.tae_runner.wait() else: raise NotImplementedError("No other RunInfoIntent has been coded!") # Check if there is any result, or else continue for run_info, result in self.tae_runner.get_finished_runs(): # Add the results of the run to the run history # Additionally check for new incumbent self._incorporate_run_results(run_info, result, time_left) if self.scenario.shared_model: # type: ignore[attr-defined] # noqa F821 assert self.scenario.output_dir_for_this_run is not None # please mypy pSMAC.write( run_history=self.runhistory, output_directory=self.scenario.output_dir_for_this_run, # type: ignore[attr-defined] # noqa F821 logger=self.logger, ) self.logger.debug( "Remaining budget: %f (wallclock), %f (ta costs), %f (target runs)" % ( self.stats.get_remaing_time_budget(), self.stats.get_remaining_ta_budget(), self.stats.get_remaining_ta_runs(), ) ) if self.stats.is_budget_exhausted() or self._stop: if self.stats.is_budget_exhausted(): self.logger.debug("Exhausted configuration budget") else: self.logger.debug("Shutting down because a configuration or callback returned status STOP") # The budget can be exhausted for 2 reasons: number of ta runs or # time. If the number of ta runs is reached, but there is still budget, # wait for the runs to finish while self.tae_runner.pending_runs(): self.tae_runner.wait() for run_info, result in self.tae_runner.get_finished_runs(): # Add the results of the run to the run history # Additionally check for new incumbent self._incorporate_run_results(run_info, result, time_left) # Break from the intensification loop, # as there are no more resources break # print stats at the end of each intensification iteration if self.intensifier.iteration_done: self.stats.print_stats(debug_out=True) return self.incumbent
[docs] def validate( self, config_mode: Union[str, List[Configuration]] = "inc", instance_mode: Union[str, List[str]] = "train+test", repetitions: int = 1, use_epm: bool = False, n_jobs: int = -1, backend: str = "threading", ) -> RunHistory: """Create validator-object and run validation, using scenario- information, runhistory from smbo and tae_runner from intensify. Parameters ---------- config_mode: str or list<Configuration> string or directly a list of Configuration str from [def, inc, def+inc, wallclock_time, cpu_time, all] time evaluates at cpu- or wallclock-timesteps of: [max_time/2^0, max_time/2^1, max_time/2^3, ..., default] with max_time being the highest recorded time instance_mode: string what instances to use for validation, from [train, test, train+test] repetitions: int number of repetitions in nondeterministic algorithms (in deterministic will be fixed to 1) use_epm: bool whether to use an EPM instead of evaluating all runs with the TAE n_jobs: int number of parallel processes used by joblib Returns ------- runhistory: RunHistory runhistory containing all specified runs """ if isinstance(config_mode, str): assert self.scenario.output_dir_for_this_run is not None # Please mypy traj_fn = os.path.join(self.scenario.output_dir_for_this_run, "traj_aclib2.json") trajectory = TrajLogger.read_traj_aclib_format( fn=traj_fn, cs=self.config_space ) # type: Optional[List[Dict[str, Union[float, int, Configuration]]]] else: trajectory = None if self.scenario.output_dir_for_this_run: new_rh_path = os.path.join( self.scenario.output_dir_for_this_run, "validated_runhistory.json" ) # type: Optional[str] # noqa E501 else: new_rh_path = None validator = Validator(self.scenario, trajectory, self.rng) if use_epm: new_rh = validator.validate_epm( config_mode=config_mode, instance_mode=instance_mode, repetitions=repetitions, runhistory=self.runhistory, output_fn=new_rh_path, ) else: new_rh = validator.validate( config_mode, instance_mode, repetitions, n_jobs, backend, self.runhistory, self.tae_runner, output_fn=new_rh_path, ) return new_rh
def _get_timebound_for_intensification(self, time_spent: float, update: bool) -> float: """Calculate time left for intensify from the time spent on choosing challengers using the fraction of time intended for intensification (which is specified in scenario.intensification_percentage). Parameters ---------- time_spent : float update : bool Only used to check in the unit tests how this function was called Returns ------- time_left : float """ frac_intensify = self.scenario.intensification_percentage # type: ignore[attr-defined] # noqa F821 if frac_intensify <= 0 or frac_intensify >= 1: raise ValueError( "The value for intensification_percentage-" "option must lie in (0,1), instead: %.2f" % frac_intensify ) total_time = time_spent / (1 - frac_intensify) time_left = frac_intensify * total_time self.logger.debug( "Total time: %.4f, time spent on choosing next " "configurations: %.4f (%.2f), time left for " "intensification: %.4f (%.2f)" % (total_time, time_spent, (1 - frac_intensify), time_left, frac_intensify) ) return time_left def _incorporate_run_results(self, run_info: RunInfo, result: RunValue, time_left: float) -> None: """The SMBO submits a config-run-request via a RunInfo object. When that config run is completed, a RunValue, which contains all the relevant information obtained after running a job, is returned. This method incorporates the status of that run into the stats/runhistory objects so that other consumers can advance with their task. Additionally, it checks for a new incumbent via the intensifier process results, which also has the side effect of moving the intensifier to a new state Parameters ---------- run_info: RunInfo Describes the run (config) from which to process the results result: RunValue Contains relevant information regarding the execution of a config time_left: float time in [sec] available to perform intensify """ # update SMAC stats self.stats.ta_time_used += float(result.time) self.stats.finished_ta_runs += 1 self.logger.debug( f"Return: Status: {result.status}, cost: {result.cost}, time: {result.time}, " f"additional: {result.additional_info}" ) self.runhistory.add( config=run_info.config, cost=result.cost, time=result.time, status=result.status, instance_id=run_info.instance, seed=run_info.seed, budget=run_info.budget, starttime=result.starttime, endtime=result.endtime, force_update=True, additional_info=result.additional_info, ) self.stats.n_configs = len(self.runhistory.config_ids) if result.status == StatusType.ABORT: raise TAEAbortException( "Target algorithm status ABORT - SMAC will " "exit. The last incumbent can be found " "in the trajectory-file." ) elif result.status == StatusType.STOP: self._stop = True return if self.scenario.abort_on_first_run_crash: # type: ignore[attr-defined] # noqa F821 if self.stats.finished_ta_runs == 1 and result.status == StatusType.CRASHED: raise FirstRunCrashedException( "First run crashed, abort. Please check your setup -- we assume that your default " "configuration does not crashes. (To deactivate this exception, use the SMAC scenario option " "'abort_on_first_run_crash'). Additional run info: %s" % result.additional_info ) # Update the intensifier with the result of the runs self.incumbent, inc_perf = self.intensifier.process_results( run_info=run_info, incumbent=self.incumbent, run_history=self.runhistory, time_bound=max(self._min_time, time_left), result=result, ) for callback in self._callbacks["_incorporate_run_results"]: response = callback(smbo=self, run_info=run_info, result=result, time_left=time_left) # If a callback returns False, the optimization loop should be interrupted # the other callbacks are still being called if response is False: self.logger.debug("An IncorporateRunResultCallback returned False, requesting abort.") self._stop = True if self.scenario.save_instantly: # type: ignore[attr-defined] # noqa F821 self.save() return
[docs] def save(self) -> None: """Saves the current stats and runhistory.""" self.stats.save() output_dir = self.scenario.output_dir_for_this_run if output_dir is not None: self.runhistory.save_json(fn=os.path.join(output_dir, "runhistory.json"))