from __future__ import annotations
from typing import Any, Callable, Iterator, Optional, cast
from collections import Counter
from ConfigSpace import Configuration
from smac.constants import MAXINT
from smac.intensifier.abstract_intensifier import AbstractIntensifier
from smac.intensifier.stages import IntensifierStage
from smac.runhistory import (
InstanceSeedBudgetKey,
TrialInfo,
TrialInfoIntent,
TrialValue,
)
from smac.runhistory.runhistory import RunHistory
from smac.scenario import Scenario
from smac.utils.logging import format_array, get_logger
__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"
logger = get_logger(__name__)
[docs]class Intensifier(AbstractIntensifier):
r"""Races challengers against an incumbent.
SMAC's intensification procedure, in detail:
Intensify(Θ_new, θ_inc, M, R, t_intensify, Π, c_hat) c_hat(θ, Π') denotes the empirical cost of θ on
the subset of instances Π' ⊆ Π, based on the trials in R; max_config_calls is a parameter where:
Θ_new: Sequence of parameter settings to evaluate, challengers in this class.
θ_inc: incumbent parameter setting, incumbent in this class.
1 for i := 1, . . . , length(Θ_new) do
2 θ_new ← Θ_new[i]
STAGE -> RUN_INCUMBENT
3 if R contains less than max_config_calls trials with configuration θ_inc then
4 Π' ← {π'∈ Π | R contains less than or equal number of trials using θ_inc and π'
0 than using θ_inc and any other π''∈ Π}
5 π ← instance sampled uniformly at random from Π'
6 s ← seed, drawn uniformly at random
7 R ← ExecuteRun(R, θ_inc, π, s)
8 N ← 1
STAGE -> RUN_CHALLENGER
9 while true do
10 S_missing ← {instance, seed} pairs for which θ_inc was run before, but not θ_new
11 S_torun ← random subset of S_missing of size min(N, size(S_missing))
12 foreach (π, s) ∈ S_torun do R ← ExecuteRun(R, θ_new, π, s)
13 S_missing ← S_missing \\ S_torun
14 Π_common ← instances for which we previously ran both θ_inc and θ_new
15 if c_hat(θ_new, Π_common) > c_hat(θ_inc, Π_common) then break
16 else if S_missing = ∅ then θ_inc ← θ_new; break
17 else N ← 2 · N
18 if time spent in this call to this procedure exceeds t_intensify and i ≥ 2 then break
19 return [R, θ_inc]
Parameters
----------
scenario : Scenario
min_config_calls : int, defaults to 1
Minimum number of trials per config (summed over all calls to intensify).
max_config_calls : int, defaults to 2000
Maximum number of trials per config (summed over all calls to intensify).
min_challenger : int, defaults to 2
Minimal number of challengers to be considered (even if time_bound is exhausted earlier).
intensify_percentage : float, defaults to 0.5
How much percentage of the time should configurations be intensified (evaluated on higher budgets or
more instances). This parameter is accessed in the SMBO class.
race_against : Configuration | None, defaults to none
If incumbent changes, race this configuration always against new incumbent. Prevents sometimes
over-tuning.
seed : int | None, defaults to none
"""
def __init__(
self,
scenario: Scenario,
min_config_calls: int = 1,
max_config_calls: int = 2000,
min_challenger: int = 2,
intensify_percentage: float = 0.5,
race_against: Configuration | None = None,
seed: int | None = None,
):
if scenario.deterministic:
if min_challenger != 1:
logger.info("The number of minimal challengers is set to one for deterministic algorithms.")
min_challenger = 1
# Intensify percentage must be between 0 and 1
assert intensify_percentage >= 0.0 and intensify_percentage <= 1.0
super().__init__(
scenario=scenario,
min_config_calls=min_config_calls,
max_config_calls=max_config_calls,
min_challenger=min_challenger,
seed=seed,
)
# General attributes
self._race_against = race_against
if race_against is not None and race_against.origin is None:
assert self._race_against is not None
if race_against == scenario.configspace.get_default_configuration():
self._race_against.origin = "Default"
else:
logger.warning(
"The passed configuration to the intensifier was not specified with an origin. "
"The origin is set to `Unknown`."
)
self._race_against.origin = "Unknown"
self._elapsed_time = 0.0
# Stage variables
# the intensification procedure is divided into 4 'stages':
# 0. Run 1st configuration (only in the 1st run when incumbent=None)
# 1. Add incumbent run
# 2. Race challenger
# 3. Race against configuration for a new incumbent
self._stage = IntensifierStage.RUN_FIRST_CONFIG
self._n_iters = 0
# Challenger related variables
self._challenger_id = 0
self._num_challenger_run = 0
self._current_challenger = None
self._continue_challenger = False
self._configs_to_run: Iterator[Optional[Configuration]] = iter([])
self._update_configs_to_run = True
self._intensify_percentage = intensify_percentage
self._last_seed_idx = -1
self._target_function_seeds = [
int(s) for s in self._rng.randint(low=0, high=MAXINT, size=self._max_config_calls)
]
# Racing related variables
self._to_run: list[InstanceSeedBudgetKey] = []
self._N = -1
@property
def intensify_percentage(self) -> float:
"""How much percentage of the time should configurations be intensified (evaluated on higher budgets or
more instances). This parameter is accessed in the SMBO class.
"""
return self._intensify_percentage
@property
def uses_seeds(self) -> bool: # noqa: D102
return True
@property
def uses_budgets(self) -> bool: # noqa: D102
return False
@property
def uses_instances(self) -> bool: # noqa: D102
if self._instances == [None]:
return False
return True
[docs] def get_target_function_seeds(self) -> list[int]: # noqa: D102
if self._deterministic:
return [0]
else:
return self._target_function_seeds
[docs] def get_target_function_budgets(self) -> list[float | None]: # noqa: D102
return [None]
[docs] def get_target_function_instances(self) -> list[str | None]: # noqa: D102
if self._instances == [None] or None in self._instances:
return [None]
instances: list[str | None] = []
for instance in self._instances:
if instance is not None:
instances.append(instance)
return instances
@property
def meta(self) -> dict[str, Any]: # noqa: D102
race_against: dict | None = None
if self._race_against is not None:
race_against = self._race_against.get_dictionary()
meta = super().meta
meta.update(
{
"race_against": race_against,
"intensify_percentage": self.intensify_percentage,
}
)
return meta
[docs] def get_next_trial(
self,
challengers: list[Configuration] | None,
incumbent: Configuration,
get_next_configurations: Callable[[], Iterator[Configuration]] | None,
runhistory: RunHistory,
repeat_configs: bool = True,
n_workers: int = 1,
) -> tuple[TrialInfoIntent, TrialInfo]:
"""This procedure is in charge of generating a TrialInfo object to comply with lines 7 (in
case stage is stage == RUN_INCUMBENT) or line 12 (In case of stage == RUN_CHALLENGER).
A TrialInfo object encapsulates the necessary information for a worker to execute the job, nevertheless, a
challenger is not always available. This could happen because no more configurations are available or the new
configuration to try was already executed.
To circumvent this, an intent is also returned:
* intent == RUN: Run the TrialInfo object (Normal Execution).
* intent == SKIP: Skip this iteration. No challenger is available. In particular because challenger is the same
as incumbent
Parameters
----------
challengers : list[Configuration] | None
Promising configurations.
incumbent : Configuration
Incumbent configuration.
get_next_configurations : Callable[[], Iterator[Configuration]] | None, defaults to none
Function that generates next configurations to use for racing.
runhistory : RunHistory
repeat_configs : bool, defaults to true
If false, an evaluated configuration will not be generated again.
n_workers : int, optional, defaults to 1
The maximum number of workers available.
Returns
-------
TrialInfoIntent
Indicator of how to consume the TrialInfo object.
TrialInfo
An object that encapsulates necessary information of the trial.
"""
if n_workers > 1:
raise ValueError("The selected intensifier does not support more than 1 worker.")
# If this function is called, it means the iteration is
# not complete (we can be starting a new iteration, or re-running a
# challenger due to line 17). We evaluate if a iteration is complete or not
# via _process_results
self._iteration_done = False
# In case a crash happens, and FirstRunCrashedException prevents a
# failure, revert back to running the incumbent
# Challenger case is by construction ok, as there is no special
# stage for its processing
if self._stage == IntensifierStage.PROCESS_FIRST_CONFIG_RUN:
self._stage = IntensifierStage.RUN_FIRST_CONFIG
elif self._stage == IntensifierStage.PROCESS_INCUMBENT_RUN:
self._stage = IntensifierStage.RUN_INCUMBENT
# if first ever run, then assume current challenger to be the incumbent
# Because this is the first ever run, we need to sample a new challenger
# This new challenger is also assumed to be the incumbent
if self._stage == IntensifierStage.RUN_FIRST_CONFIG:
if incumbent is None:
logger.info("No incumbent provided in the first run. Sampling a new challenger...")
challenger, self._new_challenger = self.get_next_challenger(
challengers=challengers,
get_next_configurations=get_next_configurations,
)
incumbent = challenger
else:
inc_trials = runhistory.get_trials(incumbent, only_max_observed_budget=True)
if len(inc_trials) > 0:
logger.debug("Skipping RUN_FIRST_CONFIG stage since incumbent has already been ran.")
self._stage = IntensifierStage.RUN_INCUMBENT
# LINES 3-7
if self._stage in [IntensifierStage.RUN_FIRST_CONFIG, IntensifierStage.RUN_INCUMBENT]:
# Line 3
# A modified version, that not only checks for max_config_calls
# but also makes sure that there are runnable instances, that is, instances has not been exhausted
inc_trials = runhistory.get_trials(incumbent, only_max_observed_budget=True)
# Line 4
pending_instances = self._get_pending_instances(incumbent, runhistory)
if pending_instances and len(inc_trials) < self._max_config_calls:
# Lines 5-7
instance, seed = self._get_next_instance(pending_instances)
return TrialInfoIntent.RUN, TrialInfo(
config=incumbent,
instance=instance,
seed=seed,
budget=None,
)
else:
# This point marks the transitions from lines 3-7 to 8-18
logger.debug("No further instance-seed pairs for incumbent available.")
self._stage = IntensifierStage.RUN_CHALLENGER
# Understand who is the active challenger.
if self._stage == IntensifierStage.RUN_BASIS:
# if in RUN_BASIS stage,
# return the basis configuration (i.e., `race_against`)
logger.debug("Race against default configuration after incumbent change.")
challenger = self._race_against
elif self._current_challenger and self._continue_challenger:
# if the current challenger could not be rejected,
# it is run again on more instances
challenger = self._current_challenger
else:
# Get a new challenger if all instance/pairs have been completed. Else return the currently running
# challenger
challenger, self._new_challenger = self.get_next_challenger(
challengers=challengers,
get_next_configurations=get_next_configurations,
)
# No new challengers are available for this iteration, move to the next iteration. This can only happen
# when all configurations for this iteration are exhausted and have been run in all proposed instance/pairs.
if challenger is None:
return TrialInfoIntent.SKIP, TrialInfo(
config=None,
instance=None,
seed=None,
budget=None,
)
# Skip the iteration if the challenger was previously run
if challenger == incumbent and self._stage == IntensifierStage.RUN_CHALLENGER:
self._challenger_same_as_incumbent = True
logger.debug("Challenger was the same as the current incumbent. Challenger is skipped.")
return TrialInfoIntent.SKIP, TrialInfo(
config=None,
instance=None,
seed=None,
budget=None,
)
logger.debug("Intensify on %s.", challenger.get_dictionary())
if hasattr(challenger, "origin"):
logger.debug("Configuration origin: %s", challenger.origin)
if self._stage in [IntensifierStage.RUN_CHALLENGER, IntensifierStage.RUN_BASIS]:
if not self._to_run:
self._to_run = self._get_missing_instances(
incumbent=incumbent, challenger=challenger, runhistory=runhistory, N=self._N
)
# If there is no more configs to run in this iteration, or no more
# time to do so, change the current stage base on how the current
# challenger performs as compared to the incumbent. This is done
# via _process_racer_results
if len(self._to_run) == 0:
# Nevertheless, if there are no more instances to run,
# we might need to comply with line 17 and keep running the
# same challenger. In this case, if there is not enough information
# to decide if the challenger is better/worst than the incumbent,
# line 17 doubles the number of instances to run.
logger.debug("No further trials for challenger possible.")
self._process_racer_results(
challenger=challenger,
incumbent=incumbent,
runhistory=runhistory,
)
# Request SMBO to skip this run. This function will
# be called again, after the _process_racer_results
# has updated the intensifier stage
return TrialInfoIntent.SKIP, TrialInfo(
config=None,
instance=None,
seed=None,
budget=None,
)
else:
# Lines 8-11
incumbent, instance, seed = self._get_next_racer(
challenger=challenger,
incumbent=incumbent,
runhistory=runhistory,
)
# Line 12
return TrialInfoIntent.RUN, TrialInfo(
config=challenger,
instance=instance,
seed=seed,
budget=None,
)
else:
raise ValueError("No valid stage found.")
[docs] def process_results(
self,
trial_info: TrialInfo,
trial_value: TrialValue,
incumbent: Configuration | None,
runhistory: RunHistory,
time_bound: float,
log_trajectory: bool = True,
) -> tuple[Configuration, float]:
"""The intensifier stage will be updated based on the results/status of a configuration execution.
During intensification, the following can happen:
* Challenger raced against incumbent.
* Also, during a challenger run, a capped exception can be triggered, where no racer post processing is needed.
* A run on the incumbent for more confidence needs to be processed, IntensifierStage.PROCESS_INCUMBENT_RUN.
* The first run results need to be processed (PROCESS_FIRST_CONFIG_RUN).
At the end of any run, checks are done to move to a new iteration.
Parameters
----------
trial_info : TrialInfo
trial_value: TrialValue
incumbent : Configuration | None
Best configuration seen so far.
runhistory : RunHistory
time_bound : float
Time [sec] available to perform intensify.
log_trajectory: bool
Whether to log changes of incumbents in the trajectory.
Returns
-------
incumbent: Configuration
Current (maybe new) incumbent configuration.
incumbent_costs: float | list[float]
Empirical cost(s) of the incumbent configuration.
"""
if self._stage == IntensifierStage.PROCESS_FIRST_CONFIG_RUN:
if incumbent is None:
logger.info("First run and no incumbent provided. Challenger is assumed to be the incumbent.")
incumbent = trial_info.config
if self._stage in [
IntensifierStage.PROCESS_INCUMBENT_RUN,
IntensifierStage.PROCESS_FIRST_CONFIG_RUN,
]:
self._num_trials += 1
self._process_incumbent(
incumbent=incumbent,
runhistory=runhistory,
log_trajectory=log_trajectory,
)
else:
self._num_trials += 1
self._num_challenger_run += 1
incumbent = self._process_racer_results(
challenger=trial_info.config,
incumbent=incumbent,
runhistory=runhistory,
log_trajectory=log_trajectory,
)
self._elapsed_time += trial_value.time
# check if 1 intensification run is complete - line 18
# this is different to regular SMAC as it requires at least successful challenger run,
# which is necessary to work on a fixed grid of configurations.
if (
self._stage == IntensifierStage.RUN_INCUMBENT
and self._challenger_id >= self._min_challenger
and self._num_challenger_run > 0
):
# if self._num_trials > self._run_limit:
# logger.debug("Maximum number of trials for intensification reached.")
# self._next_iteration()
if self._elapsed_time - time_bound >= 0:
logger.debug(f"Wallclock time limit for intensification reached ({self._elapsed_time}/{time_bound})")
self._next_iteration()
inc_perf = runhistory.get_cost(incumbent)
return incumbent, inc_perf
def _get_next_instance(
self,
pending_instances: list[str | None],
) -> tuple[str | None, int | None]:
"""Method to extract the next instance and seed instance in which a incumbent run most be evaluated."""
# Line 5 - and avoid https://github.com/numpy/numpy/issues/10791
_idx = self._rng.choice(len(pending_instances))
next_instance = pending_instances[_idx]
# Line 6
if self._deterministic:
next_seed = self.get_target_function_seeds()[0]
else:
self._last_seed_idx += 1
next_seed = self.get_target_function_seeds()[self._last_seed_idx]
# Line 7
logger.debug(f"Add run of incumbent for instance = {next_instance}")
if self._stage == IntensifierStage.RUN_FIRST_CONFIG:
self._stage = IntensifierStage.PROCESS_FIRST_CONFIG_RUN
else:
self._stage = IntensifierStage.PROCESS_INCUMBENT_RUN
return next_instance, next_seed
def _get_pending_instances(
self,
incumbent: Configuration,
runhistory: RunHistory,
) -> list[str | None]:
"""Implementation of line 4 of Intensification. This method queries the incumbent trials in the runhistory and
return the pending instances if any is available.
"""
# Line 4
# Find all instances that have the most trials on the inc
inc_trials = runhistory.get_trials(incumbent, only_max_observed_budget=True)
# Now we select and count the instances and sort them
inc_inst = [s.instance for s in inc_trials]
inc_inst_counter = list(Counter(inc_inst).items())
inc_inst_counter.sort(key=lambda x: x[1], reverse=True)
try:
max_trials = inc_inst_counter[0][1]
except IndexError:
logger.debug("No run for incumbent found.")
max_trials = 0
# Those are the instances with run the most
inc_inst = [x[0] for x in inc_inst_counter if x[1] == max_trials]
# We basically sort the instances by their name
available_insts: list[str | None] = list(set(self._instances) - set(inc_inst))
if None in available_insts:
assert len(available_insts) == 1
available_insts = [None]
else:
available_insts = sorted(available_insts) # type: ignore
# If all instances were used n times, we can pick an instances from the complete set again
if not self._deterministic and not available_insts:
available_insts = self._instances
return available_insts
def _process_incumbent(
self,
incumbent: Configuration,
runhistory: RunHistory,
log_trajectory: bool = True,
) -> None:
"""Method to process the results of a challenger that races an incumbent."""
# output estimated performance of incumbent
inc_trials = runhistory.get_trials(incumbent, only_max_observed_budget=True)
inc_perf = runhistory.get_cost(incumbent)
format_value = format_array(inc_perf)
logger.info(f"Updated estimated cost of incumbent on {len(inc_trials)} trials: {format_value}")
# if running first configuration, go to next stage after 1st run
if self._stage in [
IntensifierStage.RUN_FIRST_CONFIG,
IntensifierStage.PROCESS_FIRST_CONFIG_RUN,
]:
self._stage = IntensifierStage.RUN_INCUMBENT
self._next_iteration()
else:
# Termination condition; after each run, this checks
# whether further trials are necessary due to min_config_calls
if len(inc_trials) >= self._min_config_calls or len(inc_trials) >= self._max_config_calls:
self._stage = IntensifierStage.RUN_CHALLENGER
else:
self._stage = IntensifierStage.RUN_INCUMBENT
self._compare_configs(
incumbent=incumbent, challenger=incumbent, runhistory=runhistory, log_trajectory=log_trajectory
)
def _get_next_racer(
self,
challenger: Configuration,
incumbent: Configuration,
runhistory: RunHistory,
log_trajectory: bool = True,
) -> tuple[Configuration, str | None, int | None]:
"""Method to return the next config setting to aggressively race challenger against
incumbent. Returns either the challenger or incumbent as new incumbent alongside with the
instance and seed.
"""
# By the time this function is called, the run history might
# have shifted. Re-populate the list if necessary
if not self._to_run:
# Lines 10/11
self._to_run = self._get_missing_instances(
incumbent=incumbent, challenger=challenger, runhistory=runhistory, N=self._N
)
# Run challenger on all <instance, seed> to run
instance_seed_budget_key = self._to_run.pop()
instance = instance_seed_budget_key.instance
seed = instance_seed_budget_key.seed
# Line 12
return incumbent, instance, seed
def _process_racer_results(
self,
challenger: Configuration,
incumbent: Configuration,
runhistory: RunHistory,
log_trajectory: bool = True,
) -> Configuration | None:
"""Process the result of a racing configuration against the current incumbent. Might propose
a new incumbent.
"""
chal_trials = runhistory.get_trials(challenger, only_max_observed_budget=True)
chal_perf = runhistory.get_cost(challenger)
# If all <instance, seed> have been run, compare challenger performance
if not self._to_run:
new_incumbent = self._compare_configs(
incumbent=incumbent,
challenger=challenger,
runhistory=runhistory,
log_trajectory=log_trajectory,
)
# Update intensification stage
if new_incumbent == incumbent:
# move on to the next iteration
self._stage = IntensifierStage.RUN_INCUMBENT
self._continue_challenger = False
logger.debug(
"Estimated cost of challenger on %d trials: %.4f, but worse than incumbent.",
len(chal_trials),
chal_perf,
)
elif new_incumbent == challenger:
# New incumbent found
incumbent = challenger
self._continue_challenger = False
# compare against basis configuration if provided, else go to next iteration
if self._race_against and self._race_against != challenger:
self._stage = IntensifierStage.RUN_BASIS
else:
self._stage = IntensifierStage.RUN_INCUMBENT
logger.debug(
"Estimated cost of challenger on %d trials (%.4f) becomes new incumbent.",
len(chal_trials),
chal_perf,
)
else: # Line 17
# Challenger is not worse, continue
self._N = 2 * self._N
self._continue_challenger = True
logger.debug(
"Estimated cost of challenger on %d trials (%.4f). Adding %d trials to the queue.",
len(chal_trials),
chal_perf,
self._N / 2,
)
else:
logger.debug(
"Estimated cost of challenger on %d trials (%.4f). Still %d trials to go (continue racing).",
len(chal_trials),
chal_perf,
len(self._to_run),
)
return incumbent
def _get_missing_instances(
self,
challenger: Configuration,
incumbent: Configuration,
N: int,
runhistory: RunHistory,
) -> list[InstanceSeedBudgetKey]:
"""Returns the minimum list of <instance, seed> pairs to run the challenger on before
comparing it with the incumbent.
"""
# Get next instances left for the challenger
# Line 8
inc_inst_seeds = set(runhistory.get_trials(incumbent, only_max_observed_budget=True))
chall_inst_seeds = set(runhistory.get_trials(challenger, only_max_observed_budget=True))
# Line 10
missing_trials = list(sorted(inc_inst_seeds - chall_inst_seeds))
# Line 11
self._rng.shuffle(missing_trials) # type: ignore
if N < 0:
raise ValueError("Argument N must not be smaller than zero, but is %s." % str(N))
to_run = missing_trials[: min(N, len(missing_trials))]
missing_trials = missing_trials[min(N, len(missing_trials)) :]
return to_run
[docs] def get_next_challenger(
self,
challengers: list[Configuration] | None,
get_next_configurations: Callable[[], Iterator[Configuration]] | None,
) -> tuple[Configuration | None, bool]:
"""This function returns the next challenger, that should be exercised though lines 8-17.
It does so by populating `_configs_to_run`, which is a pool of configuration from which the racer will sample.
Each configuration within `_configs_to_run`, will be intensified on different instances/seed registered in
`self._to_run` as stated in line 11.
A brand new configuration should only be sampled, after all `self._to_run` instance seed pairs are exhausted.
This method triggers a call to `_next_iteration` if there are no more configurations to run, for the current
intensification loop. This marks the transition to Line 2, where a new configuration to intensify will be drawn
from epm/initial challengers.
Parameters
----------
challengers : list[Configuration] | None, defaults to None
Promising configurations.
get_next_configurations : Callable[[], Iterator[Configuration]] | None
Function that generates next configurations to use for racing.
Returns
-------
configuration : Configuration | None
The configuration of the selected challenger.
new_challenger : bool
If the configuration is a new challenger.
"""
# Select new configuration when entering 'race challenger' stage or for the first run
if not self._current_challenger or (self._stage == IntensifierStage.RUN_CHALLENGER and not self._to_run):
# This is a new intensification run, get the next list of configurations to run
if self._update_configs_to_run:
configs_to_run = self._generate_challengers(
challengers=challengers, get_next_configurations=get_next_configurations
)
self._configs_to_run = cast(Iterator[Optional[Configuration]], configs_to_run)
self._update_configs_to_run = False
# Pick next configuration from the generator
try:
challenger = next(self._configs_to_run)
except StopIteration:
# Out of challengers for the current iteration, start next incumbent iteration
self._next_iteration()
return None, False
if challenger:
# Reset instance index for the new challenger
self._challenger_id += 1
self._current_challenger = challenger
self._N = max(1, self._min_config_calls)
self._to_run = []
return challenger, True
# Return currently running challenger
return self._current_challenger, False
def _generate_challengers(
self,
challengers: list[Configuration] | None,
get_next_configurations: Callable[[], Iterator[Configuration]] | None,
) -> Iterator[Optional[Configuration]]:
"""Retuns a sequence of challengers to use in intensification. If challengers are not
provided, then optimizer will be used to generate the challenger list.
"""
chall_gen: Iterator[Optional[Configuration]]
if challengers:
# Iterate over challengers provided
logger.debug("Using provided challengers...")
chall_gen = iter(challengers)
elif get_next_configurations:
# Generating challengers on-the-fly if optimizer is given
logger.debug("Generating new challenger from optimizer...")
chall_gen = get_next_configurations()
else:
raise ValueError("No configuration function provided. Can not generate challenger!")
return chall_gen
def _next_iteration(self) -> None:
"""Updates tracking variables at the end of an intensification run."""
assert self._stats
# Track iterations
self._n_iters += 1
self._iteration_done = True
self._configs_to_run = iter([])
self._update_configs_to_run = True
# Reset for a new iteration
self._num_trials = 0
self._num_challenger_run = 0
self._challenger_id = 0
self._elapsed_time = 0.0
self._stats.update_average_configs_per_intensify(n_configs=self._challenger_id)