from __future__ import annotations
from typing import Callable, Iterator
import numpy as np
from ConfigSpace import Configuration
from smac.intensifier.abstract_intensifier import AbstractIntensifier
from smac.intensifier.successive_halving import SuccessiveHalving
from smac.runhistory import StatusType, TrialInfo, TrialInfoIntent, TrialValue
from smac.runhistory.runhistory import RunHistory
from smac.utils.logging import get_logger
__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"
[docs]class SuccessiveHalvingWorker(AbstractIntensifier):
"""This is the worker class for Successive Halving.
Warning
-------
Do not use this class as stand-alone.
Parameters
----------
successive_halving : SuccessiveHalving
The controller of the instance.
identifier : int, defaults to 0
Adds a numerical identifier on the instance. Used for debug and tagging logger messages properly.
_all_budgets : list[float] | None, defaults to None
Used internally when HB uses SH as a sub-routine.
_n_configs_in_stage : np.ndarray | None, defaults to None
Used internally when HB uses SH as a sub-routine.
_min_budget : float | None, defaults to None
Overwrites the budget from the controller instance if it is not none.
_max_budget : float | None, defaults to None
Overwrites the budget from the controller instance if it is not none.
"""
def __init__(
    self,
    successive_halving: SuccessiveHalving,
    identifier: int = 0,
    _all_budgets: list[float] | None = None,
    _n_configs_in_stage: np.ndarray | None = None,
    _min_budget: float | None = None,
    _max_budget: float | None = None,
) -> None:
    """Bind the worker to its controller and initialise all per-iteration tracking state.

    The budgets and the number of configurations per stage are either taken verbatim
    from ``_all_budgets`` / ``_n_configs_in_stage`` (when both are given) or derived
    from the controller via ``calculate_budgets`` and its ``_eta``.
    """
    controller = successive_halving
    super().__init__(
        scenario=controller._scenario,
        min_challenger=controller._min_challenger,
        seed=controller._seed,
    )

    self._successive_halving = controller
    self._stats = controller._stats
    self._first_run = True
    self._identifier = identifier
    self._logger = get_logger(f"{__name__}.{identifier}")

    # An explicitly passed budget wins over the one stored on the controller.
    self._min_budget = controller._min_budget if _min_budget is None else _min_budget
    self._max_budget = controller._max_budget if _max_budget is None else _max_budget

    precomputed = _all_budgets is not None and _n_configs_in_stage is not None
    if not precomputed:
        eta = controller._eta

        # The number of SH iterations and initial challengers depend on the (possibly
        # overridden) min/max budget, so they are computed from the effective values.
        max_iterations, n_initial, budgets = controller.calculate_budgets(self._min_budget, self._max_budget)
        self._max_sh_iterations = max_iterations
        self._n_initial_challengers = n_initial
        self._all_budgets = budgets

        # Number of challengers to consider in each stage
        exponents = -np.linspace(0, max_iterations, max_iterations + 1)
        raw_counts = n_initial * np.power(eta, exponents)
        self._n_configs_in_stage = np.array(np.round(raw_counts), dtype=int).tolist()
    else:
        # Use the given numbers as they are to avoid rounding issues, see #701
        self._all_budgets = _all_budgets
        self._n_configs_in_stage = _n_configs_in_stage

    self._sh_iters = 0
    self._stage = 0
    self._configs_to_run: list[Configuration] = []

    # Per configuration: index of the next instance/seed pair to launch. A configuration
    # that needs N pairs counts up to N here; a configuration added later (parallelism)
    # gets its own counter. Indices are normally ints, but np.inf marks a configuration
    # for which no further instances should be launched, hence int | float.
    self._current_instance_indices: dict[Configuration, int | float] = {}
    self._running_challenger = None
    self._success_challengers: set[Configuration] = set()
    self._do_not_advance_challengers: set[Configuration] = set()
    self._failed_challengers: set[Configuration] = set()
    self._failed_challenger_offset = 0
    self._new_challenger = False

    # Local record of the trials launched by this worker, keyed by
    # (config, instance, seed, budget). The value is False once the trial has been
    # proposed and True once its result has been processed. The run history itself
    # does not know which worker produced a configuration, so this record is what lets
    # the worker restrict itself to its own trials.
    self._run_tracker: dict[tuple[Configuration, str | None, int | None, float | None], bool] = {}

    # According to the original note, _num_trials, _challenger_id, _iteration_done and
    # _target_function_time are initialized in the parent class.
