# Source code for smac.optimizer.smbo

import logging
import os
import time
import typing

import numpy as np

from smac.callbacks import IncorporateRunResultCallback
from smac.configspace import Configuration
from smac.epm.rf_with_instances import RandomForestWithInstances
from smac.initial_design.initial_design import InitialDesign
from smac.intensification.abstract_racer import AbstractRacer, RunInfoIntent
from smac.optimizer import pSMAC
from smac.optimizer.acquisition import AbstractAcquisitionFunction
from smac.optimizer.random_configuration_chooser import ChooserNoCoolDown, RandomConfigurationChooser
from smac.optimizer.ei_optimization import AcquisitionFunctionMaximizer
from smac.optimizer.epm_configuration_chooser import EPMChooser
from smac.runhistory.runhistory import RunHistory, RunInfo, RunValue
from smac.runhistory.runhistory2epm import AbstractRunHistory2EPM
from smac.scenario.scenario import Scenario
from smac.stats.stats import Stats
from smac.utils.constants import MAXINT
# NOTE(review): the parenthesised name list was lost in extraction; these are
# the three otherwise-unimported names the SMBO class uses -- confirm.
from smac.tae import (
    FirstRunCrashedException,
    StatusType,
    TAEAbortException,
)
from smac.tae.base import BaseRunner
# NOTE(review): module path was stripped in extraction ("from import
# TrajLogger"); restored as smac.utils.io.traj_logging -- confirm.
from smac.utils.io.traj_logging import TrajLogger
from smac.utils.validate import Validator

__author__ = "Aaron Klein, Marius Lindauer, Matthias Feurer"
__copyright__ = "Copyright 2015, ML4AAD"
__license__ = "3-clause BSD"

