from typing import Dict, List, Mapping, Optional, Set, Tuple, Union
import logging
import warnings
import numpy as np
from smac.configspace import Configuration
from smac.intensification.abstract_racer import AbstractRacer, RunInfoIntent
from smac.intensification.parallel_scheduling import ParallelScheduler
from smac.optimizer.configuration_chooser.epm_chooser import EPMChooser
from smac.runhistory.runhistory import RunHistory, RunInfo, RunValue
from smac.stats.stats import Stats
from smac.tae import StatusType
from smac.utils.constants import MAXINT
from smac.utils.io.traj_logging import TrajLogger
__author__ = "Ashwin Raaghav Narayanan"
__copyright__ = "Copyright 2019, ML4AAD"
__license__ = "3-clause BSD"
class _SuccessiveHalving(AbstractRacer):
"""Races multiple challengers against an incumbent using Successive Halving method.
This class contains the logic to implement:
"BOHB: Robust and Efficient Hyperparameter Optimization at Scale" (Falkner et al. 2018)
Supplementary reference: http://proceedings.mlr.press/v80/falkner18a/falkner18a-supp.pdf
The `SuccessiveHalving` class can create multiple `_SuccessiveHalving` objects, to
allow parallelism in the method (up to the number of workers available). The user interface
is expected to be `SuccessiveHalving`, yet this class (`_SuccessiveHalving`) contains the
actual single worker implementation of the SMAC4MF method.
Successive Halving intensifier (and Hyperband) can operate on two kinds of budgets:
1. **'Instances' as budget**:
When multiple instances are provided or when run objective is "runtime",
this is the criterion used as budget
for successive halving iterations i.e., the budget determines how many
instances the challengers are evaluated
on at a time. Top challengers for the next iteration are selected based
on the combined performance across all instances used.
If ``initial_budget`` and ``max_budget`` are not provided, then they are
set to 1 and total number of available instances respectively by default.
2. **'Real-valued' budget**:
This is used when there is only one instance provided and when run
objective is "quality", i.e. budget is a positive, real-valued number
that can be passed to the target algorithm as an argument.
It can be used to control anything by the target algorithm,
Eg: number of epochs for training a neural network.
``initial_budget`` and ``max_budget`` are required parameters for
this type of budget.
Parameters
----------
stats: smac.stats.stats.Stats
stats object
traj_logger: smac.utils.io.traj_logging.TrajLogger
TrajLogger object to log all new incumbents
rng : np.random.RandomState
instances : List[str]
list of all instance ids
instance_specifics : Mapping[str, str]
mapping from instance name to instance specific string
cutoff : Optional[int]
cutoff of TA runs
deterministic : bool
whether the TA is deterministic or not
initial_budget : Optional[float]
minimum budget allowed for 1 run of successive halving
max_budget : Optional[float]
maximum budget allowed for 1 run of successive halving
eta : float
'halving' factor after each iteration in a successive halving run. Defaults to 3
_all_budgets: Optional[np.ndarray] = None
Used internally when HB uses SH as a subrouting
_n_configs_in_stage: Optional[np.ndarray] = None
Used internally when HB uses SH as a subrouting
num_initial_challengers : Optional[int]
number of challengers to consider for the initial budget. If None, calculated internally
run_obj_time : bool
whether the run objective is runtime or not (if true, apply adaptive capping)
n_seeds : Optional[int]
Number of seeds to use, if TA is not deterministic. Defaults to None, i.e., seed is set as 0
instance_order : Optional[str]
how to order instances. Can be set to: [None, shuffle_once, shuffle]
* None - use as is given by the user
* shuffle_once - shuffle once and use across all SH run (default)
* shuffle - shuffle before every SH run
adaptive_capping_slackfactor : float
slack factor of adpative capping (factor * adaptive cutoff)
inst_seed_pairs : List[Tuple[str, int]], optional
Do not set this argument, it will only be used by hyperband!
min_chall: int
minimal number of challengers to be considered (even if time_bound is exhausted earlier). This class will
raise an exception if a value larger than 1 is passed.
incumbent_selection: str
How to select incumbent in successive halving. Only active for real-valued budgets.
Can be set to: [highest_executed_budget, highest_budget, any_budget]
* highest_executed_budget - incumbent is the best in the highest budget run so far (default)
* highest_budget - incumbent is selected only based on the highest budget
* any_budget - incumbent is the best on any budget i.e., best performance regardless of budget
identifier: int
Adds a numerical identifier on this SH instance. Used for debug and tagging
logger messages properly
"""
def __init__(
self,
stats: Stats,
traj_logger: TrajLogger,
rng: np.random.RandomState,
instances: List[str],
instance_specifics: Mapping[str, str] = None,
cutoff: Optional[float] = None,
deterministic: bool = False,
initial_budget: Optional[float] = None,
max_budget: Optional[float] = None,
eta: float = 3,
_all_budgets: Optional[np.ndarray] = None,
_n_configs_in_stage: Optional[np.ndarray] = None,
num_initial_challengers: Optional[int] = None,
run_obj_time: bool = True,
n_seeds: Optional[int] = None,
instance_order: Optional[str] = "shuffle_once",
adaptive_capping_slackfactor: float = 1.2,
inst_seed_pairs: Optional[List[Tuple[str, int]]] = None,
min_chall: int = 1,
incumbent_selection: str = "highest_executed_budget",
identifier: int = 0,
) -> None:
super().__init__(
stats=stats,
traj_logger=traj_logger,
rng=rng,
instances=instances,
instance_specifics=instance_specifics,
cutoff=cutoff,
deterministic=deterministic,
run_obj_time=run_obj_time,
adaptive_capping_slackfactor=adaptive_capping_slackfactor,
min_chall=min_chall,
)
self.identifier = identifier
self.logger = logging.getLogger(self.__module__ + "." + str(self.identifier) + "." + self.__class__.__name__)
if self.min_chall > 1:
raise ValueError("Successive Halving cannot handle argument `min_chall` > 1.")
self.first_run = True
# INSTANCES
self.n_seeds = n_seeds if n_seeds else 1
self.instance_order = instance_order
# NOTE Remove after solving how to handle multiple seeds and 1 instance
if len(self.instances) == 1 and self.n_seeds > 1:
raise NotImplementedError("This case (multiple seeds and 1 instance) cannot be handled yet!")
# if instances are coming from Hyperband, skip the instance preprocessing section
# it is already taken care by Hyperband
if not inst_seed_pairs:
# set seed(s) for all SH runs
# - currently user gives the number of seeds to consider
if self.deterministic:
seeds = [0]
else:
seeds = [int(s) for s in self.rs.randint(low=0, high=MAXINT, size=self.n_seeds)]
if self.n_seeds == 1:
self.logger.warning(
"The target algorithm is specified to be non deterministic, "
"but number of seeds to evaluate are set to 1. "
"Consider setting `n_seeds` > 1."
)
# storing instances & seeds as tuples
self.inst_seed_pairs = [(i, s) for s in seeds for i in self.instances]
# determine instance-seed pair order
if self.instance_order == "shuffle_once":
# randomize once
self.rs.shuffle(self.inst_seed_pairs) # type: ignore
else:
self.inst_seed_pairs = inst_seed_pairs
# successive halving parameters
self._init_sh_params(
initial_budget=initial_budget,
max_budget=max_budget,
eta=eta,
num_initial_challengers=num_initial_challengers,
_all_budgets=_all_budgets,
_n_configs_in_stage=_n_configs_in_stage,
)
# adaptive capping
if self.instance_as_budget and self.instance_order != "shuffle" and self.run_obj_time:
self.adaptive_capping = True
else:
self.adaptive_capping = False
# challengers can be repeated only if optimizing across multiple seeds or changing instance orders every run
# (this does not include having multiple instances)
if self.n_seeds > 1 or self.instance_order == "shuffle":
self.repeat_configs = True
else:
self.repeat_configs = False
# incumbent selection design
assert incumbent_selection in ["highest_executed_budget", "highest_budget", "any_budget"]
self.incumbent_selection = incumbent_selection
# Define state variables to please mypy
# current instance index tracks two things. Configurations that are to be launched,
# That is config A needs to run in 3 instances/seed pairs, then curr_inst_idx should
# track this. But then, if a new configuration is added in the case of parallelism
# a new separate curr_inst_idx needs to be started.
# The indices normally are of type int, but np.inf is used to indicate to not further
# launch instances for this configuration, hence the type is Union[int, float].
self.curr_inst_idx = {} # type: Dict[Configuration, Union[int, float]]
self.running_challenger = None
self.success_challengers = set() # type: Set[Configuration]
self.do_not_advance_challengers = set() # type: Set[Configuration]
self.fail_challengers = set() # type: Set[Configuration]
self.fail_chal_offset = 0
# Track which configs were launched. This will allow to have an extra check to make sure
# that a successive halver deals only with the configs it launched,
# but also allows querying the status of the configs via the run history.
# In other works, the run history is agnostic of the origin of the configurations,
# that is, which successive halving instance created it. The RunInfo object
# is aware of this information, and for parallel execution, the routing of
# finish results is expected to use this information.
# Nevertheless, the common object among SMBO/intensifier, which is the
# run history, does not have this information and so we track locally. That way,
# when we access the complete list of configs from the run history, we filter
# the ones launched by the current succesive halver using self.run_tracker
self.run_tracker = {} # type: Dict[Tuple[Configuration, str, int, float], bool]
def _init_sh_params(
self,
initial_budget: Optional[float],
max_budget: Optional[float],
eta: float,
num_initial_challengers: Optional[int] = None,
_all_budgets: Optional[np.ndarray] = None,
_n_configs_in_stage: Optional[np.ndarray] = None,
) -> None:
"""Initialize Successive Halving parameters.
Parameters
----------
initial_budget : Optional[float]
minimum budget allowed for 1 run of successive halving
max_budget : Optional[float]
maximum budget allowed for 1 run of successive halving
eta : float
'halving' factor after each iteration in a successive halving run
num_initial_challengers : Optional[int]
number of challengers to consider for the initial budget
_all_budgets: Optional[np.ndarray] = None
Used internally when HB uses SH as a subrouting
_n_configs_in_stage: Optional[np.ndarray] = None
Used internally when HB uses SH as a subrouting
"""
if eta <= 1:
raise ValueError("eta must be greater than 1")
self.eta = eta
# BUDGETS
if max_budget is not None and initial_budget is not None and max_budget < initial_budget:
raise ValueError("Max budget has to be larger than min budget")
# - if only 1 instance was provided & quality objective, then use cutoff as budget
# - else, use instances as budget
if not self.run_obj_time and len(self.inst_seed_pairs) <= 1:
# budget with cutoff
if initial_budget is None or max_budget is None:
raise ValueError(
"Successive Halving with real-valued budget (i.e., only 1 instance) "
"requires parameters initial_budget and max_budget for intensification!"
)
self.initial_budget = initial_budget
self.max_budget = max_budget
self.instance_as_budget = False
else:
# budget with instances
if self.run_obj_time and len(self.inst_seed_pairs) <= 1:
self.logger.warning("Successive Halving has objective 'runtime' but only 1 instance-seed pair.")
self.initial_budget = int(initial_budget) if initial_budget else 1
self.max_budget = int(max_budget) if max_budget else len(self.inst_seed_pairs)
self.instance_as_budget = True
if self.max_budget > len(self.inst_seed_pairs):
raise ValueError("Max budget cannot be greater than the number of instance-seed pairs")
if self.max_budget < len(self.inst_seed_pairs):
self.logger.warning(
"Max budget (%d) does not include all instance-seed pairs (%d)"
% (self.max_budget, len(self.inst_seed_pairs))
)
budget_type = "INSTANCES" if self.instance_as_budget else "REAL-VALUED"
self.logger.info(
"Successive Halving configuration: budget type = %s, "
"Initial budget = %.2f, Max. budget = %.2f, eta = %.2f"
% (budget_type, self.initial_budget, self.max_budget, self.eta)
)
# precomputing stuff for SH
# max. no. of SH iterations possible given the budgets
max_sh_iter = int(np.floor(np.log(self.max_budget / self.initial_budget) / np.log(self.eta)))
# initial number of challengers to sample
if num_initial_challengers is None:
num_initial_challengers = int(self.eta**max_sh_iter)
if _all_budgets is not None and _n_configs_in_stage is not None:
# Assert we use the given numbers to avoid rounding issues, see #701
self.all_budgets = _all_budgets
self.n_configs_in_stage = _n_configs_in_stage
else:
# budgets to consider in each stage
linspace = -np.linspace(max_sh_iter, 0, max_sh_iter + 1)
self.all_budgets = self.max_budget * np.power(self.eta, linspace)
# number of challengers to consider in each stage
n_configs_in_stage = num_initial_challengers * np.power(
self.eta, -np.linspace(0, max_sh_iter, max_sh_iter + 1)
)
self.n_configs_in_stage = np.array(np.round(n_configs_in_stage), dtype=int).tolist()
def process_results(
self,
run_info: RunInfo,
incumbent: Optional[Configuration],
run_history: RunHistory,
time_bound: float,
result: RunValue,
log_traj: bool = True,
) -> Tuple[Configuration, float]:
"""The intensifier stage will be updated based on the results/status of a configuration
execution. Also, a incumbent will be determined.
Parameters
----------
run_info : RunInfo
A RunInfo containing the configuration that was evaluated
incumbent : Optional[Configuration]
Best configuration seen so far
run_history : RunHistory
stores all runs we ran so far
if False, an evaluated configuration will not be generated again
time_bound : float
time in [sec] available to perform intensify
result: RunValue
Contain the result (status and other methadata) of exercising
a challenger/incumbent.
log_traj: bool
Whether to log changes of incumbents in trajectory
Returns
-------
incumbent: Configuration
current (maybe new) incumbent configuration
inc_perf: float
empirical performance of incumbent configuration
"""
# Mark the fact that we processed this configuration
self.run_tracker[(run_info.config, run_info.instance, run_info.seed, run_info.budget)] = True
# If The incumbent is None and it is the first run, we use the challenger
if not incumbent and self.first_run:
self.logger.info("First run, no incumbent provided; challenger is assumed to be the incumbent")
incumbent = run_info.config
self.first_run = False
# In a serial run, if we have to CAP a run, then we stop launching
# more configurations for this run.
# In the context of parallelism, we launch the instances proactively
# The fact that self.curr_inst_idx[run_info.config] is np.inf means
# that no more instances will be launched for the current config, so we
# can add a check to make sure that if we are capping, this makes sense
# for the active challenger
if result.status == StatusType.CAPPED and run_info.config == self.running_challenger:
self.curr_inst_idx[run_info.config] = np.inf
else:
self._ta_time = self._ta_time # type: float # make mypy happy
self.num_run = self.num_run # type: int # make mypy happy
self._ta_time += result.time
self.num_run += 1
# 0: Before moving to a new stage, we have to complete M x N tasks, where M is the
# total number of configurations evaluated in N instance/seed pairs.
# The last active configuration is stored in self.running challengers is the M
# configuration, and to get to this point, we have already submitted tasks
# for (M - 1) configurations on N instances seed-pairs. The status of such
# (M - 1) * N tasks is tracked in self.run_tracker, that has a value of False
# if not processed and true if such task has been processed.
# This stage is complete only if all tasks have been launched and all of the
# already launched tasks are processed.
# 1: We first query if we have launched everything already (All M * N tasks)
all_config_inst_seed_launched = self._all_config_inst_seed_pairs_launched(
run_history=run_history,
activate_configuration_being_intensified=self.running_challenger,
)
# 2: Then we get the already submitted tasks (that is, proposed by get_next_runs),
# that have not yet been processed process_results
all_config_inst_seeds_processed = len([v for v in self.run_tracker.values() if not v]) <= 0
# 3: Then the total number of remaining task before we can conclude this stage
# is calculated by taking into account point 2 and 3 above
is_stage_done = all_config_inst_seed_launched and all_config_inst_seeds_processed
# adding challengers to the list of evaluated challengers
# - Stop: CAPPED/CRASHED/TIMEOUT/MEMOUT/DONOTADVANCE (!= SUCCESS)
# - Advance to next stage: SUCCESS
# curr_challengers is a set, so "at least 1" success can be counted by set addition (no duplicates)
# If a configuration is successful, it is added to curr_challengers.
# if it fails it is added to fail_challengers.
if np.isfinite(self.curr_inst_idx[run_info.config]) and result.status == StatusType.SUCCESS:
self.success_challengers.add(run_info.config) # successful configs
elif np.isfinite(self.curr_inst_idx[run_info.config]) and result.status == StatusType.DONOTADVANCE:
self.do_not_advance_challengers.add(run_info.config)
else:
self.fail_challengers.add(run_info.config) # capped/crashed/do not advance configs
# We need to update the incumbent if this config we are processing
# completes all scheduled instance-seed pairs.
# Here, a config/seed/instance is going to be processed for the first time
# (it has been previously scheduled by get_next_run and marked False, indicating
# that it has not been processed yet. Entering process_results() this config/seed/instance
# is marked as TRUE as an indication that it has finished and should be processed)
# so if all configurations runs are marked as TRUE it means that this new config
# was the missing piece to have everything needed to compare against the incumbent
update_incumbent = all([v for k, v in self.run_tracker.items() if k[0] == run_info.config])
# get incumbent if all instances have been evaluated
if is_stage_done or update_incumbent:
incumbent = self._compare_configs(
challenger=run_info.config,
incumbent=incumbent,
run_history=run_history,
log_traj=log_traj,
)
if is_stage_done:
self.logger.info(
"Successive Halving iteration-step: %d-%d with "
"budget [%.2f / %d] - evaluated %d challenger(s)"
% (
self.sh_iters + 1,
self.stage + 1,
self.all_budgets[self.stage],
self.max_budget,
self.n_configs_in_stage[self.stage],
)
)
self._update_stage(run_history=run_history)
# get incumbent cost
inc_perf = run_history.get_cost(incumbent)
return incumbent, inc_perf
def get_next_run(
self,
challengers: Optional[List[Configuration]],
incumbent: Configuration,
chooser: Optional[EPMChooser],
run_history: RunHistory,
repeat_configs: bool = True,
num_workers: int = 1,
) -> Tuple[RunInfoIntent, RunInfo]:
"""Selects which challenger to use based on the iteration stage and set the iteration
parameters. First iteration will choose configurations from the ``chooser`` or input
challengers, while the later iterations pick top configurations from the previously selected
challengers in that iteration.
Parameters
----------
challengers : List[Configuration]
promising configurations
incumbent: Configuration
incumbent configuration
chooser : smac.optimizer.epm_configuration_chooser.EPMChooser
optimizer that generates next configurations to use for racing
run_history : smac.runhistory.runhistory.RunHistory
stores all runs we ran so far
repeat_configs : bool
if False, an evaluated configuration will not be generated again
num_workers: int
the maximum number of workers available
at a given time.
Returns
-------
intent: RunInfoIntent
Indicator of how to consume the RunInfo object
run_info: RunInfo
An object that encapsulates the minimum information to
evaluate a configuration
"""
if num_workers > 1:
warnings.warn(
"Consider using ParallelSuccesiveHalving instead of "
"SuccesiveHalving. The later will halt on each stage "
"transition until all configs for the current stage are completed."
)
# if this is the first run, then initialize tracking variables
if not hasattr(self, "stage"):
self._update_stage(run_history=run_history)
# In the case of multiprocessing, we have runs in Running stage, which have not
# been processed via process_results(). get_next_run() is called agnostically by
# smbo. To prevent launching more configs, than the ones needed, we query if
# there is room for more configurations, else we wait for process_results()
# to trigger a new stage
if self._all_config_inst_seed_pairs_launched(run_history, self.running_challenger):
return RunInfoIntent.WAIT, RunInfo(
config=None,
instance=None,
instance_specific="0",
seed=0,
cutoff=self.cutoff,
capped=False,
budget=0.0,
source_id=self.identifier,
)
# sampling from next challenger marks the beginning of a new iteration
self.iteration_done = False
curr_budget = self.all_budgets[self.stage]
# if all instances have been executed, then reset and move on to next config
if self.instance_as_budget:
prev_budget = int(self.all_budgets[self.stage - 1]) if self.stage > 0 else 0
n_insts = int(curr_budget) - prev_budget
else:
n_insts = len(self.inst_seed_pairs)
# The instances remaining tell us, per configuration, how many instances we
# have suggested to SMBO
n_insts_remaining = n_insts
if self.running_challenger is not None:
n_insts_remaining = n_insts - self.curr_inst_idx[self.running_challenger]
# if there are instances pending, finish running configuration
if self.running_challenger and n_insts_remaining > 0:
challenger = self.running_challenger
new_challenger = False
else:
# select next configuration
if self.stage == 0:
# first stage, so sample from configurations/chooser provided
challenger = self._next_challenger(
challengers=challengers,
chooser=chooser,
run_history=run_history,
repeat_configs=repeat_configs,
)
if challenger is None:
# If no challenger was sampled from the EPM or
# initial challengers, it might mean that the EPM
# is proposing a configuration that is currently running.
# There is a filtering on the above _next_challenger to return
# None if the proposed config us already in the run history
# To get a new config, we wait for more data
return RunInfoIntent.WAIT, RunInfo(
config=None,
instance=None,
instance_specific="0",
seed=0,
cutoff=self.cutoff,
capped=False,
budget=0.0,
source_id=self.identifier,
)
new_challenger = True
else:
# sample top configs from previously sampled configurations
try:
challenger = self.configs_to_run.pop(0)
new_challenger = False
except IndexError:
# self.configs_to_run is populated via update_stage,
# which is triggered after the completion of a run
# If by there are no more configs to run (which is the case
# if we run into a IndexError),
return RunInfoIntent.SKIP, RunInfo(
config=None,
instance=None,
instance_specific="0",
seed=0,
cutoff=self.cutoff,
capped=False,
budget=0.0,
source_id=self.identifier,
)
if challenger:
# We see a challenger for the first time, so no
# instance has been launched
self.curr_inst_idx[challenger] = 0
self._chall_indx = self._chall_indx # type: int # make mypy happy
self._chall_indx += 1
self.running_challenger = challenger
# calculating the incumbent's performance for adaptive capping
# this check is required because:
# - there is no incumbent performance for the first ever 'intensify' run (from initial design)
# - during the 1st intensify run, the incumbent shouldn't be capped after being compared against itself
if incumbent and incumbent != challenger:
inc_runs = run_history.get_runs_for_config(incumbent, only_max_observed_budget=True)
inc_sum_cost = run_history.sum_cost(config=incumbent, instance_seed_budget_keys=inc_runs, normalize=True)
else:
inc_sum_cost = np.inf
if self.first_run:
self.logger.info("First run, no incumbent provided; challenger is assumed to be the incumbent")
incumbent = challenger
assert type(inc_sum_cost) == float
# Selecting instance-seed subset for this budget, depending on the kind of budget
if self.instance_as_budget:
prev_budget = int(self.all_budgets[self.stage - 1]) if self.stage > 0 else 0
curr_insts = self.inst_seed_pairs[int(prev_budget) : int(curr_budget)]
else:
curr_insts = self.inst_seed_pairs
self.logger.debug(" Running challenger - %s" % str(challenger))
# run the next instance-seed pair for the given configuration
instance, seed = curr_insts[self.curr_inst_idx[challenger]] # type: ignore[index]
# At this point self.curr_inst_idx[challenger] will still be an integer and might
# be marked LATER with np.inf, so ignore mypy error.
# selecting cutoff if running adaptive capping
cutoff = self.cutoff
if self.run_obj_time:
cutoff = self._adapt_cutoff(challenger=challenger, run_history=run_history, inc_sum_cost=inc_sum_cost)
if cutoff is not None and cutoff <= 0:
# ran out of time to validate challenger
self.logger.debug("Stop challenger intensification due to adaptive capping.")
self.curr_inst_idx[challenger] = np.inf
self.logger.debug("Cutoff for challenger: %s" % str(cutoff))
# For testing purposes, this attribute highlights whether a
# new challenger is proposed or not. Not required from a functional
# perspective
self.new_challenger = new_challenger
capped = False
if (self.cutoff is not None) and (cutoff < self.cutoff): # type: ignore[operator] # noqa F821
capped = True
budget = 0.0 if self.instance_as_budget else curr_budget
self.run_tracker[(challenger, instance, seed, budget)] = False
# self.curr_inst_idx Tell us our current instance to be run. The upcoming return
# will launch a challenger on a given instance/seed/pair. The next time this function
# is called, we will like to run self.curr_inst_idx + 1 for this configuration
self.curr_inst_idx[challenger] += 1
return RunInfoIntent.RUN, RunInfo(
config=challenger,
instance=instance,
instance_specific=self.instance_specifics.get(instance, "0"),
seed=seed,
cutoff=cutoff,
capped=capped,
budget=budget,
source_id=self.identifier,
)
def _update_stage(self, run_history: RunHistory) -> None:
"""Update tracking information for a new stage/iteration and update statistics. This method
is called to initialize stage variables and after all configurations of a successive halving
stage are completed.
Parameters
----------
run_history : smac.runhistory.runhistory.RunHistory
stores all runs we ran so far
"""
if not hasattr(self, "stage"):
# initialize all relevant variables for first run
# (this initialization is not a part of init because hyperband uses the same init method and has a )
# to track iteration and stage
self.sh_iters = 0
self.stage = 0
# to track challengers across stages
self.configs_to_run = [] # type: List[Configuration]
self.curr_inst_idx = {}
self.running_challenger = None
self.success_challengers = set() # successful configs
self.do_not_advance_challengers = set() # configs which are successful, but should not be advanced
self.fail_challengers = set() # capped configs and other failures
self.fail_chal_offset = 0
else:
self.stage += 1
# only uncapped challengers are considered valid for the next iteration
valid_challengers = list(
(self.success_challengers | self.do_not_advance_challengers) - self.fail_challengers
)
if self.stage < len(self.all_budgets) and len(valid_challengers) > 0:
# if this is the next stage in same iteration,
# use top 'k' from the evaluated configurations for next iteration
# determine 'k' for the next iteration - at least 1
next_n_chal = int(max(1, self.n_configs_in_stage[self.stage]))
# selecting the top 'k' challengers for the next iteration
configs_to_run = self._top_k(configs=valid_challengers, run_history=run_history, k=next_n_chal)
self.configs_to_run = [
config for config in configs_to_run if config not in self.do_not_advance_challengers
]
# if some runs were capped, top_k returns less than the required configurations
# to handle that, we keep track of how many configurations are missing
# (since they are technically failed here too)
missing_challengers = int(self.n_configs_in_stage[self.stage]) - len(self.configs_to_run)
if missing_challengers > 0:
self.fail_chal_offset = missing_challengers
else:
self.fail_chal_offset = 0
if next_n_chal == missing_challengers:
next_stage = True
self.logger.info(
"Successive Halving iteration-step: %d-%d with "
"budget [%.2f / %d] - expected %d new challenger(s), but "
"no configurations propagated to the next budget.",
self.sh_iters + 1,
self.stage + 1,
self.all_budgets[self.stage],
self.max_budget,
self.n_configs_in_stage[self.stage],
)
else:
next_stage = False
else:
next_stage = True
if next_stage:
# update stats for the prev iteration
self.stats.update_average_configs_per_intensify(n_configs=self._chall_indx)
# reset stats for the new iteration
self._ta_time = 0
self._chall_indx = 0
self.num_run = 0
self.iteration_done = True
self.sh_iters += 1
self.stage = 0
self.run_tracker = {}
self.configs_to_run = []
self.fail_chal_offset = 0
# randomize instance-seed pairs per successive halving run, if user specifies
if self.instance_order == "shuffle":
self.rs.shuffle(self.inst_seed_pairs) # type: ignore
# to track configurations for the next stage
self.success_challengers = set() # successful configs
self.do_not_advance_challengers = set() # successful, but should not be advanced to the next budget/stage
self.fail_challengers = set() # capped/failed configs
self.curr_inst_idx = {}
self.running_challenger = None
def _compare_configs(
self,
incumbent: Configuration,
challenger: Configuration,
run_history: RunHistory,
log_traj: bool = True,
) -> Optional[Configuration]:
"""Compares the challenger with current incumbent and returns the best configuration, based
on the given incumbent selection design.
Parameters
----------
challenger : Configuration
promising configuration
incumbent : Configuration
best configuration so far
run_history : smac.runhistory.runhistory.RunHistory
stores all runs we ran so far
log_traj : bool
whether to log changes of incumbents in trajectory
Returns
-------
Optional[Configuration]
incumbent configuration
"""
if self.instance_as_budget:
new_incumbent = super()._compare_configs(incumbent, challenger, run_history, log_traj)
# if compare config returned none, then it is undecided. So return old incumbent
new_incumbent = incumbent if new_incumbent is None else new_incumbent
return new_incumbent
# For real-valued budgets, compare configs based on the incumbent selection design
curr_budget = self.all_budgets[self.stage]
# incumbent selection: best on any budget
if self.incumbent_selection == "any_budget":
new_incumbent = self._compare_configs_across_budgets(
challenger=challenger,
incumbent=incumbent,
run_history=run_history,
log_traj=log_traj,
)
return new_incumbent
# get runs for both configurations
inc_runs = run_history.get_runs_for_config(incumbent, only_max_observed_budget=True)
chall_runs = run_history.get_runs_for_config(challenger, only_max_observed_budget=True)
if len(inc_runs) > 1:
raise ValueError(
"Number of incumbent runs on budget %f must not exceed 1, but is %d",
inc_runs[0].budget,
len(inc_runs),
)
if len(chall_runs) > 1:
raise ValueError(
"Number of challenger runs on budget %f must not exceed 1, but is %d",
chall_runs[0].budget,
len(chall_runs),
)
inc_run = inc_runs[0]
chall_run = chall_runs[0]
# incumbent selection: highest budget only
if self.incumbent_selection == "highest_budget":
if chall_run.budget < self.max_budget:
self.logger.debug(
"Challenger (budget=%.4f) has not been evaluated on the highest budget %.4f yet.",
chall_run.budget,
self.max_budget,
)
return incumbent
# incumbent selection: highest budget run so far
if inc_run.budget > chall_run.budget:
self.logger.debug(
"Incumbent evaluated on higher budget than challenger (%.4f > %.4f), " "not changing the incumbent",
inc_run.budget,
chall_run.budget,
)
return incumbent
if inc_run.budget < chall_run.budget:
self.logger.debug(
"Challenger evaluated on higher budget than incumbent (%.4f > %.4f), " "changing the incumbent",
chall_run.budget,
inc_run.budget,
)
if log_traj:
# adding incumbent entry
self.stats.inc_changed += 1
new_inc_cost = run_history.get_cost(challenger)
self.traj_logger.add_entry(
train_perf=new_inc_cost,
incumbent_id=self.stats.inc_changed,
incumbent=challenger,
budget=curr_budget,
)
return challenger
# incumbent and challenger were both evaluated on the same budget, compare them based on their cost
chall_cost = run_history.get_cost(challenger)
inc_cost = run_history.get_cost(incumbent)
if chall_cost < inc_cost:
self.logger.info(
"Challenger (%.4f) is better than incumbent (%.4f) on budget %.4f.",
chall_cost,
inc_cost,
chall_run.budget,
)
self._log_incumbent_changes(incumbent, challenger)
new_incumbent = challenger
if log_traj:
# adding incumbent entry
self.stats.inc_changed += 1 # first incumbent
self.traj_logger.add_entry(
train_perf=chall_cost,
incumbent_id=self.stats.inc_changed,
incumbent=new_incumbent,
budget=curr_budget,
)
else:
self.logger.debug(
"Incumbent (%.4f) is at least as good as the challenger (%.4f) on budget %.4f.",
inc_cost,
chall_cost,
inc_run.budget,
)
if log_traj and self.stats.inc_changed == 0:
# adding incumbent entry
self.stats.inc_changed += 1 # first incumbent
self.traj_logger.add_entry(
train_perf=inc_cost,
incumbent_id=self.stats.inc_changed,
incumbent=incumbent,
budget=curr_budget,
)
new_incumbent = incumbent
return new_incumbent
def _compare_configs_across_budgets(
self,
challenger: Configuration,
incumbent: Configuration,
run_history: RunHistory,
log_traj: bool = True,
) -> Optional[Configuration]:
"""Compares challenger with current incumbent on any budget.
Parameters
----------
challenger : Configuration
promising configuration
incumbent : Configuration
best configuration so far
run_history : smac.runhistory.runhistory.RunHistory
stores all runs we ran so far
log_traj : bool
whether to log changes of incumbents in trajectory
Returns
-------
Optional[Configuration]
incumbent configuration
"""
curr_budget = self.all_budgets[self.stage]
# compare challenger and incumbent based on cost
chall_cost = run_history.get_min_cost(challenger)
inc_cost = run_history.get_min_cost(incumbent)
if np.isfinite(chall_cost) and np.isfinite(inc_cost):
if chall_cost < inc_cost:
self.logger.info(
"Challenger (%.4f) is better than incumbent (%.4f) for any budget.",
chall_cost,
inc_cost,
)
self._log_incumbent_changes(incumbent, challenger)
new_incumbent = challenger
if log_traj:
# adding incumbent entry
self.stats.inc_changed += 1 # first incumbent
self.traj_logger.add_entry(
train_perf=chall_cost,
incumbent_id=self.stats.inc_changed,
incumbent=new_incumbent,
budget=curr_budget,
)
else:
self.logger.debug(
"Incumbent (%.4f) is at least as good as the challenger (%.4f) for any budget.",
inc_cost,
chall_cost,
)
if log_traj and self.stats.inc_changed == 0:
# adding incumbent entry
self.stats.inc_changed += 1 # first incumbent
self.traj_logger.add_entry(
train_perf=inc_cost,
incumbent_id=self.stats.inc_changed,
incumbent=incumbent,
budget=curr_budget,
)
new_incumbent = incumbent
else:
self.logger.debug("Non-finite costs from run history!")
new_incumbent = incumbent
return new_incumbent
def _top_k(self, configs: List[Configuration], run_history: RunHistory, k: int) -> List[Configuration]:
"""Selects the top 'k' configurations from the given list based on their performance.
This retrieves the performance for each configuration from the runhistory and checks
that the highest budget they've been evaluated on is the same for each of the configurations.
Parameters
----------
configs: List[Configuration]
list of configurations to filter from
run_history: smac.runhistory.runhistory.RunHistory
stores all runs we ran so far
k: int
number of configurations to select
Returns
-------
List[Configuration]
top challenger configurations, sorted in increasing costs
"""
# extracting costs for each given configuration
config_costs = {}
# sample list instance-seed-budget key to act as base
run_key = run_history.get_runs_for_config(configs[0], only_max_observed_budget=True)
for c in configs:
# ensuring that all configurations being compared are run on the same set of instance, seed & budget
cur_run_key = run_history.get_runs_for_config(c, only_max_observed_budget=True)
# Move to compare set -- get_runs_for_config queries form a dictionary
# which is not an ordered structure. Some queries to that dictionary returned unordered
# list which wrongly trigger the below if
if set(cur_run_key) != set(run_key):
raise ValueError(
"Cannot compare configs that were run on different instances-seeds-budgets: %s vs %s"
% (run_key, cur_run_key)
)
config_costs[c] = run_history.get_cost(c)
configs_sorted = [k for k, v in sorted(config_costs.items(), key=lambda item: item[1])]
# select top configurations only
top_configs = configs_sorted[:k]
return top_configs
def _all_config_inst_seed_pairs_launched(
self,
run_history: RunHistory,
activate_configuration_being_intensified: Optional[Configuration],
) -> bool:
"""When running SH, M configs might require N instances. Before moving to the next stage, we
need to make sure that tasks (each of the MxN jobs) are launched.
This function returns a true if any M configs are pending or if N instance/seed are
still remaining.
Parameters
----------
run_history : RunHistory
stores all runs we ran so far
activate_configuration_being_intensified: Optional[Configuration]
The last configuration being actively processes by this intensifier
Returns
-------
bool: whether a instance/pair of any of the M configurations for the current
stage are pending
"""
# 1: First we count the number of configurations that have been launched
# We only submit a new configuration M if all instance-seed pairs of (M - 1)
# have been proposed
configurations_by_this_intensifier = [c for c, i, s, b in self.run_tracker]
running_configs = set()
for k, v in run_history.data.items():
if run_history.ids_config[k.config_id] in configurations_by_this_intensifier:
# We get all configurations launched by the current intensifier
# regardless if status is RUNNING or not, to make it more robust
running_configs.add(run_history.ids_config[k.config_id])
# The total number of runs for this stage account for finished configurations
# (success + failed + do not advance) + the offset + running but not finished
# configurations. Also we account for the instances not launched for the
# currently running configuration
total_pending_configurations = max(
0,
self.n_configs_in_stage[self.stage]
- (
len(
set().union(
self.success_challengers,
self.fail_challengers,
self.do_not_advance_challengers,
running_configs,
)
)
+ self.fail_chal_offset
),
)
# 2: Second we have to account for the number of pending instances for the active
# configuration. We assume for all (M - 1) configurations, all N instances-seeds
# have been already been launched
curr_budget = self.all_budgets[self.stage]
if self.instance_as_budget:
prev_budget = int(self.all_budgets[self.stage - 1]) if self.stage > 0 else 0
curr_insts = self.inst_seed_pairs[int(prev_budget) : int(curr_budget)]
else:
curr_insts = self.inst_seed_pairs
if activate_configuration_being_intensified is None:
# When a new stage begins, there is no active configuration.
# Therefore activate_configuration_being_intensified is empty and all instances are
# remaining
pending_instances_to_launch = len(curr_insts)
else:
# self.curr_inst_idx - 1 is the last proposed instance/seed pair from get_next_run
# But it is zero indexed, so (self.curr_inst_idx - 1) + 1 is the number of
# configurations that we have proposed to run in total for the running
# configuration via get_next_run
pending_instances_to_launch = max(
len(curr_insts) - self.curr_inst_idx[activate_configuration_being_intensified], 0
) # type: ignore
# If the there are any pending configuration, or instances/seed pending for the
# active runner, we return a boolean
return (total_pending_configurations + pending_instances_to_launch) <= 0
[docs]class SuccessiveHalving(ParallelScheduler):
"""Races multiple challengers against an incumbent using Successive Halving method.
Implementation following the description in
"BOHB: Robust and Efficient Hyperparameter Optimization at Scale" (Falkner et al. 2018)
Supplementary reference: http://proceedings.mlr.press/v80/falkner18a/falkner18a-supp.pdf
Successive Halving intensifier (and Hyperband) can operate on two kinds of budgets:
1. **'Instances' as budget**:
When multiple instances are provided or when run objective is "runtime", this is the criterion used as budget
for successive halving iterations i.e., the budget determines how many instances the challengers are evaluated
on at a time. Top challengers for the next iteration are selected based on the combined performance across
all instances used.
If ``initial_budget`` and ``max_budget`` are not provided, then they are set to 1 and total number
of available instances respectively by default.
2. **'Real-valued' budget**:
This is used when there is only one instance provided and when run objective is "quality",
i.e., budget is a positive, real-valued number that can be passed to the target algorithm as an argument.
It can be used to control anything by the target algorithm, Eg: number of epochs for training a neural network.
``initial_budget`` and ``max_budget`` are required parameters for this type of budget.
Examples for successive halving (and hyperband) can be found here:
* Runtime objective and multiple instances *(instances as budget)*: `examples/spear_qcp/SMAC4AC_SH_spear_qcp.py`
* Quality objective and multiple instances *(instances as budget)*: `examples/SMAC4MF_sgd_instances.py`
* Quality objective and single instance *(real-valued budget)*: `examples/SMAC4MF_mlp.py`
This class instantiates `_SuccessiveHalving` objects on a need basis, that is, to
prevent workers from being idle. The actual logic that implements the Successive halving method
lies on the _SuccessiveHalving class.
Parameters
----------
stats: smac.stats.stats.Stats
stats object
traj_logger: smac.utils.io.traj_logging.TrajLogger
TrajLogger object to log all new incumbents
rng : np.random.RandomState
instances : List[str]
list of all instance ids
instance_specifics : Mapping[str, str]
mapping from instance name to instance specific string
cutoff : Optional[int]
cutoff of TA runs
deterministic : bool
whether the TA is deterministic or not
initial_budget : Optional[float]
minimum budget allowed for 1 run of successive halving
max_budget : Optional[float]
maximum budget allowed for 1 run of successive halving
eta : float
'halving' factor after each iteration in a successive halving run. Defaults to 3
num_initial_challengers : Optional[int]
number of challengers to consider for the initial budget. If None, calculated internally
run_obj_time : bool
whether the run objective is runtime or not (if true, apply adaptive capping)
n_seeds : Optional[int]
Number of seeds to use, if TA is not deterministic. Defaults to None, i.e., seed is set as 0
instance_order : Optional[str]
how to order instances. Can be set to: [None, shuffle_once, shuffle]
* None - use as is given by the user
* shuffle_once - shuffle once and use across all SH run (default)
* shuffle - shuffle before every SH run
adaptive_capping_slackfactor : float
slack factor of adpative capping (factor * adaptive cutoff)
inst_seed_pairs : List[Tuple[str, int]], optional
Do not set this argument, it will only be used by hyperband!
min_chall: int
minimal number of challengers to be considered (even if time_bound is exhausted earlier). This class will
raise an exception if a value larger than 1 is passed.
incumbent_selection: str
How to select incumbent in successive halving. Only active for real-valued budgets.
Can be set to: [highest_executed_budget, highest_budget, any_budget]
* highest_executed_budget - incumbent is the best in the highest budget run so far (default)
* highest_budget - incumbent is selected only based on the highest budget
* any_budget - incumbent is the best on any budget i.e., best performance regardless of budget
"""
def __init__(
self,
stats: Stats,
traj_logger: TrajLogger,
rng: np.random.RandomState,
instances: List[str],
instance_specifics: Mapping[str, str] = None,
cutoff: Optional[float] = None,
deterministic: bool = False,
initial_budget: Optional[float] = None,
max_budget: Optional[float] = None,
eta: float = 3,
num_initial_challengers: Optional[int] = None,
run_obj_time: bool = True,
n_seeds: Optional[int] = None,
instance_order: Optional[str] = "shuffle_once",
adaptive_capping_slackfactor: float = 1.2,
inst_seed_pairs: Optional[List[Tuple[str, int]]] = None,
min_chall: int = 1,
incumbent_selection: str = "highest_executed_budget",
) -> None:
super().__init__(
stats=stats,
traj_logger=traj_logger,
rng=rng,
instances=instances,
instance_specifics=instance_specifics,
cutoff=cutoff,
deterministic=deterministic,
run_obj_time=run_obj_time,
adaptive_capping_slackfactor=adaptive_capping_slackfactor,
min_chall=min_chall,
)
self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)
# Successive Halving Hyperparameters
self.n_seeds = n_seeds
self.instance_order = instance_order
self.inst_seed_pairs = inst_seed_pairs
self.incumbent_selection = incumbent_selection
self._instances = instances
self._instance_specifics = instance_specifics
self.initial_budget = initial_budget
self.max_budget = max_budget
self.eta = eta
self.num_initial_challengers = num_initial_challengers
def _get_intensifier_ranking(self, intensifier: AbstractRacer) -> Tuple[int, int]:
"""Given a intensifier, returns how advance it is. This metric will be used to determine
what priority to assign to the intensifier.
Parameters
----------
intensifier: AbstractRacer
Intensifier to rank based on run progress
Returns
-------
ranking: int
the higher this number, the faster the intensifier will get
the running resources. For hyperband we can use the
sh_intensifier stage, for example
tie_breaker: int
The configurations that have been launched to break ties. For
example, in the case of Successive Halving it can be the number
of configurations launched
"""
# For mypy -- we expect to work with Hyperband instances
assert isinstance(intensifier, _SuccessiveHalving)
# Each row of this matrix is id, stage, configs+instances for stage
# We use sh.run_tracker as a cheap way to know how advanced the run is
# in case of stage ties among successive halvers. sh.run_tracker is
# also emptied each iteration
stage = 0
if hasattr(intensifier, "stage"):
# Newly created _SuccessiveHalving objects have no stage
stage = intensifier.stage
return stage, len(intensifier.run_tracker)
def _add_new_instance(self, num_workers: int) -> bool:
"""Decides if it is possible to add a new intensifier instance, and adds it. If a new
intensifier instance is added, True is returned, else False.
Parameters
----------
num_workers: int
the maximum number of workers available
at a given time.
Returns
-------
Whether or not a successive halving instance was added
"""
if len(self.intensifier_instances) >= num_workers:
return False
self.intensifier_instances[len(self.intensifier_instances)] = _SuccessiveHalving(
stats=self.stats,
traj_logger=self.traj_logger,
rng=self.rs,
instances=self._instances,
instance_specifics=self._instance_specifics,
cutoff=self.cutoff,
deterministic=self.deterministic,
initial_budget=self.initial_budget,
max_budget=self.max_budget,
eta=self.eta,
num_initial_challengers=self.num_initial_challengers,
run_obj_time=self.run_obj_time,
n_seeds=self.n_seeds,
instance_order=self.instance_order,
adaptive_capping_slackfactor=self.adaptive_capping_slackfactor,
inst_seed_pairs=self.inst_seed_pairs,
min_chall=self.min_chall,
incumbent_selection=self.incumbent_selection,
identifier=len(self.intensifier_instances),
)
return True