@property
def stage(self) -> int:
"""The current stage of the worker."""
return self._stage
@property
def uses_seeds(self) -> bool: # noqa: D102
return self._successive_halving.uses_seeds
@property
def uses_budgets(self) -> bool: # noqa: D102
return self._successive_halving.uses_budgets
@property
def uses_instances(self) -> bool: # noqa: D102
return self._successive_halving.uses_instances
[docs] def get_target_function_seeds(self) -> list[int]: # noqa: D102
return self._successive_halving.get_target_function_seeds()
[docs] def get_target_function_budgets(self) -> list[float | None]: # noqa: D102
return self._successive_halving.get_target_function_budgets()
[docs] def get_target_function_instances(self) -> list[str | None]: # noqa: D102
return self._successive_halving.get_target_function_instances()
[docs] def process_results(
self,
trial_info: TrialInfo,
trial_value: TrialValue,
incumbent: Configuration | None,
runhistory: RunHistory,
time_bound: float,
log_trajectory: bool = True,
) -> tuple[Configuration, float]: # noqa: D102
# Mark the fact that we processed this configuration
self._run_tracker[(trial_info.config, trial_info.instance, trial_info.seed, trial_info.budget)] = True
# If The incumbent is None and it is the first run, we use the challenger
if not incumbent and self._first_run:
# We already displayed this message before
# self._logger.info("First run and no incumbent provided. Challenger is assumed to be the incumbent.")
incumbent = trial_info.config
self._first_run = False
# In a serial run, if we have to CAP a run, then we stop launching
# more configurations for this run.
# In the context of parallelism, we launch the instances proactively
# The fact that self._current_instance_indices[trial_info.config] is np.inf means
# that no more instances will be launched for the current config, so we
# can add a check to make sure that if we are capping, this makes sense
# for the active challenger
# if result.status == StatusType.CAPPED and trial_info.config == self._running_challenger:
# self._current_instance_indices[trial_info.config] = np.inf
# else:
self._target_function_time += trial_value.time
self._num_trials += 1
# 0: Before moving to a new stage, we have to complete M x N tasks, where M is the
# total number of configurations evaluated in N instance/seed pairs.
# The last active configuration is stored in self.running challengers is the M
# configuration, and to get to this point, we have already submitted tasks
# for (M - 1) configurations on N instances seed-pairs. The status of such
# (M - 1) * N tasks is tracked in self._run_tracker, that has a value of False
# if not processed and true if such task has been processed.
# This stage is complete only if all tasks have been launched and all of the
# already launched tasks are processed.
# 1: We first query if we have launched everything already (All M * N tasks)
all_config_inst_seed_launched = self._all_config_instance_seed_pairs_launched(
runhistory=runhistory,
activate_configuration_being_intensified=self._running_challenger,
)
# 2: Then we get the already submitted tasks (that is, proposed by get_next_trials),
# that have not yet been processed process_results
all_config_inst_seeds_processed = len([v for v in self._run_tracker.values() if not v]) <= 0
# 3: Then the total number of remaining task before we can conclude this stage
# is calculated by taking into account point 2 and 3 above
is_stage_done = all_config_inst_seed_launched and all_config_inst_seeds_processed
# ADDED IN SMAC 2.0: Makes sure we can not call `tell` method without `ask` first
if trial_info.config not in self._current_instance_indices:
raise RuntimeError("Successive Halving does not support calling `tell` method without calling `ask` first.")
# adding challengers to the list of evaluated challengers
# - Stop: CAPPED/CRASHED/TIMEOUT/MEMOUT/DONOTADVANCE (!= SUCCESS)
# - Advance to next stage: SUCCESS
# curr_challengers is a set, so "at least 1" success can be counted by set addition (no duplicates)
# If a configuration is successful, it is added to curr_challengers.
# if it fails it is added to fail_challengers.
if np.isfinite(self._current_instance_indices[trial_info.config]) and trial_value.status == StatusType.SUCCESS:
self._success_challengers.add(trial_info.config) # successful configs
elif (
np.isfinite(self._current_instance_indices[trial_info.config])
and trial_value.status == StatusType.DONOTADVANCE
):
self._do_not_advance_challengers.add(trial_info.config)
else:
self._failed_challengers.add(trial_info.config) # capped/crashed/do not advance configs
# We need to update the incumbent if this config we are processing
# completes all scheduled instance-seed pairs.
# Here, a config/seed/instance is going to be processed for the first time
# (it has been previously scheduled by get_next_trial and marked False, indicating
# that it has not been processed yet. Entering process_results() this config/seed/instance
# is marked as TRUE as an indication that it has finished and should be processed)
# so if all configurations runs are marked as TRUE it means that this new config
# was the missing piece to have everything needed to compare against the incumbent
update_incumbent = all([v for k, v in self._run_tracker.items() if k[0] == trial_info.config])
# get incumbent if all instances have been evaluated
if is_stage_done or update_incumbent:
incumbent = self._compare_configs(
challenger=trial_info.config,
incumbent=incumbent,
runhistory=runhistory,
log_trajectory=log_trajectory,
)
if is_stage_done:
self._logger.info(
"Finished Successive Halving iteration-step %d-%d with "
"budget [%.2f / %d] and %d evaluated challenger(s)."
% (
self._sh_iters + 1,
self._stage + 1,
self._all_budgets[self._stage],
self._max_budget,
self._n_configs_in_stage[self._stage],
)
)
self._update_stage(runhistory=runhistory)
# Get incumbent cost
inc_perf = runhistory.get_cost(incumbent)
return incumbent, inc_perf
def get_next_trial(
    self,
    challengers: list[Configuration] | None,
    incumbent: Configuration,
    get_next_configurations: Callable[[], Iterator[Configuration]] | None,
    runhistory: RunHistory,
    repeat_configs: bool = True,
    n_workers: int = 1,
) -> tuple[TrialInfoIntent, TrialInfo]:
    """Propose the next trial (configuration on an instance/seed pair) for the current stage.

    In the first stage new configurations are taken from ``challengers`` or
    ``get_next_configurations``; in later stages they are taken from the top
    configurations selected when the previous stage was concluded.

    Parameters
    ----------
    challengers : list[Configuration] | None
        Promising configurations.
    incumbent : Configuration
        Incumbent configuration.
    get_next_configurations : Callable[[], Iterator[Configuration]] | None, defaults to none
        Function that generates next configurations to use for racing.
    runhistory : RunHistory
    repeat_configs : bool, defaults to true
        If false, an evaluated configuration will not be generated again.
    n_workers : int, optional, defaults to 1
        The maximum number of workers available.

    Returns
    -------
    TrialInfoIntent
        Indicator of how to consume the TrialInfo object.
    TrialInfo
        An object that encapsulates necessary information of the trial.
    """
    if n_workers > 1:
        self._logger.warning(
            "Consider using ParallelSuccesiveHalving instead of "
            "SuccesiveHalving. The later will halt on each stage "
            "transition until all configs for the current stage are completed."
        )

    controller = self._successive_halving

    def without_trial(intent: TrialInfoIntent) -> tuple[TrialInfoIntent, TrialInfo]:
        # Answer used whenever there is nothing to run: an empty TrialInfo tagged with this worker.
        placeholder = TrialInfo(
            config=None,
            instance=None,
            seed=None,
            budget=None,
            source=self._identifier,
        )
        return intent, placeholder

    # With multiprocessing, trials may still be running and not yet handled by
    # process_results(). If everything of this stage has been launched already, there is
    # no room for more and we wait until process_results() triggers a new stage.
    if self._all_config_instance_seed_pairs_launched(runhistory, self._running_challenger):
        return without_trial(TrialInfoIntent.WAIT)

    # Sampling from next challenger marks the beginning of a new iteration
    self._iteration_done = False

    stage_budget = self._all_budgets[self._stage]

    # Number of instance/seed pairs each configuration has to run in this stage
    if controller._instance_as_budget:
        lower_bound = int(self._all_budgets[self._stage - 1]) if self._stage > 0 else 0
        n_insts = int(stage_budget) - lower_bound
    else:
        n_insts = len(controller._instance_seed_pairs)

    # How many of those pairs are still to be proposed for the active configuration
    active = self._running_challenger
    if active is None:
        n_insts_remaining = n_insts
    else:
        n_insts_remaining = n_insts - self._current_instance_indices[active]

    if active and n_insts_remaining > 0:
        # Instances are pending: keep going with the active configuration
        challenger = active
        is_new = False
    else:
        if self._stage == 0:
            # First stage: sample from the provided configurations/chooser
            challenger = self._next_challenger(
                challengers=challengers,
                get_next_configurations=get_next_configurations,
                runhistory=runhistory,
                repeat_configs=repeat_configs,
            )
            if challenger is None:
                # Nothing was sampled, e.g. because the proposed configuration is
                # already in the run history. Wait for more data.
                return without_trial(TrialInfoIntent.WAIT)

            is_new = True
        else:
            # Later stages: take the next of the top configurations. The list is filled
            # by _update_stage; when it is exhausted there is nothing left to run.
            if not self._configs_to_run:
                return without_trial(TrialInfoIntent.SKIP)

            challenger = self._configs_to_run.pop(0)
            is_new = False

        if challenger:
            # The challenger becomes active; none of its instances has been launched yet
            self._current_instance_indices[challenger] = 0
            self._challenger_id += 1
            self._running_challenger = challenger

    if (incumbent is None or incumbent == challenger) and self._first_run:
        self._logger.info("First run and no incumbent provided. Challenger is assumed to be the incumbent.")
        incumbent = challenger

    # Instance-seed subset for this budget, depending on the kind of budget
    if controller._instance_as_budget:
        lower_bound = int(self._all_budgets[self._stage - 1]) if self._stage > 0 else 0
        candidates = controller._instance_seed_pairs[int(lower_bound) : int(stage_budget)]
    else:
        candidates = controller._instance_seed_pairs

    self._logger.debug(f"Running challenger: {challenger}")

    # At this point the index is still an integer; it might be marked with np.inf LATER,
    # hence the mypy ignore.
    position = self._current_instance_indices[challenger]
    instance, seed = candidates[position]  # type: ignore[index]

    # Only needed for testing: exposes whether a new challenger was proposed
    self._new_challenger = is_new

    budget = None if controller._instance_as_budget else stage_budget

    # Proposed but not processed yet
    self._run_tracker[(challenger, instance, seed, budget)] = False

    # The next call has to continue with the following instance/seed pair
    self._current_instance_indices[challenger] += 1

    trial = TrialInfo(
        config=challenger,
        instance=instance,
        seed=seed,
        budget=budget,
        source=self._identifier,
    )
    return TrialInfoIntent.RUN, trial