class SMBO(object):
    """Interface that contains the main Bayesian optimization loop

    Attributes
    ----------
    logger
    incumbent
    scenario
    config_space
    stats
    initial_design
    runhistory
    intensifier
    num_run
    rng
    initial_design_configs
    epm_chooser
    tae_runner
    """

    # NOTE(review): the default ``ChooserNoCoolDown(2.0)`` below is created once,
    # when this ``def`` is evaluated, so every SMBO relying on the default shares
    # the same chooser object. Kept as-is to preserve the public signature.
    def __init__(self,
                 scenario: Scenario,
                 stats: Stats,
                 initial_design: InitialDesign,
                 runhistory: RunHistory,
                 runhistory2epm: AbstractRunHistory2EPM,
                 intensifier: AbstractRacer,
                 num_run: int,
                 model: RandomForestWithInstances,
                 acq_optimizer: AcquisitionFunctionMaximizer,
                 acquisition_func: AbstractAcquisitionFunction,
                 rng: np.random.RandomState,
                 tae_runner: BaseRunner,
                 restore_incumbent: typing.Optional[Configuration] = None,
                 random_configuration_chooser: RandomConfigurationChooser = ChooserNoCoolDown(2.0),
                 predict_x_best: bool = True,
                 min_samples_model: int = 1):
        """
        Interface that contains the main Bayesian optimization loop

        Parameters
        ----------
        scenario: smac.scenario.scenario.Scenario
            Scenario object
        stats: Stats
            statistics object with configuration budgets
        initial_design: InitialDesign
            initial sampling design
        runhistory: RunHistory
            runhistory with all runs so far
        runhistory2epm : AbstractRunHistory2EPM
            Object that implements the AbstractRunHistory2EPM to convert runhistory
            data into EPM data
        intensifier: Intensifier
            intensification of new challengers against incumbent configuration
            (probably with some kind of racing on the instances)
        num_run: int
            id of this run (used for pSMAC)
        model: RandomForestWithInstances
            empirical performance model (right now, we support only RandomForestWithInstances)
        acq_optimizer: AcquisitionFunctionMaximizer
            Optimizer of acquisition function.
        acquisition_func : AcquisitionFunction
            Object that implements the AbstractAcquisitionFunction (i.e., infill criterion for acq_optimizer)
        rng: np.random.RandomState
            Random number generator
        tae_runner : smac.tae.base.BaseRunner Object
            target algorithm run executor
        restore_incumbent: Configuration
            incumbent to be used from the start. ONLY used to restore states.
        random_configuration_chooser
            Chooser for random configuration -- one of

            * ChooserNoCoolDown(modulus)
            * ChooserLinearCoolDown(start_modulus, modulus_increment, end_modulus)
        predict_x_best: bool
            Choose x_best for computing the acquisition function via the model instead of via the observations.
        min_samples_model: int
            Minimum number of samples to build a model
        """
        self.logger = logging.getLogger(
            self.__module__ + "." + self.__class__.__name__)
        self.incumbent = restore_incumbent

        self.scenario = scenario
        self.config_space = scenario.cs  # type: ignore[attr-defined] # noqa F821
        self.stats = stats
        self.initial_design = initial_design
        self.runhistory = runhistory
        self.intensifier = intensifier
        self.num_run = num_run
        self.rng = rng
        # Lower bound applied to the time bound handed to the intensifier
        # (see ``_incorporate_run_results``).
        self._min_time = 10 ** -5
        self.tae_runner = tae_runner

        self.initial_design_configs = []  # type: typing.List[Configuration]

        # initialize the chooser to get configurations from the EPM
        self.epm_chooser = EPMChooser(scenario=scenario,
                                      stats=stats,
                                      runhistory=runhistory,
                                      runhistory2epm=runhistory2epm,
                                      model=model,
                                      acq_optimizer=acq_optimizer,
                                      acquisition_func=acquisition_func,
                                      rng=rng,
                                      restore_incumbent=restore_incumbent,
                                      random_configuration_chooser=random_configuration_chooser,
                                      predict_x_best=predict_x_best,
                                      min_samples_model=min_samples_model)

        # Internal variable - if this is set to True it will gracefully stop SMAC
        self._stop = False

        # Callbacks. All known callbacks have a key. If something does not have a key here, there is
        # no callback available.
        self._callbacks = {
            '_incorporate_run_results': list()
        }  # type: typing.Dict[str, typing.List[typing.Callable]]
        self._callback_to_key = {
            IncorporateRunResultCallback: '_incorporate_run_results',
        }  # type: typing.Dict[typing.Type, str]

    def start(self) -> None:
        """Starts the Bayesian Optimization loop.

        Detects whether the optimization is restored from a previous state.

        Raises
        ------
        ValueError
            If the stats report submitted runs but no incumbent is set, or an
            incumbent is set but the stats report no submitted runs.
        """
        self.stats.start_timing()

        # Initialization, depends on input
        if self.stats.submitted_ta_runs == 0 and self.incumbent is None:
            self.logger.info('Running initial design')
            # Intensifier initialization
            self.initial_design_configs = self.initial_design.select_configurations()

            # to be on the safe side, never return an empty list of initial configs
            if not self.initial_design_configs:
                self.initial_design_configs = [self.config_space.get_default_configuration()]

        elif self.stats.submitted_ta_runs > 0 and self.incumbent is None:
            raise ValueError("According to stats there have been runs started, "
                             "but the optimizer cannot detect an incumbent. Did "
                             "you set the incumbent (e.g. after restoring state)?")
        elif self.stats.submitted_ta_runs == 0 and self.incumbent is not None:
            raise ValueError("An incumbent is specified, but there are no runs "
                             "recorded as started in the Stats-object. If you're "
                             "restoring a state, please provide the Stats-object.")
        else:
            # Restoring state!
            self.logger.info("State Restored! Starting optimization with "
                             "incumbent %s", self.incumbent)
            self.logger.info("State restored with following budget:")
            self.stats.print_stats()

    def run(self) -> Configuration:
        """Runs the Bayesian optimization loop

        Returns
        ----------
        incumbent: np.array(1, H)
            The best found configuration
        """
        self.start()

        # Give the time bookkeeping defined values before the loop: the "update"
        # branch below reads both names, and would otherwise raise
        # UnboundLocalError if the intensifier's num_run is non-zero on the very
        # first iteration.
        time_spent = 0.0
        time_left = self._min_time

        # Main BO loop
        while True:
            if self.scenario.shared_model:  # type: ignore[attr-defined] # noqa F821
                pSMAC.read(run_history=self.runhistory,
                           output_dirs=self.scenario.input_psmac_dirs,  # type: ignore[attr-defined] # noqa F821
                           configuration_space=self.config_space,
                           logger=self.logger)

            start_time = time.time()

            # sample next configuration for intensification
            # Initial design runs are also included in the BO loop now.
            intent, run_info = self.intensifier.get_next_run(
                challengers=self.initial_design_configs,
                incumbent=self.incumbent,
                chooser=self.epm_chooser,
                run_history=self.runhistory,
                repeat_configs=self.intensifier.repeat_configs,
                num_workers=self.tae_runner.num_workers(),
            )

            # remove config from initial design challengers to not repeat it again
            self.initial_design_configs = [c for c in self.initial_design_configs if c != run_info.config]

            # update timebound only if a 'new' configuration is sampled as the challenger
            if self.intensifier.num_run == 0:
                time_spent = time.time() - start_time
                time_left = self._get_timebound_for_intensification(time_spent, update=False)
                self.logger.debug('New intensification time bound: %f', time_left)
            else:
                old_time_left = time_left
                time_spent = time_spent + (time.time() - start_time)
                time_left = self._get_timebound_for_intensification(time_spent, update=True)
                self.logger.debug('Updated intensification time bound from %f to %f',
                                  old_time_left, time_left)

            # Skip starting new runs if the budget is now exhausted
            if self.stats.is_budget_exhausted():
                intent = RunInfoIntent.SKIP

            # Skip the run if there was a request to do so.
            # For example, during intensifier intensification, we
            # don't want to rerun a config that was previously ran
            if intent == RunInfoIntent.RUN:
                # Track the fact that a run was launched in the run
                # history. Its status is tagged as RUNNING, and once
                # completed and processed, it will be updated accordingly
                self.runhistory.add(
                    config=run_info.config,
                    cost=float(MAXINT),
                    time=0.0,
                    status=StatusType.RUNNING,
                    instance_id=run_info.instance,
                    seed=run_info.seed,
                    budget=run_info.budget,
                )

                run_info.config.config_id = self.runhistory.config_ids[run_info.config]

                self.tae_runner.submit_run(run_info=run_info)

                # There are 2 criteria that the stats object uses to know
                # if the budget was exhausted.
                # The budget time, which can only be known when the run finishes,
                # And the number of ta executions. Because we submit the job at this point,
                # we count this submission as a run. This prevent for using more
                # runner runs than what the scenario allows
                self.stats.submitted_ta_runs += 1

            elif intent == RunInfoIntent.SKIP:
                # No launch is required
                # This marks a transition request from the intensifier
                # To a new iteration
                pass
            elif intent == RunInfoIntent.WAIT:
                # In any other case, we wait for resources
                # This likely indicates that no further decision
                # can be taken by the intensifier until more data is
                # available
                self.tae_runner.wait()
            else:
                raise NotImplementedError("No other RunInfoIntent has been coded!")

            # Check if there is any result, or else continue
            for run_info, result in self.tae_runner.get_finished_runs():
                # Add the results of the run to the run history
                # Additionally check for new incumbent
                self._incorporate_run_results(run_info, result, time_left)

            if self.scenario.shared_model:  # type: ignore[attr-defined] # noqa F821
                assert self.scenario.output_dir_for_this_run is not None  # please mypy
                pSMAC.write(run_history=self.runhistory,
                            output_directory=self.scenario.output_dir_for_this_run,  # type: ignore[attr-defined] # noqa F821
                            logger=self.logger)

            self.logger.debug(
                "Remaining budget: %f (wallclock), %f (ta costs), %f (target runs)",
                self.stats.get_remaing_time_budget(),
                self.stats.get_remaining_ta_budget(),
                self.stats.get_remaining_ta_runs(),
            )

            if self.stats.is_budget_exhausted() or self._stop:
                if self.stats.is_budget_exhausted():
                    self.logger.debug("Exhausted configuration budget")
                else:
                    self.logger.debug("Shutting down because a configuration returned status STOP")

                # The budget can be exhausted for 2 reasons: number of ta runs or
                # time. If the number of ta runs is reached, but there is still budget,
                # wait for the runs to finish
                while self.tae_runner.pending_runs():

                    self.tae_runner.wait()

                    for run_info, result in self.tae_runner.get_finished_runs():
                        # Add the results of the run to the run history
                        # Additionally check for new incumbent
                        self._incorporate_run_results(run_info, result, time_left)

                # Break from the intensification loop,
                # as there are no more resources
                break

            # print stats at the end of each intensification iteration
            if self.intensifier.iteration_done:
                self.stats.print_stats(debug_out=True)

        return self.incumbent

    def validate(self,
                 config_mode: typing.Union[str, typing.List[Configuration]] = 'inc',
                 instance_mode: typing.Union[str, typing.List[str]] = 'train+test',
                 repetitions: int = 1,
                 use_epm: bool = False,
                 n_jobs: int = -1,
                 backend: str = 'threading') -> RunHistory:
        """Create validator-object and run validation, using
        scenario-information, runhistory from smbo and tae_runner from intensify

        Parameters
        ----------
        config_mode: str or list<Configuration>
            string or directly a list of Configuration
            str from [def, inc, def+inc, wallclock_time, cpu_time, all]
            time evaluates at cpu- or wallclock-timesteps of:
            [max_time/2^0, max_time/2^1, max_time/2^3, ..., default]
            with max_time being the highest recorded time
        instance_mode: string
            what instances to use for validation, from [train, test, train+test]
        repetitions: int
            number of repetitions in nondeterministic algorithms (in
            deterministic will be fixed to 1)
        use_epm: bool
            whether to use an EPM instead of evaluating all runs with the TAE
        n_jobs: int
            number of parallel processes used by joblib
        backend: str
            forwarded unchanged to ``Validator.validate``; only used when
            ``use_epm`` is False (presumably the joblib backend -- confirm)

        Returns
        -------
        runhistory: RunHistory
            runhistory containing all specified runs
        """
        # A string mode needs the trajectory file of this run to resolve the
        # configurations; an explicit list of configurations does not.
        if isinstance(config_mode, str):
            assert self.scenario.output_dir_for_this_run is not None  # Please mypy
            traj_fn = os.path.join(self.scenario.output_dir_for_this_run, "traj_aclib2.json")
            trajectory = (
                TrajLogger.read_traj_aclib_format(fn=traj_fn, cs=self.config_space)
            )  # type: typing.Optional[typing.List[typing.Dict[str, typing.Union[float, int, Configuration]]]]
        else:
            trajectory = None

        # Only write the validated runhistory when an output directory exists.
        if self.scenario.output_dir_for_this_run:
            new_rh_path = os.path.join(self.scenario.output_dir_for_this_run, "validated_runhistory.json")  # type: typing.Optional[str] # noqa E501
        else:
            new_rh_path = None

        validator = Validator(self.scenario, trajectory, self.rng)
        if use_epm:
            new_rh = validator.validate_epm(config_mode=config_mode,
                                            instance_mode=instance_mode,
                                            repetitions=repetitions,
                                            runhistory=self.runhistory,
                                            output_fn=new_rh_path)
        else:
            new_rh = validator.validate(config_mode, instance_mode, repetitions,
                                        n_jobs, backend, self.runhistory,
                                        self.tae_runner,
                                        output_fn=new_rh_path)
        return new_rh

    def _get_timebound_for_intensification(self, time_spent: float, update: bool) -> float:
        """Calculate time left for intensify from the time spent on
        choosing challengers using the fraction of time intended for
        intensification (which is specified in
        scenario.intensification_percentage).

        Parameters
        ----------
        time_spent : float
        update : bool
            Only used to check in the unit tests how this function was called

        Returns
        -------
        time_left : float

        Raises
        ------
        ValueError
            If scenario.intensification_percentage is not strictly between 0 and 1.
        """
        frac_intensify = self.scenario.intensification_percentage  # type: ignore[attr-defined] # noqa F821
        if frac_intensify <= 0 or frac_intensify >= 1:
            raise ValueError("The value for intensification_percentage-"
                             "option must lie in (0,1), instead: %.2f" %
                             frac_intensify)
        # time_spent is the (1 - frac_intensify) share of the total, so scale it
        # up to the total and take the frac_intensify share of that.
        total_time = time_spent / (1 - frac_intensify)
        time_left = frac_intensify * total_time
        self.logger.debug("Total time: %.4f, time spent on choosing next "
                          "configurations: %.4f (%.2f), time left for "
                          "intensification: %.4f (%.2f)",
                          total_time, time_spent, (1 - frac_intensify), time_left, frac_intensify)
        return time_left

    def _incorporate_run_results(self, run_info: RunInfo, result: RunValue,
                                 time_left: float) -> None:
        """
        The SMBO submits a config-run-request via a RunInfo object.
        When that config run is completed, a RunValue, which contains
        all the relevant information obtained after running a job, is returned.
        This method incorporates the status of that run into
        the stats/runhistory objects so that other consumers
        can advance with their task.

        Additionally, it checks for a new incumbent via the intensifier process results,
        which also has the side effect of moving the intensifier to a new state

        Parameters
        ----------
        run_info: RunInfo
            Describes the run (config) from which to process the results
        result: RunValue
            Contains relevant information regarding the execution of a config
        time_left: float
            time in [sec] available to perform intensify

        Raises
        ------
        TAEAbortException
            If the run reported status ABORT.
        FirstRunCrashedException
            If scenario.abort_on_first_run_crash is set and the first finished
            run reported status CRASHED.
        """
        # update SMAC stats
        self.stats.ta_time_used += float(result.time)
        self.stats.finished_ta_runs += 1

        self.logger.debug(
            "Return: Status: %r, cost: %f, time: %f, additional: %s",
            result.status, result.cost, result.time, str(result.additional_info),
        )

        if result.status == StatusType.ABORT:
            raise TAEAbortException("Target algorithm status ABORT - SMAC will "
                                    "exit. The last incumbent can be found "
                                    "in the trajectory-file.")
        elif result.status == StatusType.STOP:
            # Request a graceful shutdown of the main loop; the result is not
            # written to the runhistory or passed to the intensifier.
            self._stop = True
            return

        # force_update=True is passed so the entry added at submission time
        # (status RUNNING) is replaced by the real result.
        self.runhistory.add(
            config=run_info.config,
            cost=result.cost,
            time=result.time,
            status=result.status,
            instance_id=run_info.instance,
            seed=run_info.seed,
            budget=run_info.budget,
            starttime=result.starttime,
            endtime=result.endtime,
            force_update=True,
            additional_info=result.additional_info,
        )
        self.stats.n_configs = len(self.runhistory.config_ids)

        if self.scenario.abort_on_first_run_crash:  # type: ignore[attr-defined] # noqa F821
            if self.stats.finished_ta_runs == 1 and result.status == StatusType.CRASHED:
                raise FirstRunCrashedException(
                    "First run crashed, abort. Please check your setup -- we assume that your default "
                    "configuration does not crashes. (To deactivate this exception, use the SMAC scenario option "
                    "'abort_on_first_run_crash'). Additional run info: %s" % result.additional_info
                )

        # Update the intensifier with the result of the runs
        self.incumbent, inc_perf = self.intensifier.process_results(
            run_info=run_info,
            incumbent=self.incumbent,
            run_history=self.runhistory,
            time_bound=max(self._min_time, time_left),
            result=result,
        )

        for callback in self._callbacks['_incorporate_run_results']:
            callback(smbo=self, run_info=run_info, result=result, time_left=time_left)