from typing import List, Mapping, Optional, Tuple, cast

import logging
from collections import Counter
from enum import Enum

import numpy as np

from smac.configspace import Configuration
from smac.intensification.abstract_racer import (
    AbstractRacer,
    RunInfoIntent,
    _config_to_run_type,
)
from smac.optimizer.configuration_chooser.epm_chooser import EPMChooser
from smac.runhistory.runhistory import (
    InstSeedBudgetKey,
    RunHistory,
    RunInfo,
    RunValue,
    StatusType,
)
from smac.stats.stats import Stats
from smac.utils.constants import MAXINT
from smac.utils.io.traj_logging import TrajLogger
from smac.utils.logging import format_array

__author__ = "Katharina Eggensperger, Marius Lindauer"
__copyright__ = "Copyright 2018, ML4AAD"
__license__ = "3-clause BSD"

class NoMoreChallengers(Exception):
    """Signal that the intensification cannot proceed because no challengers are left."""
class IntensifierStage(Enum):
    """Enumerates the stages the intensifier can be in."""

    # Kept to replicate the old initial design.
    RUN_FIRST_CONFIG = 0
    # Lines 3-7 of the intensification procedure.
    RUN_INCUMBENT = 1
    # Lines 8-17 of the intensification procedure.
    RUN_CHALLENGER = 2
    RUN_BASIS = 3

    # Helper stages used to determine what type of run has to be processed.
    # A challenger is assumed to be processed whenever the stage is neither
    # of the two below (i.e. not a first-config run and not an incumbent run).
    PROCESS_FIRST_CONFIG_RUN = 4
    PROCESS_INCUMBENT_RUN = 5
