Source code for smac.intensification.intensification

from typing import List, Mapping, Optional, Tuple, cast

import logging
from collections import Counter
from enum import Enum

import numpy as np

from smac.configspace import Configuration
from smac.intensification.abstract_racer import (
    AbstractRacer,
    RunInfoIntent,
    _config_to_run_type,
)
from smac.optimizer.configuration_chooser.epm_chooser import EPMChooser
from smac.runhistory.runhistory import (
    InstSeedBudgetKey,
    RunHistory,
    RunInfo,
    RunValue,
    StatusType,
)
from smac.stats.stats import Stats
from smac.utils.constants import MAXINT
from smac.utils.io.traj_logging import TrajLogger
from smac.utils.logging import format_array

__author__ = "Katharina Eggensperger, Marius Lindauer"
__copyright__ = "Copyright 2018, ML4AAD"
__license__ = "3-clause BSD"


[docs]class NoMoreChallengers(Exception): """Indicates that no more challengers are available for the intensification to proceed.""" pass
[docs]class IntensifierStage(Enum): """Class to define different stages of intensifier.""" RUN_FIRST_CONFIG = 0 # to replicate the old initial design RUN_INCUMBENT = 1 # Lines 3-7 RUN_CHALLENGER = 2 # Lines 8-17 RUN_BASIS = 3 # helpers to determine what type of run to process # A challenger is assumed to be processed if the stage # is not from first_config or incumbent PROCESS_FIRST_CONFIG_RUN = 4 PROCESS_INCUMBENT_RUN = 5
[docs]class Intensifier(AbstractRacer): r"""Races challengers against an incumbent. SMAC's intensification procedure, in detail: Procedure 2: Intensify(Θ_new, θ_inc, M, R, t_intensify, Π, cˆ) cˆ(θ, Π') denotes the empirical cost of θ on the subset of instances Π' ⊆ Π, based on the runs in R; maxR is a parameter where: Θ_new: Sequence of parameter settings to evaluate, challengers in this class. θ_inc: incumbent parameter setting, incumbent in this class. 1 for i := 1, . . . , length(Θ_new) do 2 θ_new ← Θ_new[i] STAGE-->RUN_INCUMBENT 3 if R contains less than maxR runs with configuration θ_inc then 4 Π' ← {π'∈ Π | R contains less than or equal number of runs using θ_inc and π' 0 than using θ_inc and any other π''∈ Π} 5 π ← instance sampled uniformly at random from Π' 6 s ← seed, drawn uniformly at random 7 R ← ExecuteRun(R, θ_inc, π, s) 8 N ← 1 STAGE-->RUN_CHALLENGER 9 while true do 10 S_missing ← {instance, seed} pairs for which θ_inc was run before, but not θ_new 11 S_torun ← random subset of S_missing of size min(N, size(S_missing)) 12 foreach (π, s) ∈ S_torun do R ← ExecuteRun(R, θ_new, π, s) 13 S_missing ← S_missing \\ S_torun 14 Π_common ← instances for which we previously ran both θ_inc and θ_new 15 if cˆ(θ_new, Π_common) > cˆ(θ_inc, Π_common) then break 16 else if S_missing = ∅ then θ_inc ← θ_new; break 17 else N ← 2 · N 18 if time spent in this call to this procedure exceeds t_intensify and i ≥ 2 then break 19 return [R, θ_inc] Parameters ---------- stats: Stats stats object traj_logger: TrajLogger TrajLogger object to log all new incumbents rng : np.random.RandomState instances : List[str] list of all instance ids instance_specifics : Mapping[str, str] mapping from instance name to instance specific string cutoff : int runtime cutoff of TA runs deterministic: bool whether the TA is deterministic or not run_obj_time: bool whether the run objective is runtime or not (if true, apply adaptive capping) always_race_against: Configuration if incumbent changes race this configuration always against new incumbent; can sometimes prevent over-tuning use_ta_time_bound: bool, if true, trust time reported by the target algorithms instead of measuring the wallclock time for limiting the time of intensification run_limit : int Maximum number of target algorithm runs per call to intensify. maxR : int Maximum number of runs per config (summed over all calls to intensifiy). minR : int Minimum number of run per config (summed over all calls to intensify). adaptive_capping_slackfactor: float slack factor of adpative capping (factor * adpative cutoff) min_chall: int minimal number of challengers to be considered (even if time_bound is exhausted earlier) """ def __init__( self, stats: Stats, traj_logger: TrajLogger, rng: np.random.RandomState, instances: List[str], instance_specifics: Mapping[str, str] = None, cutoff: int = None, deterministic: bool = False, run_obj_time: bool = True, always_race_against: Configuration = None, run_limit: int = MAXINT, use_ta_time_bound: bool = False, minR: int = 1, maxR: int = 2000, adaptive_capping_slackfactor: float = 1.2, min_chall: int = 2, ): super().__init__( stats=stats, traj_logger=traj_logger, rng=rng, instances=instances, instance_specifics=instance_specifics, cutoff=cutoff, deterministic=deterministic, run_obj_time=run_obj_time, minR=minR, maxR=maxR, adaptive_capping_slackfactor=adaptive_capping_slackfactor, min_chall=min_chall, ) self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__) # general attributes self.run_limit = run_limit self.always_race_against = always_race_against if self.run_limit < 1: raise ValueError("run_limit must be > 1") self.use_ta_time_bound = use_ta_time_bound self.elapsed_time = 0.0 # stage variables # the intensification procedure is divided into 4 'stages': # 0. run 1st configuration (only in the 1st run when incumbent=None) # 1. add incumbent run # 2. race challenger # 3. race against configuration for a new incumbent self.stage = IntensifierStage.RUN_FIRST_CONFIG self.n_iters = 0 # challenger related variables self._chall_indx = 0 self.num_chall_run = 0 self.current_challenger = None self.continue_challenger = False self.configs_to_run = iter([]) # type: _config_to_run_type self.update_configs_to_run = True # racing related variables self.to_run = [] # type: List[InstSeedBudgetKey] self.inc_sum_cost = np.inf self.N = -1
[docs] def get_next_run( self, challengers: Optional[List[Configuration]], incumbent: Configuration, chooser: Optional[EPMChooser], run_history: RunHistory, repeat_configs: bool = True, num_workers: int = 1, ) -> Tuple[RunInfoIntent, RunInfo]: """This procedure is in charge of generating a RunInfo object to comply with lines 7 (in case stage is stage==RUN_INCUMBENT) or line 12 (In case of stage==RUN_CHALLENGER) A RunInfo object encapsulates the necessary information for a worker to execute the job, nevertheless, a challenger is not always available. This could happen because no more configurations are available or the new configuration to try was already executed. To circumvent this, a intent is also returned: - (intent=RUN) Run the RunInfo object (Normal Execution - (intent=SKIP) Skip this iteration. No challenger is available, in particular because challenger is the same as incumbent Parameters ---------- challengers : List[Configuration] promising configurations incumbent: Configuration incumbent configuration chooser : smac.optimizer.epm_configuration_chooser.EPMChooser optimizer that generates next configurations to use for racing run_history : RunHistory stores all runs we ran so far repeat_configs : bool if False, an evaluated configuration will not be generated again num_workers: int the maximum number of workers available at a given time. Returns ------- intent: RunInfoIntent What should the smbo object do with the runinfo. run_info: RunInfo An object that encapsulates necessary information for a config run """ if num_workers > 1: raise ValueError( "Intensifier does not support more than 1 worker, yet " "the argument num_workers to get_next_run is {}".format(num_workers) ) # If this function is called, it means the iteration is # not complete (we can be starting a new iteration, or re-running a # challenger due to line 17). We evaluate if a iteration is complete or not # via _process_results self.iteration_done = False # In case a crash happens, and FirstRunCrashedException prevents a # failure, revert back to running the incumbent # Challenger case is by construction ok, as there is no special # stage for its processing if self.stage == IntensifierStage.PROCESS_FIRST_CONFIG_RUN: self.stage = IntensifierStage.RUN_FIRST_CONFIG elif self.stage == IntensifierStage.PROCESS_INCUMBENT_RUN: self.stage = IntensifierStage.RUN_INCUMBENT # if first ever run, then assume current challenger to be the incumbent # Because this is the first ever run, we need to sample a new challenger # This new challenger is also assumed to be the incumbent if self.stage == IntensifierStage.RUN_FIRST_CONFIG: if incumbent is None: self.logger.info("First run, no incumbent provided;" " challenger is assumed to be the incumbent") challenger, self.new_challenger = self.get_next_challenger( challengers=challengers, chooser=chooser, ) incumbent = challenger else: inc_runs = run_history.get_runs_for_config(incumbent, only_max_observed_budget=True) if len(inc_runs) > 0: self.logger.debug("Skipping RUN_FIRST_CONFIG stage since " "incumbent has already been ran") self.stage = IntensifierStage.RUN_INCUMBENT # LINES 3-7 if self.stage in [IntensifierStage.RUN_FIRST_CONFIG, IntensifierStage.RUN_INCUMBENT]: # Line 3 # A modified version, that not only checks for maxR # but also makes sure that there are runnable instances, # that is, instances has not been exhausted inc_runs = run_history.get_runs_for_config(incumbent, only_max_observed_budget=True) # Line 4 available_insts = self._get_inc_available_inst(incumbent, run_history) if available_insts and len(inc_runs) < self.maxR: # Lines 5-6-7 instance, seed, cutoff = self._get_next_inc_run(available_insts) instance_specific = "0" if instance is not None: instance_specific = self.instance_specifics.get(instance, "0") return RunInfoIntent.RUN, RunInfo( config=incumbent, instance=instance, instance_specific=instance_specific, seed=seed, cutoff=cutoff, capped=False, budget=0.0, ) else: # This point marks the transitions from lines 3-7 # to 8-18. self.logger.debug("No further instance-seed pairs for incumbent available.") self.stage = IntensifierStage.RUN_CHALLENGER # Understand who is the active challenger. if self.stage == IntensifierStage.RUN_BASIS: # if in RUN_BASIS stage, # return the basis configuration (i.e., `always_race_against`) self.logger.debug("Race against basis configuration after incumbent change.") challenger = self.always_race_against elif self.current_challenger and self.continue_challenger: # if the current challenger could not be rejected, # it is run again on more instances challenger = self.current_challenger else: # Get a new challenger if all instance/pairs have # been completed. Else return the currently running # challenger challenger, self.new_challenger = self.get_next_challenger( challengers=challengers, chooser=chooser, ) # No new challengers are available for this iteration, # Move to the next iteration. This can only happen # when all configurations for this iteration are exhausted # and have been run in all proposed instance/pairs. if challenger is None: return RunInfoIntent.SKIP, RunInfo( config=None, instance=None, instance_specific="0", seed=0, cutoff=self.cutoff, capped=False, budget=0.0, ) # Skip the iteration if the challenger was previously run if challenger == incumbent and self.stage == IntensifierStage.RUN_CHALLENGER: self.challenger_same_as_incumbent = True self.logger.debug("Challenger was the same as the current incumbent; Skipping challenger") return RunInfoIntent.SKIP, RunInfo( config=None, instance=None, instance_specific="0", seed=0, cutoff=self.cutoff, capped=False, budget=0.0, ) self.logger.debug("Intensify on %s", challenger) if hasattr(challenger, "origin"): self.logger.debug("Configuration origin: %s", challenger.origin) if self.stage in [IntensifierStage.RUN_CHALLENGER, IntensifierStage.RUN_BASIS]: if not self.to_run: self.to_run, self.inc_sum_cost = self._get_instances_to_run( incumbent=incumbent, challenger=challenger, run_history=run_history, N=self.N ) is_there_time_due_to_adaptive_cap = self._is_there_time_due_to_adaptive_cap( challenger=challenger, run_history=run_history, ) # If there is no more configs to run in this iteration, or no more # time to do so, change the current stage base on how the current # challenger performs as compared to the incumbent. This is done # via _process_racer_results if len(self.to_run) == 0 or not is_there_time_due_to_adaptive_cap: # If no more time, stage transition is a must if not is_there_time_due_to_adaptive_cap: # Since the challenger fails to outperform the incumbent due to adaptive capping, # we discard all the forthcoming runs. self.to_run = [] self.stage = IntensifierStage.RUN_INCUMBENT self.logger.debug("Stop challenger itensification due " "to adaptive capping.") # Nevertheless, if there are no more instances to run, # we might need to comply with line 17 and keep running the # same challenger. In this case, if there is not enough information # to decide if the challenger is better/worst than the incumbent, # line 17 doubles the number of instances to run. self.logger.debug("No further runs for challenger possible") self._process_racer_results( challenger=challenger, incumbent=incumbent, run_history=run_history, ) # Request SMBO to skip this run. This function will # be called again, after the _process_racer_results # has updated the intensifier stage return RunInfoIntent.SKIP, RunInfo( config=None, instance=None, instance_specific="0", seed=0, cutoff=self.cutoff, capped=False, budget=0.0, ) else: # Lines 8-11 incumbent, instance, seed, cutoff = self._get_next_racer( challenger=challenger, incumbent=incumbent, run_history=run_history, ) capped = False if (self.cutoff is not None) and (cutoff < self.cutoff): # type: ignore[operator] # noqa F821 capped = True instance_specific = "0" if instance is not None: instance_specific = self.instance_specifics.get(instance, "0") # Line 12 return RunInfoIntent.RUN, RunInfo( config=challenger, instance=instance, instance_specific=instance_specific, seed=seed, cutoff=cutoff, capped=capped, budget=0.0, ) else: raise ValueError("No valid stage found!")
[docs] def process_results( self, run_info: RunInfo, incumbent: Optional[Configuration], run_history: RunHistory, time_bound: float, result: RunValue, log_traj: bool = True, ) -> Tuple[Configuration, float]: """The intensifier stage will be updated based on the results/status of a configuration execution. During intensification, the following can happen: * Challenger raced against incumbent * Also, during a challenger run, a capped exception can be triggered, where no racer post processing is needed * A run on the incumbent for more confidence needs to be processed, IntensifierStage.PROCESS_INCUMBENT_RUN * The first run results need to be processed (PROCESS_FIRST_CONFIG_RUN) At the end of any run, checks are done to move to a new iteration. Parameters ---------- run_info : RunInfo A RunInfo containing the configuration that was evaluated incumbent : Optional[Configuration] best configuration so far, None in 1st run run_history : RunHistory stores all runs we ran so far if False, an evaluated configuration will not be generated again time_bound : float time in [sec] available to perform intensify result: RunValue Contain the result (status and other methadata) of exercising a challenger/incumbent. log_traj: bool whether to log changes of incumbents in trajectory Returns ------- incumbent: Configuration() current (maybe new) incumbent configuration inc_perf: float empirical performance of incumbent configuration """ if self.stage == IntensifierStage.PROCESS_FIRST_CONFIG_RUN: if incumbent is None: self.logger.info("First run, no incumbent provided;" " challenger is assumed to be the incumbent") incumbent = run_info.config if self.stage in [ IntensifierStage.PROCESS_INCUMBENT_RUN, IntensifierStage.PROCESS_FIRST_CONFIG_RUN, ]: self._ta_time += result.time self.num_run += 1 self._process_inc_run( incumbent=incumbent, run_history=run_history, log_traj=log_traj, ) else: self.num_run += 1 self.num_chall_run += 1 if result.status == StatusType.CAPPED: # move on to the next iteration self.logger.debug("Challenger itensification timed out due " "to adaptive capping.") self.stage = IntensifierStage.RUN_INCUMBENT else: self._ta_time += result.time incumbent = self._process_racer_results( challenger=run_info.config, incumbent=incumbent, run_history=run_history, log_traj=log_traj, ) self.elapsed_time += result.endtime - result.starttime # check if 1 intensification run is complete - line 18 # this is different to regular SMAC as it requires at least successful challenger run, # which is necessary to work on a fixed grid of configurations. if ( self.stage == IntensifierStage.RUN_INCUMBENT and self._chall_indx >= self.min_chall and self.num_chall_run > 0 ): if self.num_run > self.run_limit: self.logger.debug("Maximum #runs for intensification reached") self._next_iteration() if not self.use_ta_time_bound and self.elapsed_time - time_bound >= 0: self.logger.debug( "Wallclock time limit for intensification reached " "(used: %f sec, available: %f sec)", self.elapsed_time, time_bound, ) self._next_iteration() elif self._ta_time - time_bound >= 0: self.logger.debug( "TA time limit for intensification reached (used: %f sec, available: %f sec)", self._ta_time, time_bound, ) self._next_iteration() inc_perf = run_history.get_cost(incumbent) return incumbent, inc_perf
def _get_next_inc_run( self, available_insts: List[str], ) -> Tuple[str, int, Optional[float]]: """Method to extract the next seed/instance in which a incumbent run most be evaluated. Parameters ---------- available_insts : List[str] A list of instances from which to extract the next incumbent run Returns ------- instance: str Next instance to evaluate seed: float Seed in which to evaluate the instance cutoff: Optional[float] Max time for a given instance/seed pair """ # Line 5 - and avoid https://github.com/numpy/numpy/issues/10791 _idx = self.rs.choice(len(available_insts)) next_instance = available_insts[_idx] # Line 6 if self.deterministic: next_seed = 0 else: next_seed = int(self.rs.randint(low=0, high=MAXINT, size=1)[0]) # Line 7 self.logger.debug("Add run of incumbent for instance={}".format(next_instance)) if self.stage == IntensifierStage.RUN_FIRST_CONFIG: self.stage = IntensifierStage.PROCESS_FIRST_CONFIG_RUN else: self.stage = IntensifierStage.PROCESS_INCUMBENT_RUN return next_instance, next_seed, self.cutoff def _get_inc_available_inst( self, incumbent: Configuration, run_history: RunHistory, log_traj: bool = True, ) -> List[str]: """Implementation of line 4 of Intensification. This method queries the inc runs in the run history and return the pending instances if any is available Parameters ---------- incumbent: Configuration Either challenger or incumbent run_history : RunHistory stores all runs we ran so far log_traj: bool Whether to log changes of incumbents in trajectory """ # Line 4 # find all instances that have the most runs on the inc inc_runs = run_history.get_runs_for_config(incumbent, only_max_observed_budget=True) inc_inst = [s.instance for s in inc_runs] inc_inst = list(Counter(inc_inst).items()) inc_inst.sort(key=lambda x: x[1], reverse=True) try: max_runs = inc_inst[0][1] except IndexError: self.logger.debug("No run for incumbent found") max_runs = 0 inc_inst = [x[0] for x in inc_inst if x[1] == max_runs] available_insts = list(sorted(set(self.instances) - set(inc_inst))) # if all instances were used n times, we can pick an instances # from the complete set again if not self.deterministic and not available_insts: available_insts = self.instances return available_insts def _process_inc_run( self, incumbent: Configuration, run_history: RunHistory, log_traj: bool = True, ) -> None: """Method to process the results of a challenger that races an incumbent. Parameters ---------- incumbent: Configuration Either challenger or incumbent run_history : RunHistory stores all runs we ran so far log_traj: bool Whether to log changes of incumbents in trajectory """ # output estimated performance of incumbent inc_runs = run_history.get_runs_for_config(incumbent, only_max_observed_budget=True) inc_perf = run_history.get_cost(incumbent) format_value = format_array(inc_perf) self.logger.info(f"Updated estimated cost of incumbent on {len(inc_runs)} runs: {format_value}") # if running first configuration, go to next stage after 1st run if self.stage in [ IntensifierStage.RUN_FIRST_CONFIG, IntensifierStage.PROCESS_FIRST_CONFIG_RUN, ]: self.stage = IntensifierStage.RUN_INCUMBENT self._next_iteration() else: # Termination condition; after each run, this checks # whether further runs are necessary due to minR if len(inc_runs) >= self.minR or len(inc_runs) >= self.maxR: self.stage = IntensifierStage.RUN_CHALLENGER else: self.stage = IntensifierStage.RUN_INCUMBENT self._compare_configs(incumbent=incumbent, challenger=incumbent, run_history=run_history, log_traj=log_traj) def _get_next_racer( self, challenger: Configuration, incumbent: Configuration, run_history: RunHistory, log_traj: bool = True, ) -> Tuple[Configuration, str, int, Optional[float]]: """Method to return the next config setting to aggressively race challenger against incumbent. Parameters ---------- challenger : Configuration Configuration which challenges incumbent incumbent : Configuration Best configuration so far run_history : RunHistory Stores all runs we ran so far log_traj: bool Whether to log changes of incumbents in trajectory Returns ------- new_incumbent: Configuration Either challenger or incumbent instance: str Next instance to evaluate seed: int Seed in which to evaluate the instance cutoff: Optional[float] Max time for a given instance/seed pair """ # By the time this function is called, the run history might # have shifted. Re-populate the list if necessary if not self.to_run: # Lines 10/11 self.to_run, self.inc_sum_cost = self._get_instances_to_run( incumbent=incumbent, challenger=challenger, run_history=run_history, N=self.N ) # Run challenger on all <instance, seed> to run instance, seed, _ = self.to_run.pop() cutoff = self.cutoff if self.run_obj_time: cutoff = self._adapt_cutoff(challenger=challenger, run_history=run_history, inc_sum_cost=self.inc_sum_cost) self.logger.debug("Cutoff for challenger: %s" % str(cutoff)) self.logger.debug("Add run of challenger") # Line 12 return incumbent, instance, seed, cutoff def _is_there_time_due_to_adaptive_cap( self, challenger: Configuration, run_history: RunHistory, ) -> bool: """A check to see if there is no more time for a challenger given the fact, that we are optimizing time and the incumbent looks more promising Line 18. Parameters ---------- challenger : Configuration Configuration which challenges incumbent run_history : RunHistory Stores all runs we ran so far Returns ------- bool: whether or not there is more time for a challenger run """ # If time is not objective, then there is always time! if not self.run_obj_time: return True cutoff = self._adapt_cutoff(challenger=challenger, run_history=run_history, inc_sum_cost=self.inc_sum_cost) if cutoff is not None and cutoff <= 0: return False else: return True def _process_racer_results( self, challenger: Configuration, incumbent: Configuration, run_history: RunHistory, log_traj: bool = True, ) -> Optional[Configuration]: """Process the result of a racing configuration against the current incumbent. Might propose a new incumbent. Parameters ---------- challenger : Configuration Configuration which challenges incumbent incumbent : Configuration Best configuration so far run_history : RunHistory Stores all runs we ran so far Returns ------- new_incumbent: Optional[Configuration] Either challenger or incumbent """ chal_runs = run_history.get_runs_for_config(challenger, only_max_observed_budget=True) chal_perf = run_history.get_cost(challenger) # if all <instance, seed> have been run, compare challenger performance if not self.to_run: new_incumbent = self._compare_configs( incumbent=incumbent, challenger=challenger, run_history=run_history, log_traj=log_traj, ) # update intensification stage if new_incumbent == incumbent: # move on to the next iteration self.stage = IntensifierStage.RUN_INCUMBENT self.continue_challenger = False self.logger.debug( "Estimated cost of challenger on %d runs: %.4f, but worse than incumbent", len(chal_runs), chal_perf, ) elif new_incumbent == challenger: # New incumbent found incumbent = challenger self.continue_challenger = False # compare against basis configuration if provided, else go to next iteration if self.always_race_against and self.always_race_against != challenger: self.stage = IntensifierStage.RUN_BASIS else: self.stage = IntensifierStage.RUN_INCUMBENT self.logger.debug( "Estimated cost of challenger on %d runs: %.4f, becomes new incumbent", len(chal_runs), chal_perf, ) else: # Line 17 # challenger is not worse, continue self.N = 2 * self.N self.continue_challenger = True self.logger.debug( "Estimated cost of challenger on %d runs: %.4f, adding %d runs to the queue", len(chal_runs), chal_perf, self.N / 2, ) else: self.logger.debug( "Estimated cost of challenger on %d runs: %.4f, still %d runs to go (continue racing)", len(chal_runs), chal_perf, len(self.to_run), ) return incumbent def _get_instances_to_run( self, challenger: Configuration, incumbent: Configuration, N: int, run_history: RunHistory, ) -> Tuple[List[InstSeedBudgetKey], float]: """Returns the minimum list of <instance, seed> pairs to run the challenger on before comparing it with the incumbent. Parameters ---------- incumbent: Configuration incumbent configuration challenger: Configuration promising configuration that is presently being evaluated run_history: RunHistory Stores all runs we ran so far N: int number of <instance, seed> pairs to select Returns ------- List[InstSeedBudgetKey] list of <instance, seed, budget> tuples to run float total (runtime) cost of running the incumbent on the instances (used for adaptive capping while racing) """ # get next instances left for the challenger # Line 8 inc_inst_seeds = set(run_history.get_runs_for_config(incumbent, only_max_observed_budget=True)) chall_inst_seeds = set(run_history.get_runs_for_config(challenger, only_max_observed_budget=True)) # Line 10 missing_runs = sorted(inc_inst_seeds - chall_inst_seeds) # Line 11 self.rs.shuffle(missing_runs) if N < 0: raise ValueError("Argument N must not be smaller than zero, but is %s" % str(N)) to_run = missing_runs[: min(N, len(missing_runs))] missing_runs = missing_runs[min(N, len(missing_runs)) :] # for adaptive capping # because of efficiency computed here inst_seed_pairs = list(inc_inst_seeds - set(missing_runs)) # cost used by incumbent for going over all runs in inst_seed_pairs inc_sum_cost = run_history.sum_cost(config=incumbent, instance_seed_budget_keys=inst_seed_pairs, normalize=True) assert type(inc_sum_cost) == float return to_run, inc_sum_cost
[docs] def get_next_challenger( self, challengers: Optional[List[Configuration]], chooser: Optional[EPMChooser], ) -> Tuple[Optional[Configuration], bool]: """This function returns the next challenger, that should be exercised though lines 8-17. It does so by populating configs_to_run, which is a pool of configuration from which the racer will sample. Each configuration within configs_to_run, will be intensified on different instances/seed registered in self.to_run as stated in line 11. A brand new configuration should only be sampled, after all self.to_run instance seed pairs are exhausted. This method triggers a call to _next_iteration if there are no more configurations to run, for the current intensification loop. This marks the transition to Line 2, where a new configuration to intensify will be drawn from epm/initial challengers. Parameters ---------- challengers : List[Configuration] promising configurations chooser : smac.optimizer.epm_configuration_chooser.EPMChooser optimizer that generates next configurations to use for racing Returns ------- Optional[Configuration] next configuration to evaluate bool flag telling if the configuration is newly sampled or one currently being tracked """ # select new configuration when entering 'race challenger' stage # or for the first run if not self.current_challenger or (self.stage == IntensifierStage.RUN_CHALLENGER and not self.to_run): # this is a new intensification run, get the next list of configurations to run if self.update_configs_to_run: configs_to_run = self._generate_challengers(challengers=challengers, chooser=chooser) self.configs_to_run = cast(_config_to_run_type, configs_to_run) self.update_configs_to_run = False # pick next configuration from the generator try: challenger = next(self.configs_to_run) except StopIteration: # out of challengers for the current iteration, start next incumbent iteration self._next_iteration() return None, False if challenger: # reset instance index for the new challenger self._chall_indx += 1 self.current_challenger = challenger self.N = max(1, self.minR) self.to_run = [] return challenger, True # return currently running challenger return self.current_challenger, False
def _generate_challengers( self, challengers: Optional[List[Configuration]], chooser: Optional[EPMChooser], ) -> _config_to_run_type: """Retuns a sequence of challengers to use in intensification If challengers are not provided, then optimizer will be used to generate the challenger list. Parameters ---------- challengers : List[Configuration] promising configurations to evaluate next chooser : smac.optimizer.epm_configuration_chooser.EPMChooser a sampler that generates next configurations to use for racing Returns ------- Optional[Generator[Configuration]] A generator containing the next challengers to use """ if challengers: # iterate over challengers provided self.logger.debug("Using challengers provided") chall_gen = iter(challengers) # type: _config_to_run_type elif chooser: # generating challengers on-the-fly if optimizer is given self.logger.debug("Generating new challenger from optimizer") chall_gen = chooser.choose_next() else: raise ValueError("No configurations/chooser provided. Cannot generate challenger!") return chall_gen def _next_iteration(self) -> None: """Updates tracking variables at the end of an intensification run.""" # track iterations self.n_iters += 1 self.iteration_done = True self.configs_to_run = iter([]) self.update_configs_to_run = True # reset for a new iteration self.num_run = 0 self.num_chall_run = 0 self._chall_indx = 0 self.elapsed_time = 0 self._ta_time = 0.0 self.stats.update_average_configs_per_intensify(n_configs=self._chall_indx)