# Source code for smac.intensification.abstract_racer

from typing import Iterator, List, Mapping, Optional, Tuple

import logging
import time
from collections import OrderedDict
from enum import Enum

import numpy as np

from smac.configspace import Configuration
from smac.optimizer.configuration_chooser.epm_chooser import EPMChooser
from smac.runhistory.runhistory import RunHistory, RunInfo, RunValue
from smac.stats.stats import Stats
from smac.utils.io.traj_logging import TrajLogger
from smac.utils.logging import format_array

_config_to_run_type = Iterator[Optional[Configuration]]

__author__ = "Ashwin Raaghav Narayanan"
__copyright__ = "Copyright 2019, ML4AAD"
__license__ = "3-clause BSD"


class RunInfoIntent(Enum):
    """Request type telling the SMBO loop how to handle a produced run_info.

    Allows an intensifier to signal that a configuration should be executed
    normally (RUN), skipped entirely (SKIP), or deferred until more
    configurations have been processed (WAIT).
    """

    RUN = 0  # Normal run execution of a run info
    SKIP = 1  # Skip running the run_info
    WAIT = 2  # Wait for more configs to be processed
class AbstractRacer(object):
    """Base class for all racing methods.

    The "intensification" is designed to output a RunInfo object with enough
    information to run a given configuration (for example, the run info
    contains the instance/seed pair, as well as the associated resources).

    A worker can execute this RunInfo object and produce a RunValue object
    with the execution results. Each intensifier processes the RunValue
    object and updates its internal state in preparation for the next
    iteration.

    **Note: Do not use directly**

    Parameters
    ----------
    stats: Stats
        stats object
    traj_logger: smac.utils.io.traj_logging.TrajLogger
        TrajLogger object to log all new incumbents
    rng : np.random.RandomState
    instances : Optional[List[str]]
        list of all instance ids (may be None, treated as no instances)
    instance_specifics : Mapping[str, str]
        mapping from instance name to instance specific string
    cutoff : Optional[float]
        runtime cutoff of TA runs
    deterministic: bool
        whether the TA is deterministic or not
    run_obj_time: bool
        whether the run objective is runtime or not (if true, apply adaptive capping)
    minR : int
        Minimum number of runs per config (summed over all calls to intensify).
    maxR : int
        Maximum number of runs per config (summed over all calls to intensify).
    adaptive_capping_slackfactor: float
        slack factor of adaptive capping (factor * adaptive cutoff)
    min_chall: int
        minimal number of challengers to be considered (even if time_bound is
        exhausted earlier)
    """

    def __init__(
        self,
        stats: Stats,
        traj_logger: TrajLogger,
        rng: np.random.RandomState,
        # NOTE: annotated Optional because the body explicitly handles None
        instances: Optional[List[str]],
        instance_specifics: Optional[Mapping[str, str]] = None,
        cutoff: Optional[float] = None,
        deterministic: bool = False,
        run_obj_time: bool = True,
        minR: int = 1,
        maxR: int = 2000,
        adaptive_capping_slackfactor: float = 1.2,
        min_chall: int = 1,
    ):
        self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)

        self.stats = stats
        self.traj_logger = traj_logger
        self.rs = rng

        # scenario info
        self.cutoff = cutoff
        self.deterministic = deterministic
        self.run_obj_time = run_obj_time
        self.minR = minR
        self.maxR = maxR
        self.adaptive_capping_slackfactor = adaptive_capping_slackfactor
        self.min_chall = min_chall

        # instances
        if instances is None:
            instances = []
        # removing duplicates in the user provided instances while keeping
        # their original order
        self.instances = list(OrderedDict.fromkeys(instances))
        if instance_specifics is None:
            self.instance_specifics = {}  # type: Mapping[str, str]
        else:
            self.instance_specifics = instance_specifics

        # general attributes
        self.num_run = 0  # Number of runs done in an iteration so far
        self._chall_indx = 0
        self._ta_time = 0.0

        # attributes for sampling next configuration
        # Repeating configurations is discouraged for parallel runs
        self.repeat_configs = False
        # to mark the end of an iteration
        self.iteration_done = False

    def get_next_run(
        self,
        challengers: Optional[List[Configuration]],
        incumbent: Configuration,
        chooser: Optional[EPMChooser],
        run_history: RunHistory,
        repeat_configs: bool = True,
        num_workers: int = 1,
    ) -> Tuple[RunInfoIntent, RunInfo]:
        """Abstract method for choosing the next challenger, to allow for
        different selections across intensifiers.

        Uses ``_next_challenger()`` by default. If no more challengers are
        available, the method should issue a SKIP via RunInfoIntent.SKIP, so
        that a new iteration can sample new configurations.

        Parameters
        ----------
        challengers : List[Configuration]
            promising configurations
        incumbent: Configuration
            incumbent configuration
        chooser : smac.optimizer.epm_configuration_chooser.EPMChooser
            optimizer that generates next configurations to use for racing
        run_history : smac.runhistory.runhistory.RunHistory
            stores all runs we ran so far
        repeat_configs : bool
            if False, an evaluated configuration will not be generated again
        num_workers: int
            the maximum number of workers available at a given time.

        Returns
        -------
        run_info: RunInfo
            An object that encapsulates necessary information for a config run
        intent: RunInfoIntent
            Indicator of how to consume the RunInfo object
        """
        raise NotImplementedError()

    def process_results(
        self,
        run_info: RunInfo,
        incumbent: Optional[Configuration],
        run_history: RunHistory,
        time_bound: float,
        result: RunValue,
        log_traj: bool = True,
    ) -> Tuple[Configuration, float]:
        """The intensifier stage will be updated based on the results/status
        of a configuration execution. Also, an incumbent will be determined.

        Parameters
        ----------
        run_info : RunInfo
            A RunInfo containing the configuration that was evaluated
        incumbent : Optional[Configuration]
            Best configuration seen so far
        run_history : RunHistory
            stores all runs we ran so far
        time_bound : float
            time in [sec] available to perform intensify
        result: RunValue
            Contain the result (status and other metadata) of exercising
            a challenger/incumbent.
        log_traj: bool
            Whether to log changes of incumbents in trajectory

        Returns
        -------
        incumbent: Configuration()
            current (maybe new) incumbent configuration
        inc_perf: float
            empirical performance of incumbent configuration
        """
        raise NotImplementedError()

    def _next_challenger(
        self,
        challengers: Optional[List[Configuration]],
        chooser: Optional[EPMChooser],
        run_history: RunHistory,
        repeat_configs: bool = True,
    ) -> Optional[Configuration]:
        """Returns the next challenger to use in intensification.

        If challenger is None, then the optimizer will be used to generate
        the next challenger.

        Parameters
        ----------
        challengers : List[Configuration]
            promising configurations to evaluate next
        chooser : smac.optimizer.epm_configuration_chooser.EPMChooser
            a sampler that generates next configurations to use for racing
        run_history : smac.runhistory.runhistory.RunHistory
            stores all runs we ran so far
        repeat_configs : bool
            if False, an evaluated configuration will not be generated again

        Returns
        -------
        Configuration
            next challenger to use
        """
        start_time = time.time()

        used_configs = set(run_history.get_all_configs())

        if challengers:
            # iterate over challengers provided
            self.logger.debug("Using challengers provided")
            chall_gen = iter(challengers)  # type: _config_to_run_type
        elif chooser:
            # generating challengers on-the-fly if optimizer is given
            self.logger.debug("Generating new challenger from optimizer")
            chall_gen = chooser.choose_next()
        else:
            raise ValueError("No configurations/chooser provided. Cannot generate challenger!")

        self.logger.debug("Time to select next challenger: %.4f" % (time.time() - start_time))

        # select challenger from the generators
        assert chall_gen is not None
        for challenger in chall_gen:
            # repetitions allowed
            if repeat_configs:
                return challenger

            # otherwise, select only a unique challenger
            if challenger not in used_configs:
                return challenger

        self.logger.debug("No valid challenger was generated!")
        return None

    def _adapt_cutoff(self, challenger: Configuration, run_history: RunHistory, inc_sum_cost: float) -> float:
        """Adaptive capping: Compute cutoff based on time so far used for
        incumbent and reduce cutoff for next run of challenger accordingly.

        !Only applicable if self.run_obj_time

        !runs on incumbent should be superset of the runs performed for the
        challenger

        Parameters
        ----------
        challenger : Configuration
            Configuration which challenges incumbent
        run_history : smac.runhistory.runhistory.RunHistory
            Stores all runs we ran so far
        inc_sum_cost: float
            Sum of runtimes of all incumbent runs

        Returns
        -------
        cutoff: float
            Adapted cutoff
        """
        if not self.run_obj_time:
            raise ValueError("This method only works when the run objective is time")

        curr_cutoff = self.cutoff if self.cutoff is not None else np.inf

        # cost used by challenger for going over all its runs
        # should be subset of runs of incumbent (not checked for efficiency
        # reasons)
        chall_inst_seeds = run_history.get_runs_for_config(challenger, only_max_observed_budget=True)
        chal_sum_cost = run_history.sum_cost(
            config=challenger,
            instance_seed_budget_keys=chall_inst_seeds,
            normalize=True,
        )
        # isinstance (not type ==) also accepts np.float64, a float subclass
        assert isinstance(chal_sum_cost, float)
        cutoff = min(curr_cutoff, inc_sum_cost * self.adaptive_capping_slackfactor - chal_sum_cost)

        return cutoff

    def _compare_configs(
        self,
        incumbent: Configuration,
        challenger: Configuration,
        run_history: RunHistory,
        log_traj: bool = True,
    ) -> Optional[Configuration]:
        """Compare two configurations wrt the runhistory and return the one
        which performs better (or None if the decision is not safe).

        Decision strategy to return x as being better than y:
            1. x has at least as many runs as y
            2. x performs better than y on the intersection of runs on x and y

        Implicit assumption:
            Challenger was evaluated on the same instance-seed pairs as
            incumbent

        Parameters
        ----------
        incumbent: Configuration
            Current incumbent
        challenger: Configuration
            Challenger configuration
        run_history: smac.runhistory.runhistory.RunHistory
            Stores all runs we ran so far
        log_traj: bool
            Whether to log changes of incumbents in trajectory

        Returns
        -------
        None or better of the two configurations x,y
        """
        inc_runs = run_history.get_runs_for_config(incumbent, only_max_observed_budget=True)
        chall_runs = run_history.get_runs_for_config(challenger, only_max_observed_budget=True)
        to_compare_runs = set(inc_runs).intersection(chall_runs)

        # performance on challenger runs, the challenger only becomes incumbent
        # if it dominates the incumbent
        chal_perf = run_history.average_cost(challenger, to_compare_runs, normalize=True)
        inc_perf = run_history.average_cost(incumbent, to_compare_runs, normalize=True)
        # isinstance (not type ==) also accepts np.float64, a float subclass
        assert isinstance(chal_perf, float)
        assert isinstance(inc_perf, float)

        # Line 15
        if np.any(chal_perf > inc_perf) and len(chall_runs) >= self.minR:
            chal_perf_format = format_array(chal_perf)
            inc_perf_format = format_array(inc_perf)

            # Incumbent beats challenger
            self.logger.debug(
                f"Incumbent ({inc_perf_format}) is better than challenger "
                f"({chal_perf_format}) on {len(chall_runs)} runs."
            )
            return incumbent

        # Line 16
        # this statement is true if both configurations have the same runs
        if not set(inc_runs) - set(chall_runs):
            # no plateau walks
            if np.any(chal_perf >= inc_perf):
                chal_perf_format = format_array(chal_perf)
                inc_perf_format = format_array(inc_perf)

                self.logger.debug(
                    f"Incumbent ({inc_perf_format}) is at least as good as the "
                    f"challenger ({chal_perf_format}) on {len(chall_runs)} runs."
                )
                if log_traj and self.stats.inc_changed == 0:
                    # adding incumbent entry
                    self.stats.inc_changed += 1  # first incumbent
                    self.traj_logger.add_entry(
                        train_perf=chal_perf,
                        incumbent_id=self.stats.inc_changed,
                        incumbent=incumbent,
                    )
                return incumbent

            # Challenger is better than incumbent
            # and has at least the same runs as inc
            # -> change incumbent
            n_samples = len(chall_runs)
            chal_perf_format = format_array(chal_perf)
            inc_perf_format = format_array(inc_perf)

            self.logger.info(
                f"Challenger ({chal_perf_format}) is better than incumbent ({inc_perf_format}) "
                f"on {n_samples} runs."
            )
            self._log_incumbent_changes(incumbent, challenger)

            if log_traj:
                self.stats.inc_changed += 1
                self.traj_logger.add_entry(
                    train_perf=chal_perf, incumbent_id=self.stats.inc_changed, incumbent=challenger
                )
            return challenger

        # undecided
        return None

    def _log_incumbent_changes(
        self,
        incumbent: Configuration,
        challenger: Configuration,
    ) -> None:
        # Report, parameter by parameter, how the new incumbent differs from
        # the old one (info level for changed values, debug for unchanged).
        params = sorted([(param, incumbent[param], challenger[param]) for param in challenger.keys()])
        self.logger.info("Changes in incumbent:")
        for param in params:
            if param[1] != param[2]:
                self.logger.info("  %s : %r -> %r" % param)
            else:
                self.logger.debug("  %s remains unchanged: %r", param[0], param[1])