def _update_stage(self, runhistory: RunHistory) -> None:
"""Updates tracking information for a new stage/iteration and update statistics. This method
is called to initialize stage variables and after all configurations of a Successive Halving
stage are completed.
"""
self._stage += 1
# Only uncapped challengers are considered valid for the next iteration
valid_challengers = list(
(self._success_challengers | self._do_not_advance_challengers) - self._failed_challengers
)
if self._stage < len(self._all_budgets) and len(valid_challengers) > 0:
# If this is the next stage in same iteration,
# use top 'k' from the evaluated configurations for next iteration
# Determine 'k' for the next iteration - at least 1
next_n_chal = int(max(1, self._n_configs_in_stage[self._stage]))
# Selecting the top 'k' challengers for the next iteration
configs_to_run = self._top_k(configs=valid_challengers, runhistory=runhistory, k=next_n_chal)
self._configs_to_run = [
config for config in configs_to_run if config not in self._do_not_advance_challengers
]
# If some runs were capped, top_k returns less than the required configurations
# to handle that, we keep track of how many configurations are missing
# (since they are technically failed here too)
missing_challengers = int(self._n_configs_in_stage[self._stage]) - len(self._configs_to_run)
if missing_challengers > 0:
self._failed_challenger_offset = missing_challengers
else:
self._failed_challenger_offset = 0
if next_n_chal == missing_challengers:
next_stage = True
self._logger.info(
"Successive Halving iteration-step: %d-%d with "
"budget [%.2f / %d] - expected %d new challenger(s), but "
"no configurations propagated to the next budget.",
self._sh_iters + 1,
self._stage + 1,
self._all_budgets[self._stage],
self._max_budget,
self._n_configs_in_stage[self._stage],
)
else:
next_stage = False
else:
next_stage = True
if next_stage:
# Update stats for the prev iteration
assert self._stats
self._stats.update_average_configs_per_intensify(n_configs=self._challenger_id)
# Reset stats for the new iteration
self._sh_iters += 1
self._stage = 0
self._configs_to_run = []
self._target_function_time = 0
self._challenger_id = 0
self._num_trials = 0
self._iteration_done = True
self._failed_challenger_offset = 0
self._run_tracker = {}
# Randomize instance-seed pairs per successive halving run, if user specifies
if self._successive_halving._instance_order == "shuffle":
self._rng.shuffle(self._instance_seed_pairs) # type: ignore
# To track configurations for the next stage
self._running_challenger = None
self._success_challengers = set()
self._do_not_advance_challengers = set()
self._failed_challengers = set()
self._current_instance_indices = {}
def _compare_configs(
self,
incumbent: Configuration,
challenger: Configuration,
runhistory: RunHistory,
log_trajectory: bool = True,
) -> Configuration | None:
"""Compares the challenger with the current incumbent and returns the best configuration based
on the given incumbent selection design.
Returns
-------
configuration : Configuration | None
The better configuration.
"""
assert self._stats
if self._successive_halving._instance_as_budget:
new_incumbent = super()._compare_configs(incumbent, challenger, runhistory, log_trajectory)
# If compare config returned none, then it is undecided. So return old incumbent.
return incumbent if new_incumbent is None else new_incumbent
# For real-valued budgets, compare configs based on the incumbent selection design
current_budget = self._all_budgets[self._stage]
# incumbent selection: best on any budget
if self._successive_halving._incumbent_selection == "any_budget":
new_incumbent = self._compare_configs_across_budgets(
challenger=challenger,
incumbent=incumbent,
runhistory=runhistory,
log_trajectory=log_trajectory,
)
return new_incumbent
# get runs for both configurations
inc_runs = runhistory.get_trials(incumbent, only_max_observed_budget=True)
chall_runs = runhistory.get_trials(challenger, only_max_observed_budget=True)
if len(inc_runs) > 1:
raise ValueError(
"Number of incumbent runs on budget %f must not exceed 1, but is %d",
inc_runs[0].budget,
len(inc_runs),
)
if len(chall_runs) > 1:
raise ValueError(
"Number of challenger runs on budget %f must not exceed 1, but is %d",
chall_runs[0].budget,
len(chall_runs),
)
inc_run = inc_runs[0]
chall_run = chall_runs[0]
if inc_run.budget is None or chall_run.budget is None:
raise RuntimeError("Since budgets are not used for instance optimization, this should not happen.")
# Incumbent selection: highest budget only
if self._successive_halving._incumbent_selection == "highest_budget":
assert chall_run.budget is not None
if chall_run.budget < self._max_budget:
self._logger.debug(
"Challenger (budget=%.4f) has not been evaluated on the highest budget %.4f yet.",
chall_run.budget,
self._max_budget,
)
return incumbent
# Incumbent selection: highest budget run so far
if inc_run.budget > chall_run.budget:
self._logger.debug(
"Incumbent evaluated on higher budget than challenger (%.4f > %.4f), not changing the incumbent",
inc_run.budget,
chall_run.budget,
)
return incumbent
if inc_run.budget < chall_run.budget:
self._logger.debug(
"Challenger evaluated on higher budget than incumbent (%.4f > %.4f), changing the incumbent",
chall_run.budget,
inc_run.budget,
)
if log_trajectory:
assert self._stats
self._stats.add_incumbent(
cost=runhistory.get_cost(challenger),
incumbent=challenger,
budget=current_budget,
)
return challenger
# incumbent and challenger were both evaluated on the same budget, compare them based on their cost
chall_cost = runhistory.get_cost(challenger)
inc_cost = runhistory.get_cost(incumbent)
if chall_cost < inc_cost:
self._logger.info(
"Challenger (%.4f) is better than incumbent (%.4f) on budget %.4f.",
chall_cost,
inc_cost,
chall_run.budget,
)
self._log_incumbent_changes(incumbent, challenger)
new_incumbent = challenger
if log_trajectory:
assert self._stats
self._stats.add_incumbent(cost=chall_cost, incumbent=new_incumbent, budget=current_budget)
else:
self._logger.debug(
"Incumbent (%.4f) is at least as good as the challenger (%.4f) on budget %.4f.",
inc_cost,
chall_cost,
inc_run.budget,
)
if log_trajectory and self._stats.incumbent_changed == 0:
assert self._stats
self._stats.add_incumbent(
cost=inc_cost,
incumbent=incumbent,
budget=current_budget,
)
new_incumbent = incumbent
return new_incumbent
def _compare_configs_across_budgets(
self,
challenger: Configuration,
incumbent: Configuration,
runhistory: RunHistory,
log_trajectory: bool = True,
) -> Configuration | None:
"""Compares challenger with current incumbent on any budget.
Returns
-------
configuration : Configuration | None
The better configuration.
"""
assert self._stats
current_budget = self._all_budgets[self._stage]
# compare challenger and incumbent based on cost
chall_cost = runhistory.get_min_cost(challenger)
inc_cost = runhistory.get_min_cost(incumbent)
if np.isfinite(chall_cost) and np.isfinite(inc_cost):
if chall_cost < inc_cost:
self._logger.info(
"Challenger (%.4f) is better than incumbent (%.4f) for any budget.",
chall_cost,
inc_cost,
)
self._log_incumbent_changes(incumbent, challenger)
new_incumbent = challenger
if log_trajectory:
assert self._stats
self._stats.add_incumbent(
cost=chall_cost,
incumbent=new_incumbent,
budget=current_budget,
)
else:
self._logger.debug(
"Incumbent (%.4f) is at least as good as the challenger (%.4f) for any budget.",
inc_cost,
chall_cost,
)
if log_trajectory and self._stats.incumbent_changed == 0:
self._stats.add_incumbent(cost=inc_cost, incumbent=incumbent, budget=current_budget)
new_incumbent = incumbent
else:
self._logger.debug("Non-finite costs from run history!")
new_incumbent = incumbent
return new_incumbent
def _top_k(self, configs: list[Configuration], runhistory: RunHistory, k: int) -> list[Configuration]:
"""Selects the top 'k' configurations from the given list based on their performance.
This retrieves the performance for each configuration from the runhistory and checks
that the highest budget they've been evaluated on is the same for each of the configurations.
Parameters
----------
configs : list[Configuration]
List of configurations to filter from.
runhistory : RunHistory
k : int
Number of configurations to select
Returns
-------
list[Configuration]
Top challenger configurations, sorted in increasing costs.
"""
# Extracting costs for each given configuration
config_costs = {}
# Sample list instance-seed-budget key to act as base
run_key = runhistory.get_trials(configs[0], only_max_observed_budget=True)
for c in configs:
# Ensuring that all configurations being compared are run on the same set of instance, seed & budget
cur_run_key = runhistory.get_trials(c, only_max_observed_budget=True)
# Move to compare set -- get_trials queries form a dictionary
# which is not an ordered structure. Some queries to that dictionary returned unordered
# list which wrongly trigger the below if
if set(cur_run_key) != set(run_key):
raise ValueError(
"Can not compare configs that were run on different instances-seeds-budgets: \n"
f"{run_key} and\n{cur_run_key}"
)
config_costs[c] = runhistory.get_cost(c)
configs_sorted = [k for k, v in sorted(config_costs.items(), key=lambda item: item[1])]
# Select top configurations only
top_configs = configs_sorted[:k]
return top_configs
def _all_config_instance_seed_pairs_launched(
self,
runhistory: RunHistory,
activate_configuration_being_intensified: Configuration | None,
) -> bool:
"""When running SH, M configs might require N instances. Before moving to the next stage, we
need to make sure that tasks (each of the MxN jobs) are launched.
This function returns a true if any M configs are pending or if N instance/seed are
still remaining.
Parameters
----------
runhistory : RunHistory
activate_configuration_being_intensified: Configuration | None
The last configuration being actively processes by this intensifier.
Returns
-------
pending : bool
Whether a instance/pair of any of the M configurations for the current stage are pending.
"""
# 1: First we count the number of configurations that have been launched
# We only submit a new configuration M if all instance-seed pairs of (M - 1)
# have been proposed
configurations_by_this_intensifier = [
c for c, i, s, b in self._run_tracker # if b == self._all_budgets[self.stage] # Bugfix closes #880
]
running_configs = set()
for k, v in runhistory.items():
if runhistory.ids_config[k.config_id] in configurations_by_this_intensifier:
# We get all configurations launched by the current intensifier
# regardless if status is RUNNING or not, to make it more robust
running_configs.add(runhistory.ids_config[k.config_id])
# The total number of runs for this stage account for finished configurations
# (success + failed + do not advance) + the offset + running but not finished
# configurations. Also we account for the instances not launched for the
# currently running configuration
total_pending_configurations = max(
0,
self._n_configs_in_stage[self._stage]
- (
len(
set().union(
self._success_challengers,
self._failed_challengers,
self._do_not_advance_challengers,
running_configs,
)
)
+ self._failed_challenger_offset
),
)
# 2: Second we have to account for the number of pending instances for the active
# configuration. We assume for all (M - 1) configurations, all N instances-seeds
# have been already been launched
current_budget = self._all_budgets[self._stage]
if self._successive_halving._instance_as_budget:
previous_budget = int(self._all_budgets[self._stage - 1]) if self._stage > 0 else 0
current_instances = self._successive_halving._instance_seed_pairs[
int(previous_budget) : int(current_budget)
]
else:
current_instances = self._successive_halving._instance_seed_pairs
if activate_configuration_being_intensified is None:
# When a new stage begins, there is no active configuration.
# Therefore activate_configuration_being_intensified is empty and all instances are
# remaining
pending_instances_to_launch = len(current_instances)
else:
# self._current_instance_indices - 1 is the last proposed instance/seed pair from get_next_trial
# But it is zero indexed, so (self._current_instance_indices - 1) + 1 is the number of
# configurations that we have proposed to run in total for the running
# configuration via get_next_trial
pending_instances_to_launch = max(
len(current_instances) - self._current_instance_indices[activate_configuration_being_intensified], 0
) # type: ignore
# If the there are any pending configuration, or instances/seed pending for the
# active runner, we return a boolean
return (total_pending_configurations + pending_instances_to_launch) <= 0