Source code for smac.intensification.simple_intensifier

from typing import Any, Dict, List, Mapping, Optional, Tuple

import numpy as np

from smac.configspace import Configuration
from smac.intensification.abstract_racer import AbstractRacer, RunInfoIntent
from smac.optimizer.configuration_chooser.epm_chooser import EPMChooser
from smac.runhistory.runhistory import RunHistory, RunInfo, RunValue
from smac.stats.stats import Stats
from smac.utils.constants import MAXINT
from smac.utils.io.traj_logging import TrajLogger

__copyright__ = "Copyright 2021, AutoML.org Freiburg-Hannover"
__license__ = "3-clause BSD"


class SimpleIntensifier(AbstractRacer):
    """Performs the traditional Bayesian Optimization loop, without instance/seed intensification.

    Parameters
    ----------
    stats : smac.stats.stats.Stats
        Stats object
    traj_logger : smac.utils.io.traj_logging.TrajLogger
        TrajLogger object to log all new incumbents
    rng : np.random.RandomState
        Random number generator
    instances : List[str]
        List of all instance ids
    instance_specifics : Mapping[str, str]
        Mapping from instance name to instance-specific string
    cutoff : Optional[float]
        Cutoff of TA runs
    deterministic : bool
        Whether the TA is deterministic or not
    run_obj_time : bool
        Whether the run objective is runtime or not (if true, apply adaptive capping)
    """

    def __init__(
        self,
        stats: Stats,
        traj_logger: TrajLogger,
        rng: np.random.RandomState,
        instances: List[str],
        instance_specifics: Optional[Mapping[str, str]] = None,
        cutoff: Optional[float] = None,
        deterministic: bool = False,
        run_obj_time: bool = True,
        **kwargs: Any,
    ) -> None:
        super().__init__(
            stats=stats,
            traj_logger=traj_logger,
            rng=rng,
            instances=instances,
            instance_specifics=instance_specifics,
            cutoff=cutoff,
            deterministic=deterministic,
            run_obj_time=run_obj_time,
            adaptive_capping_slackfactor=1.0,
            min_chall=1,
        )
        # We want to control the number of runs that are sent to the workers.
        # At any time, if there are W workers, there should be at most W
        # active runs. The variable below tracks active runs that have not
        # been processed yet.
        self.run_tracker = {}  # type: Dict[Tuple[Configuration, str, int, float], bool]
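        # Example of a tracker entry (hypothetical values): a pending run,
        # keyed by (config, instance, seed, budget), maps to False until its
        # result has been processed, e.g.
        #   self.run_tracker[(config, "instance-1", 55, 0.0)] = False  # still running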
    def process_results(
        self,
        run_info: RunInfo,
        incumbent: Optional[Configuration],
        run_history: RunHistory,
        time_bound: float,
        result: RunValue,
        log_traj: bool = True,
    ) -> Tuple[Configuration, float]:
        """Updates the intensifier stage based on the results/status of a configuration
        execution. Also determines an incumbent.

        Parameters
        ----------
        run_info : RunInfo
            A RunInfo containing the configuration that was evaluated
        incumbent : Optional[Configuration]
            Best configuration seen so far
        run_history : RunHistory
            Stores all runs we ran so far
        time_bound : float
            Time in [sec] available to perform intensify
        result : RunValue
            Contains the result (status and other metadata) of exercising a
            challenger/incumbent.
        log_traj : bool
            Whether to log changes of incumbents in the trajectory

        Returns
        -------
        incumbent : Configuration
            Current (maybe new) incumbent configuration
        inc_perf : float
            Empirical performance of the incumbent configuration
        """
        # Mark the fact that we processed this configuration
        self.run_tracker[(run_info.config, run_info.instance, run_info.seed, run_info.budget)] = True

        # If the incumbent is None, we use the challenger
        if not incumbent:
            self.logger.info("First run, no incumbent provided; challenger is assumed to be the incumbent")
            incumbent = run_info.config

        self.num_run += 1

        incumbent = self._compare_configs(
            challenger=run_info.config,
            incumbent=incumbent,
            run_history=run_history,
            log_traj=log_traj,
        )
        # Get incumbent cost
        inc_perf = run_history.get_cost(incumbent)

        return incumbent, inc_perf
    def get_next_run(
        self,
        challengers: Optional[List[Configuration]],
        incumbent: Configuration,
        chooser: Optional[EPMChooser],
        run_history: RunHistory,
        repeat_configs: bool = True,
        num_workers: int = 1,
    ) -> Tuple[RunInfoIntent, RunInfo]:
        """Selects which challenger to use. As in a traditional BO loop, we sample from
        the EPM, which suggests the next configuration based on the acquisition
        function. The input data is read from the runhistory.

        Parameters
        ----------
        challengers : Optional[List[Configuration]]
            Promising configurations
        incumbent : Configuration
            Incumbent configuration
        chooser : Optional[smac.optimizer.configuration_chooser.epm_chooser.EPMChooser]
            Optimizer that generates next configurations to use for racing
        run_history : smac.runhistory.runhistory.RunHistory
            Stores all runs we ran so far
        repeat_configs : bool
            If False, an evaluated configuration will not be generated again
        num_workers : int
            The maximum number of workers available at a given time

        Returns
        -------
        intent : RunInfoIntent
            Indicator of how to consume the RunInfo object
        run_info : RunInfo
            An object that encapsulates the minimum information to evaluate a configuration
        """
        # We always sample from the configs provided or the EPM
        challenger = self._next_challenger(
            challengers=challengers,
            chooser=chooser,
            run_history=run_history,
            repeat_configs=repeat_configs,
        )

        # The run tracker is a dictionary whose values indicate whether a run
        # has been processed. If a value in this dict is False, a worker is
        # still processing the corresponding configuration.
        total_active_runs = len([v for v in self.run_tracker.values() if not v])
        if total_active_runs >= num_workers:
            # We only submit jobs if there is an idle worker
            return RunInfoIntent.WAIT, RunInfo(
                config=None,
                instance=None,
                instance_specific="0",
                seed=0,
                cutoff=self.cutoff,
                capped=False,
                budget=0.0,
            )

        run_info = RunInfo(
            config=challenger,
            instance=self.instances[-1],
            instance_specific="0",
            seed=0 if self.deterministic else int(self.rs.randint(low=0, high=MAXINT, size=1)[0]),
            cutoff=self.cutoff,
            capped=False,
            budget=0.0,
        )

        self.run_tracker[(run_info.config, run_info.instance, run_info.seed, run_info.budget)] = False
        return RunInfoIntent.RUN, run_info
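
# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module): how a driver
# loop could consume this intensifier, alternating get_next_run() and
# process_results(). In SMAC itself this orchestration is done by the SMBO
# loop; `stats`, `traj_logger`, `epm_chooser`, `run_history`, `evaluate`,
# and `done` below are assumed/hypothetical placeholders.
#
#     intensifier = SimpleIntensifier(
#         stats=stats,                  # assumed: smac.stats.stats.Stats instance
#         traj_logger=traj_logger,      # assumed: TrajLogger instance
#         rng=np.random.RandomState(42),
#         instances=[None],
#     )
#
#     incumbent = None
#     while not done:
#         intent, run_info = intensifier.get_next_run(
#             challengers=None,
#             incumbent=incumbent,
#             chooser=epm_chooser,
#             run_history=run_history,
#             num_workers=1,
#         )
#         if intent == RunInfoIntent.WAIT:
#             continue  # no idle worker; wait for a pending run to finish
#         result = evaluate(run_info)  # hypothetical: runs the target algorithm
#         incumbent, inc_perf = intensifier.process_results(
#             run_info=run_info,
#             incumbent=incumbent,
#             run_history=run_history,
#             time_bound=np.inf,
#             result=result,
#         )
# ---------------------------------------------------------------------------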