class Intensifier(AbstractRacer):
    r"""Races challengers against an incumbent.

    SMAC's intensification procedure, in detail:

    Procedure 2: Intensify(Θ_new, θ_inc, M, R, t_intensify, Π, cˆ)
    cˆ(θ, Π') denotes the empirical cost of θ on the subset of instances
    Π' ⊆ Π, based on the runs in R; maxR is a parameter
    where:
    Θ_new: Sequence of parameter settings to evaluate, challengers in this class.
    θ_inc: incumbent parameter setting, incumbent in this class.

    1  for i := 1, . . . , length(Θ_new) do
    2      θ_new ← Θ_new[i]
           STAGE-->RUN_INCUMBENT
    3      if R contains less than maxR runs with configuration θ_inc then
    4          Π' ← {π'∈ Π | R contains less than or equal number of runs using θ_inc and π'
                   than using θ_inc and any other π''∈ Π}
    5          π ← instance sampled uniformly at random from Π'
    6          s ← seed, drawn uniformly at random
    7          R ← ExecuteRun(R, θ_inc, π, s)
    8      N ← 1
           STAGE-->RUN_CHALLENGER
    9      while true do
    10         S_missing ← {instance, seed} pairs for which θ_inc was run before, but not θ_new
    11         S_torun ← random subset of S_missing of size min(N, size(S_missing))
    12         foreach (π, s) ∈ S_torun do R ← ExecuteRun(R, θ_new, π, s)
    13         S_missing ← S_missing \\ S_torun
    14         Π_common ← instances for which we previously ran both θ_inc and θ_new
    15         if cˆ(θ_new, Π_common) > cˆ(θ_inc, Π_common) then break
    16         else if S_missing = ∅ then θ_inc ← θ_new; break
    17         else N ← 2 · N
    18     if time spent in this call to this procedure exceeds t_intensify and i ≥ 2 then break
    19 return [R, θ_inc]

    Parameters
    ----------
    stats: Stats
        stats object
    traj_logger: TrajLogger
        TrajLogger object to log all new incumbents
    rng : np.random.RandomState
    instances : List[str]
        list of all instance ids
    instance_specifics : Mapping[str, str]
        mapping from instance name to instance specific string
    cutoff : int
        runtime cutoff of TA runs
    deterministic: bool
        whether the TA is deterministic or not
    run_obj_time: bool
        whether the run objective is runtime or not (if true, apply adaptive capping)
    always_race_against: Configuration
        if incumbent changes race this configuration always against new incumbent;
        can sometimes prevent over-tuning
    use_ta_time_bound: bool,
        if true, trust time reported by the target algorithms instead of
        measuring the wallclock time for limiting the time of intensification
    run_limit : int
        Maximum number of target algorithm runs per call to intensify.
    maxR : int
        Maximum number of runs per config (summed over all calls to
        intensify).
    minR : int
        Minimum number of run per config (summed over all calls to
        intensify).
    adaptive_capping_slackfactor: float
        slack factor of adaptive capping (factor * adaptive cutoff)
    min_chall: int
        minimal number of challengers to be considered (even if time_bound is exhausted earlier)

    Raises
    ------
    ValueError
        If ``run_limit`` is smaller than 1.
    """

    def __init__(
        self,
        stats: Stats,
        traj_logger: TrajLogger,
        rng: np.random.RandomState,
        instances: List[str],
        instance_specifics: Optional[Mapping[str, str]] = None,
        cutoff: Optional[int] = None,
        deterministic: bool = False,
        run_obj_time: bool = True,
        always_race_against: Optional[Configuration] = None,
        run_limit: int = MAXINT,
        use_ta_time_bound: bool = False,
        minR: int = 1,
        maxR: int = 2000,
        adaptive_capping_slackfactor: float = 1.2,
        min_chall: int = 2,
    ):
        super().__init__(
            stats=stats,
            traj_logger=traj_logger,
            rng=rng,
            instances=instances,
            instance_specifics=instance_specifics,
            cutoff=cutoff,
            deterministic=deterministic,
            run_obj_time=run_obj_time,
            minR=minR,
            maxR=maxR,
            adaptive_capping_slackfactor=adaptive_capping_slackfactor,
            min_chall=min_chall,
        )

        self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)

        # general attributes
        self.run_limit = run_limit
        self.always_race_against = always_race_against

        if self.run_limit < 1:
            # The check accepts run_limit == 1, so the message states ">= 1".
            raise ValueError("run_limit must be >= 1")

        self.use_ta_time_bound = use_ta_time_bound
        self.elapsed_time = 0.0

        # stage variables
        # the intensification procedure is divided into 4 'stages':
        # 0. run 1st configuration (only in the 1st run when incumbent=None)
        # 1. add incumbent run
        # 2. race challenger
        # 3. race against configuration for a new incumbent
        self.stage = IntensifierStage.RUN_FIRST_CONFIG
        self.n_iters = 0

        # challenger related variables
        self._chall_indx = 0
        self.num_chall_run = 0
        self.current_challenger = None
        self.continue_challenger = False
        self.configs_to_run = iter([])  # type: _config_to_run_type
        self.update_configs_to_run = True

        # racing related variables
        self.to_run = []  # type: List[InstSeedBudgetKey]
        self.inc_sum_cost = np.inf
        self.N = -1

    def _skip_run(self) -> Tuple[RunInfoIntent, RunInfo]:
        """Build the (SKIP, placeholder RunInfo) pair returned when there is nothing to run."""
        return RunInfoIntent.SKIP, RunInfo(
            config=None,
            instance=None,
            instance_specific="0",
            seed=0,
            cutoff=self.cutoff,
            capped=False,
            budget=0.0,
        )

    def get_next_run(
        self,
        challengers: Optional[List[Configuration]],
        incumbent: Configuration,
        chooser: Optional[EPMChooser],
        run_history: RunHistory,
        repeat_configs: bool = True,
        num_workers: int = 1,
    ) -> Tuple[RunInfoIntent, RunInfo]:
        """Generate the RunInfo object for the next run of the intensification.

        This complies with line 7 (in case stage==RUN_INCUMBENT) or line 12
        (in case stage==RUN_CHALLENGER) of the procedure.

        A RunInfo object encapsulates the necessary information for a worker
        to execute the job, nevertheless, a challenger is not always available.
        This could happen because no more configurations are available or the new
        configuration to try was already executed.

        To circumvent this, an intent is also returned:

        - (intent=RUN) Run the RunInfo object (normal execution)
        - (intent=SKIP) Skip this iteration. No challenger is available, in particular
          because challenger is the same as incumbent

        Parameters
        ----------
        challengers : List[Configuration]
            promising configurations
        incumbent: Configuration
            incumbent configuration
        chooser : smac.optimizer.epm_configuration_chooser.EPMChooser
            optimizer that generates next configurations to use for racing
        run_history : RunHistory
            stores all runs we ran so far
        repeat_configs : bool
            if False, an evaluated configuration will not be generated again
            (not used by this method)
        num_workers: int
            the maximum number of workers available
            at a given time.

        Returns
        -------
        intent: RunInfoIntent
            What should the smbo object do with the runinfo.
        run_info: RunInfo
            An object that encapsulates necessary information for a config run

        Raises
        ------
        ValueError
            If ``num_workers`` is larger than 1, or if the intensifier is in a
            stage for which no run can be generated.
        """
        if num_workers > 1:
            raise ValueError(
                "Intensifier does not support more than 1 worker, yet "
                "the argument num_workers to get_next_run is {}".format(num_workers)
            )

        # If this function is called, it means the iteration is
        # not complete (we can be starting a new iteration, or re-running a
        # challenger due to line 17). We evaluate if a iteration is complete or not
        # via _process_results
        self.iteration_done = False

        # In case a crash happens, and FirstRunCrashedException prevents a
        # failure, revert back to running the incumbent
        # Challenger case is by construction ok, as there is no special
        # stage for its processing
        if self.stage == IntensifierStage.PROCESS_FIRST_CONFIG_RUN:
            self.stage = IntensifierStage.RUN_FIRST_CONFIG
        elif self.stage == IntensifierStage.PROCESS_INCUMBENT_RUN:
            self.stage = IntensifierStage.RUN_INCUMBENT

        # if first ever run, then assume current challenger to be the incumbent
        # Because this is the first ever run, we need to sample a new challenger
        # This new challenger is also assumed to be the incumbent
        if self.stage == IntensifierStage.RUN_FIRST_CONFIG:
            if incumbent is None:
                self.logger.info("First run, no incumbent provided;" " challenger is assumed to be the incumbent")
                challenger, self.new_challenger = self.get_next_challenger(
                    challengers=challengers,
                    chooser=chooser,
                )
                incumbent = challenger
            else:
                inc_runs = run_history.get_runs_for_config(incumbent, only_max_observed_budget=True)
                if len(inc_runs) > 0:
                    self.logger.debug("Skipping RUN_FIRST_CONFIG stage since " "incumbent has already been ran")
                    self.stage = IntensifierStage.RUN_INCUMBENT

        # LINES 3-7
        if self.stage in [IntensifierStage.RUN_FIRST_CONFIG, IntensifierStage.RUN_INCUMBENT]:

            # Line 3
            # A modified version, that not only checks for maxR
            # but also makes sure that there are runnable instances,
            # that is, instances has not been exhausted
            inc_runs = run_history.get_runs_for_config(incumbent, only_max_observed_budget=True)

            # Line 4
            available_insts = self._get_inc_available_inst(incumbent, run_history)
            if available_insts and len(inc_runs) < self.maxR:
                # Lines 5-6-7
                instance, seed, cutoff = self._get_next_inc_run(available_insts)

                instance_specific = "0"
                if instance is not None:
                    instance_specific = self.instance_specifics.get(instance, "0")

                return RunInfoIntent.RUN, RunInfo(
                    config=incumbent,
                    instance=instance,
                    instance_specific=instance_specific,
                    seed=seed,
                    cutoff=cutoff,
                    capped=False,
                    budget=0.0,
                )
            else:
                # This point marks the transitions from lines 3-7
                # to 8-18.
                self.logger.debug("No further instance-seed pairs for incumbent available.")
                self.stage = IntensifierStage.RUN_CHALLENGER

        # Understand who is the active challenger.
        if self.stage == IntensifierStage.RUN_BASIS:
            # if in RUN_BASIS stage,
            # return the basis configuration (i.e., `always_race_against`)
            self.logger.debug("Race against basis configuration after incumbent change.")
            challenger = self.always_race_against
        elif self.current_challenger and self.continue_challenger:
            # if the current challenger could not be rejected,
            # it is run again on more instances
            challenger = self.current_challenger
        else:
            # Get a new challenger if all instance/pairs have
            # been completed. Else return the currently running
            # challenger
            challenger, self.new_challenger = self.get_next_challenger(
                challengers=challengers,
                chooser=chooser,
            )

        # No new challengers are available for this iteration,
        # Move to the next iteration. This can only happen
        # when all configurations for this iteration are exhausted
        # and have been run in all proposed instance/pairs.
        if challenger is None:
            return self._skip_run()

        # Skip the iteration if the challenger was previously run
        if challenger == incumbent and self.stage == IntensifierStage.RUN_CHALLENGER:
            self.challenger_same_as_incumbent = True
            self.logger.debug("Challenger was the same as the current incumbent; Skipping challenger")
            return self._skip_run()

        self.logger.debug("Intensify on %s", challenger)
        if hasattr(challenger, "origin"):
            self.logger.debug("Configuration origin: %s", challenger.origin)

        if self.stage not in [IntensifierStage.RUN_CHALLENGER, IntensifierStage.RUN_BASIS]:
            raise ValueError("No valid stage found!")

        if not self.to_run:
            self.to_run, self.inc_sum_cost = self._get_instances_to_run(
                incumbent=incumbent, challenger=challenger, run_history=run_history, N=self.N
            )

        is_there_time_due_to_adaptive_cap = self._is_there_time_due_to_adaptive_cap(
            challenger=challenger,
            run_history=run_history,
        )

        # If there is no more configs to run in this iteration, or no more
        # time to do so, change the current stage base on how the current
        # challenger performs as compared to the incumbent. This is done
        # via _process_racer_results
        if len(self.to_run) == 0 or not is_there_time_due_to_adaptive_cap:

            # If no more time, stage transition is a must
            if not is_there_time_due_to_adaptive_cap:
                # Since the challenger fails to outperform the incumbent due to adaptive capping,
                # we discard all the forthcoming runs.
                self.to_run = []
                self.stage = IntensifierStage.RUN_INCUMBENT
                self.logger.debug("Stop challenger itensification due " "to adaptive capping.")

            # Nevertheless, if there are no more instances to run,
            # we might need to comply with line 17 and keep running the
            # same challenger. In this case, if there is not enough information
            # to decide if the challenger is better/worst than the incumbent,
            # line 17 doubles the number of instances to run.
            self.logger.debug("No further runs for challenger possible")
            self._process_racer_results(
                challenger=challenger,
                incumbent=incumbent,
                run_history=run_history,
            )

            # Request SMBO to skip this run. This function will
            # be called again, after the _process_racer_results
            # has updated the intensifier stage
            return self._skip_run()

        # Lines 8-11
        incumbent, instance, seed, cutoff = self._get_next_racer(
            challenger=challenger,
            incumbent=incumbent,
            run_history=run_history,
        )

        capped = False
        if (self.cutoff is not None) and (cutoff < self.cutoff):  # type: ignore[operator] # noqa F821
            capped = True

        instance_specific = "0"
        if instance is not None:
            instance_specific = self.instance_specifics.get(instance, "0")

        # Line 12
        return RunInfoIntent.RUN, RunInfo(
            config=challenger,
            instance=instance,
            instance_specific=instance_specific,
            seed=seed,
            cutoff=cutoff,
            capped=capped,
            budget=0.0,
        )

    def process_results(
        self,
        run_info: RunInfo,
        incumbent: Optional[Configuration],
        run_history: RunHistory,
        time_bound: float,
        result: RunValue,
        log_traj: bool = True,
    ) -> Tuple[Configuration, float]:
        """Update the intensifier stage based on the results/status of a configuration execution.

        During intensification, the following can happen:

        * Challenger raced against incumbent
        * Also, during a challenger run, a capped exception
          can be triggered, where no racer post processing is needed
        * A run on the incumbent for more confidence needs to
          be processed, IntensifierStage.PROCESS_INCUMBENT_RUN
        * The first run results need to be processed
          (PROCESS_FIRST_CONFIG_RUN)

        At the end of any run, checks are done to move to a new iteration.

        Parameters
        ----------
        run_info : RunInfo
            A RunInfo containing the configuration that was evaluated
        incumbent : Optional[Configuration]
            best configuration so far, None in 1st run
        run_history : RunHistory
            stores all runs we ran so far
        time_bound : float
            time in [sec] available to perform intensify
        result: RunValue
            Contain the result (status and other metadata) of exercising
            a challenger/incumbent.
        log_traj: bool
            whether to log changes of incumbents in trajectory

        Returns
        -------
        incumbent: Configuration()
            current (maybe new) incumbent configuration
        inc_perf: float
            empirical performance of incumbent configuration
        """
        if self.stage == IntensifierStage.PROCESS_FIRST_CONFIG_RUN:
            if incumbent is None:
                self.logger.info("First run, no incumbent provided;" " challenger is assumed to be the incumbent")
                incumbent = run_info.config

        if self.stage in [
            IntensifierStage.PROCESS_INCUMBENT_RUN,
            IntensifierStage.PROCESS_FIRST_CONFIG_RUN,
        ]:
            self._ta_time += result.time
            self.num_run += 1
            self._process_inc_run(
                incumbent=incumbent,
                run_history=run_history,
                log_traj=log_traj,
            )
        else:
            self.num_run += 1
            self.num_chall_run += 1
            if result.status == StatusType.CAPPED:
                # move on to the next iteration
                self.logger.debug("Challenger itensification timed out due " "to adaptive capping.")
                self.stage = IntensifierStage.RUN_INCUMBENT
            else:
                self._ta_time += result.time
                incumbent = self._process_racer_results(
                    challenger=run_info.config,
                    incumbent=incumbent,
                    run_history=run_history,
                    log_traj=log_traj,
                )

        self.elapsed_time += result.endtime - result.starttime

        # check if 1 intensification run is complete - line 18
        # this is different to regular SMAC as it requires at least successful challenger run,
        # which is necessary to work on a fixed grid of configurations.
        if (
            self.stage == IntensifierStage.RUN_INCUMBENT
            and self._chall_indx >= self.min_chall
            and self.num_chall_run > 0
        ):
            if self.num_run > self.run_limit:
                self.logger.debug("Maximum #runs for intensification reached")
                self._next_iteration()

            if not self.use_ta_time_bound and self.elapsed_time - time_bound >= 0:
                self.logger.debug(
                    "Wallclock time limit for intensification reached " "(used: %f sec, available: %f sec)",
                    self.elapsed_time,
                    time_bound,
                )
                self._next_iteration()

            elif self._ta_time - time_bound >= 0:
                self.logger.debug(
                    "TA time limit for intensification reached (used: %f sec, available: %f sec)",
                    self._ta_time,
                    time_bound,
                )
                self._next_iteration()

        inc_perf = run_history.get_cost(incumbent)

        return incumbent, inc_perf

    def _get_next_inc_run(
        self,
        available_insts: List[str],
    ) -> Tuple[str, int, Optional[float]]:
        """Pick the next instance/seed on which the incumbent must be evaluated.

        Also advances the stage to the matching PROCESS_* stage
        (PROCESS_FIRST_CONFIG_RUN when coming from RUN_FIRST_CONFIG,
        PROCESS_INCUMBENT_RUN otherwise).

        Parameters
        ----------
        available_insts : List[str]
            A list of instances from which to extract the next incumbent run

        Returns
        -------
        instance: str
            Next instance to evaluate
        seed: int
            Seed in which to evaluate the instance
        cutoff: Optional[float]
            Max time for a given instance/seed pair
        """
        # Line 5 - draw an index rather than an element, so the chosen
        # instance is taken unchanged from the list
        _idx = self.rs.choice(len(available_insts))
        next_instance = available_insts[_idx]

        # Line 6
        if self.deterministic:
            next_seed = 0
        else:
            next_seed = int(self.rs.randint(low=0, high=MAXINT, size=1)[0])

        # Line 7
        self.logger.debug("Add run of incumbent for instance={}".format(next_instance))
        if self.stage == IntensifierStage.RUN_FIRST_CONFIG:
            self.stage = IntensifierStage.PROCESS_FIRST_CONFIG_RUN
        else:
            self.stage = IntensifierStage.PROCESS_INCUMBENT_RUN
        return next_instance, next_seed, self.cutoff

    def _get_inc_available_inst(
        self,
        incumbent: Configuration,
        run_history: RunHistory,
        log_traj: bool = True,
    ) -> List[str]:
        """Implementation of line 4 of Intensification.

        This method queries the incumbent runs in the run history
        and returns the pending instances, if any is available.

        Parameters
        ----------
        incumbent: Configuration
            Configuration whose runs are inspected
        run_history : RunHistory
            stores all runs we ran so far
        log_traj: bool
            Whether to log changes of incumbents in trajectory
            (not used by this method)

        Returns
        -------
        List[str]
            Instances that have fewer incumbent runs than the most-used
            instance(s). If there is none and the target algorithm is not
            deterministic, the complete instance list is returned.
        """
        # Line 4
        # find all instances that have the most runs on the inc
        inc_runs = run_history.get_runs_for_config(incumbent, only_max_observed_budget=True)
        runs_per_inst = Counter(run.instance for run in inc_runs)
        if not runs_per_inst:
            self.logger.debug("No run for incumbent found")
        max_runs = max(runs_per_inst.values(), default=0)
        most_used_insts = {inst for inst, n_runs in runs_per_inst.items() if n_runs == max_runs}

        available_insts = sorted(set(self.instances) - most_used_insts)

        # if all instances were used n times, we can pick an instances
        # from the complete set again
        if not self.deterministic and not available_insts:
            available_insts = self.instances
        return available_insts

    def _process_inc_run(
        self,
        incumbent: Configuration,
        run_history: RunHistory,
        log_traj: bool = True,
    ) -> None:
        """Process the result of an incumbent run and select the next stage.

        Parameters
        ----------
        incumbent: Configuration
            The incumbent whose run has just finished
        run_history : RunHistory
            stores all runs we ran so far
        log_traj: bool
            Whether to log changes of incumbents in trajectory
        """
        # output estimated performance of incumbent
        inc_runs = run_history.get_runs_for_config(incumbent, only_max_observed_budget=True)
        inc_perf = run_history.get_cost(incumbent)
        format_value = format_array(inc_perf)
        self.logger.info(f"Updated estimated cost of incumbent on {len(inc_runs)} runs: {format_value}")

        # if running first configuration, go to next stage after 1st run
        if self.stage in [
            IntensifierStage.RUN_FIRST_CONFIG,
            IntensifierStage.PROCESS_FIRST_CONFIG_RUN,
        ]:
            self.stage = IntensifierStage.RUN_INCUMBENT
            self._next_iteration()
        else:
            # Termination condition; after each run, this checks
            # whether further runs are necessary due to minR
            if len(inc_runs) >= self.minR or len(inc_runs) >= self.maxR:
                self.stage = IntensifierStage.RUN_CHALLENGER
            else:
                self.stage = IntensifierStage.RUN_INCUMBENT

        self._compare_configs(incumbent=incumbent, challenger=incumbent, run_history=run_history, log_traj=log_traj)

    def _get_next_racer(
        self,
        challenger: Configuration,
        incumbent: Configuration,
        run_history: RunHistory,
        log_traj: bool = True,
    ) -> Tuple[Configuration, str, int, Optional[float]]:
        """Return the next instance/seed/cutoff on which to race the challenger against the incumbent.

        Parameters
        ----------
        challenger : Configuration
            Configuration which challenges incumbent
        incumbent : Configuration
            Best configuration so far
        run_history : RunHistory
            Stores all runs we ran so far
        log_traj: bool
            Whether to log changes of incumbents in trajectory
            (not used by this method)

        Returns
        -------
        new_incumbent: Configuration
            The incumbent, passed through unchanged
        instance: str
            Next instance to evaluate
        seed: int
            Seed in which to evaluate the instance
        cutoff: Optional[float]
            Max time for a given instance/seed pair
        """
        # By the time this function is called, the run history might
        # have shifted. Re-populate the list if necessary
        if not self.to_run:
            # Lines 10/11
            self.to_run, self.inc_sum_cost = self._get_instances_to_run(
                incumbent=incumbent, challenger=challenger, run_history=run_history, N=self.N
            )

        # Run challenger on all <instance, seed> to run
        instance, seed, _ = self.to_run.pop()

        cutoff = self.cutoff
        if self.run_obj_time:
            cutoff = self._adapt_cutoff(challenger=challenger, run_history=run_history, inc_sum_cost=self.inc_sum_cost)

        self.logger.debug("Cutoff for challenger: %s" % str(cutoff))

        self.logger.debug("Add run of challenger")

        # Line 12
        return incumbent, instance, seed, cutoff

    def _is_there_time_due_to_adaptive_cap(
        self,
        challenger: Configuration,
        run_history: RunHistory,
    ) -> bool:
        """Check whether adaptive capping still leaves time for a challenger run.

        Only relevant when runtime is optimized and the incumbent looks
        more promising (line 18).

        Parameters
        ----------
        challenger : Configuration
            Configuration which challenges incumbent
        run_history : RunHistory
            Stores all runs we ran so far

        Returns
        -------
        bool:
            whether or not there is more time for a challenger run
        """
        # If time is not objective, then there is always time!
        if not self.run_obj_time:
            return True

        cutoff = self._adapt_cutoff(challenger=challenger, run_history=run_history, inc_sum_cost=self.inc_sum_cost)
        # Only a known, non-positive cutoff means the budget is used up.
        return not (cutoff is not None and cutoff <= 0)

    def _process_racer_results(
        self,
        challenger: Configuration,
        incumbent: Configuration,
        run_history: RunHistory,
        log_traj: bool = True,
    ) -> Optional[Configuration]:
        """Process the result of a racing configuration against the current incumbent.

        Might propose a new incumbent.

        Parameters
        ----------
        challenger : Configuration
            Configuration which challenges incumbent
        incumbent : Configuration
            Best configuration so far
        run_history : RunHistory
            Stores all runs we ran so far
        log_traj: bool
            Whether to log changes of incumbents in trajectory

        Returns
        -------
        new_incumbent: Optional[Configuration]
            Either challenger or incumbent
        """
        chal_runs = run_history.get_runs_for_config(challenger, only_max_observed_budget=True)
        chal_perf = run_history.get_cost(challenger)

        # Runs are still queued: nothing to decide yet.
        if self.to_run:
            self.logger.debug(
                "Estimated cost of challenger on %d runs: %.4f, still %d runs to go (continue racing)",
                len(chal_runs),
                chal_perf,
                len(self.to_run),
            )
            return incumbent

        # all <instance, seed> have been run, compare challenger performance
        new_incumbent = self._compare_configs(
            incumbent=incumbent,
            challenger=challenger,
            run_history=run_history,
            log_traj=log_traj,
        )

        # update intensification stage
        if new_incumbent == incumbent:
            # move on to the next iteration
            self.stage = IntensifierStage.RUN_INCUMBENT
            self.continue_challenger = False
            self.logger.debug(
                "Estimated cost of challenger on %d runs: %.4f, but worse than incumbent",
                len(chal_runs),
                chal_perf,
            )

        elif new_incumbent == challenger:
            # New incumbent found
            incumbent = challenger
            self.continue_challenger = False
            # compare against basis configuration if provided, else go to next iteration
            if self.always_race_against and self.always_race_against != challenger:
                self.stage = IntensifierStage.RUN_BASIS
            else:
                self.stage = IntensifierStage.RUN_INCUMBENT
            self.logger.debug(
                "Estimated cost of challenger on %d runs: %.4f, becomes new incumbent",
                len(chal_runs),
                chal_perf,
            )

        else:  # Line 17
            # challenger is not worse, continue
            self.N = 2 * self.N
            self.continue_challenger = True
            self.logger.debug(
                "Estimated cost of challenger on %d runs: %.4f, adding %d runs to the queue",
                len(chal_runs),
                chal_perf,
                self.N / 2,
            )

        return incumbent

    def _get_instances_to_run(
        self,
        challenger: Configuration,
        incumbent: Configuration,
        N: int,
        run_history: RunHistory,
    ) -> Tuple[List[InstSeedBudgetKey], float]:
        """Return the <instance, seed> pairs to run the challenger on before comparing it with the incumbent.

        Parameters
        ----------
        incumbent: Configuration
            incumbent configuration
        challenger: Configuration
            promising configuration that is presently being evaluated
        run_history: RunHistory
            Stores all runs we ran so far
        N: int
            number of <instance, seed> pairs to select

        Returns
        -------
        List[InstSeedBudgetKey]
            list of <instance, seed, budget> tuples to run
        float
            total (runtime) cost of running the incumbent on the instances (used for adaptive capping while racing)

        Raises
        ------
        ValueError
            If ``N`` is negative.
        """
        # get next instances left for the challenger
        # Line 8
        inc_inst_seeds = set(run_history.get_runs_for_config(incumbent, only_max_observed_budget=True))
        chall_inst_seeds = set(run_history.get_runs_for_config(challenger, only_max_observed_budget=True))
        # Line 10
        missing_runs = sorted(inc_inst_seeds - chall_inst_seeds)

        # Line 11
        if N < 0:
            raise ValueError("Argument N must not be smaller than zero, but is %s" % str(N))
        # With N >= 0, plain slicing already caps at len(missing_runs).
        to_run = missing_runs[:N]
        missing_runs = missing_runs[N:]

        # for adaptive capping
        # because of efficiency computed here
        inst_seed_pairs = list(inc_inst_seeds - set(missing_runs))
        # cost used by incumbent for going over all runs in inst_seed_pairs
        inc_sum_cost = run_history.sum_cost(config=incumbent, instance_seed_budget_keys=inst_seed_pairs, normalize=True)
        # isinstance also accepts float subclasses, unlike an exact type comparison.
        assert isinstance(inc_sum_cost, float)
        return to_run, inc_sum_cost

    def get_next_challenger(
        self,
        challengers: Optional[List[Configuration]],
        chooser: Optional[EPMChooser],
    ) -> Tuple[Optional[Configuration], bool]:
        """Return the next challenger that should be exercised through lines 8-17.

        It does so by populating configs_to_run, which is a pool of configuration
        from which the racer will sample. Each configuration within configs_to_run,
        will be intensified on different instances/seed registered in self.to_run
        as stated in line 11.

        A brand new configuration should only be sampled, after all self.to_run
        instance seed pairs are exhausted.

        This method triggers a call to _next_iteration if there are no more configurations
        to run, for the current intensification loop. This marks the transition to Line 2,
        where a new configuration to intensify will be drawn from epm/initial challengers.

        Parameters
        ----------
        challengers : List[Configuration]
            promising configurations
        chooser : smac.optimizer.epm_configuration_chooser.EPMChooser
            optimizer that generates next configurations to use for racing

        Returns
        -------
        Optional[Configuration]
            next configuration to evaluate
        bool
            flag telling if the configuration is newly sampled or one currently being tracked
        """
        # select new configuration when entering 'race challenger' stage
        # or for the first run
        needs_new_challenger = not self.current_challenger or (
            self.stage == IntensifierStage.RUN_CHALLENGER and not self.to_run
        )
        if not needs_new_challenger:
            # return currently running challenger
            return self.current_challenger, False

        # this is a new intensification run, get the next list of configurations to run
        if self.update_configs_to_run:
            configs_to_run = self._generate_challengers(challengers=challengers, chooser=chooser)
            self.configs_to_run = cast(_config_to_run_type, configs_to_run)
            self.update_configs_to_run = False

        # pick next configuration from the generator
        try:
            challenger = next(self.configs_to_run)
        except StopIteration:
            # out of challengers for the current iteration, start next incumbent iteration
            self._next_iteration()
            return None, False

        if challenger:
            # reset instance index for the new challenger
            self._chall_indx += 1
            self.current_challenger = challenger
            self.N = max(1, self.minR)
            self.to_run = []

        return challenger, True

    def _generate_challengers(
        self,
        challengers: Optional[List[Configuration]],
        chooser: Optional[EPMChooser],
    ) -> _config_to_run_type:
        """Return a sequence of challengers to use in intensification.

        If challengers are not provided, then the optimizer will be used to generate the challenger list.

        Parameters
        ----------
        challengers : List[Configuration]
            promising configurations to evaluate next
        chooser : smac.optimizer.epm_configuration_chooser.EPMChooser
            a sampler that generates next configurations to use for racing

        Returns
        -------
        Optional[Generator[Configuration]]
            A generator containing the next challengers to use

        Raises
        ------
        ValueError
            If neither challengers nor a chooser are provided.
        """
        if challengers:
            # iterate over challengers provided
            self.logger.debug("Using challengers provided")
            chall_gen = iter(challengers)  # type: _config_to_run_type
        elif chooser:
            # generating challengers on-the-fly if optimizer is given
            self.logger.debug("Generating new challenger from optimizer")
            chall_gen = chooser.choose_next()
        else:
            raise ValueError("No configurations/chooser provided. Cannot generate challenger!")

        return chall_gen

    def _next_iteration(self) -> None:
        """Update tracking variables at the end of an intensification run."""
        # Report the number of challengers of the finished iteration *before*
        # the counter is reset; otherwise the statistic would always receive 0.
        self.stats.update_average_configs_per_intensify(n_configs=self._chall_indx)

        # track iterations
        self.n_iters += 1
        self.iteration_done = True
        self.configs_to_run = iter([])
        self.update_configs_to_run = True

        # reset for a new iteration
        self.num_run = 0
        self.num_chall_run = 0
        self._chall_indx = 0
        self.elapsed_time = 0.0
        self._ta_time = 0.0