from __future__ import annotations
from typing import Any, Iterator
from ConfigSpace import Configuration
from smac.intensifier.abstract_intensifier import AbstractIntensifier
from smac.runhistory import TrialInfo
from smac.runhistory.dataclasses import InstanceSeedBudgetKey
from smac.scenario import Scenario
from smac.utils.configspace import get_config_hash
from smac.utils.logging import get_logger
# Module metadata.
__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"
# Module-level logger, named after this module and shared by everything below.
logger = get_logger(__name__)
class Intensifier(AbstractIntensifier):
    """Implementation of an intensifier supporting multi-fidelity, multi-objective, and multi-processing.

    Races challengers against current incumbents.

    The behaviour of this intensifier is as follows:

    - First, adds configs from the runhistory to the queue with N=1 (they will be ignored if they are already
      evaluated).
    - While loop:

      - If queue is empty: Intensifies exactly one more instance of one incumbent and samples a new configuration
        afterwards.
      - If queue is not empty: Configs in the queue are evaluated on N=(N*2) instances if they might be better
        than the incumbents. If not, they are removed from the queue and rejected forever.

    Parameters
    ----------
    scenario : Scenario
        Scenario object, forwarded to the parent intensifier.
    max_config_calls : int, defaults to 3
        Maximum number of evaluations of a single configuration, i.e., the maximum number of instance-seed keys
        a configuration is evaluated on.
    max_incumbents : int, defaults to 10
        How many incumbents to keep track of in the case of multi-objective.
    retries : int, defaults to 16
        How many more iterations should be done in case no new trial is found.
    seed : int, defaults to None
        Internal seed used for random events, like shuffle seeds.
    """

    def __init__(
        self,
        scenario: Scenario,
        max_config_calls: int = 3,
        max_incumbents: int = 10,
        retries: int = 16,
        seed: int | None = None,
    ):
        super().__init__(scenario=scenario, max_config_calls=max_config_calls, max_incumbents=max_incumbents, seed=seed)
        self._retries = retries

    def reset(self) -> None:
        """Resets the internal variables of the intensifier including the queue."""
        super().reset()

        # Queue to keep track of the challengers
        # (config, N=how many trials should be sampled)
        self._queue: list[tuple[Configuration, int]] = []

    @property
    def uses_seeds(self) -> bool:  # noqa: D102
        return True

    @property
    def uses_budgets(self) -> bool:  # noqa: D102
        return False

    @property
    def uses_instances(self) -> bool:  # noqa: D102
        return self._scenario.instances is not None

    def get_state(self) -> dict[str, Any]:  # noqa: D102
        # Configurations are stored by their runhistory id so that ``set_state`` can look them up again.
        return {
            "queue": [(self.runhistory.get_config_id(config), n) for config, n in self._queue],
        }

    def set_state(self, state: dict[str, Any]) -> None:  # noqa: D102
        # Inverse of ``get_state``: resolve the stored config ids back to configurations.
        self._queue = [(self.runhistory.get_config(config_id), n) for config_id, n in state["queue"]]

    def __iter__(self) -> Iterator[TrialInfo]:
        """This iter method holds the logic for the intensification loop.

        Some facts about the loop:

        - Adds existing configurations from the runhistory to the queue (that means it supports user-inputs).
        - Every time an incumbent (with the lowest amount of trials) is intensified, a new challenger is added to
          the queue.
        - If all incumbents are evaluated on the same trials, a new trial is added to one of the incumbents.
        - Only challengers which are not rejected/running/incumbent are intensified by N*2.

        The generator stops when no new trial could be found for more than ``retries`` consecutive iterations or
        when the config generator is exhausted.

        Returns
        -------
        trials : Iterator[TrialInfo]
            Iterator over the trials.
        """
        self.__post_init__()

        rh = self.runhistory
        assert self._max_config_calls is not None

        # What if there are already trials in the runhistory? Should we queue them up?
        # Because they are part of the runhistory, they might be selected as incumbents. However, they are not
        # intensified because they are not part of the queue. We could add them here to incorporate them in the
        # intensification process.
        # Idea: Add all configs to queue (if it is an incumbent it is removed automatically later on)
        # N=1 is enough here as it will increase automatically in the iterations if the configuration is worthy
        # Note: The incumbents are updated once the runhistory is set (see abstract intensifier)
        # Note 2: If the queue was restored, we don't want to go in here (queue is restored)
        if len(self._queue) == 0:
            for config in rh.get_configs():
                config_hash = get_config_hash(config)
                self._queue.append((config, 1))
                logger.info(f"Added config {config_hash} from runhistory to the intensifier queue.")

        # ``fails`` counts consecutive iterations without progress; it is reset to -1 whenever something happened
        # so that the increment at the top of the loop brings it back to 0.
        fails = -1
        while True:
            fails += 1

            # Some criteria to stop the intensification if nothing can be intensified anymore
            if fails > self._retries:
                logger.error("Intensifier could not find any new trials.")
                return

            # Some configs from the runhistory
            running_configs = rh.get_running_configs()
            rejected_configs = self.get_rejected_configs()

            # Now we get the incumbents sorted by number of trials
            # Also, incorporate ``get_incumbent_instance_seed_budget_keys`` here because challengers are only
            # allowed to sample from the incumbent's instances
            incumbents = self.get_incumbents(sort_by="num_trials")
            incumbent_isb_keys = self.get_incumbent_instance_seed_budget_keys()

            # Check if configs in queue are still running (vacuously true for an empty queue)
            all_configs_running = all(config in running_configs for config, _ in self._queue)

            if len(self._queue) == 0 or all_configs_running:
                if len(self._queue) == 0:
                    logger.debug("Queue is empty:")
                else:
                    logger.debug("All configs in the queue are running:")

                if len(incumbents) == 0:
                    logger.debug("--- No incumbent to intensify.")

                for incumbent in incumbents:
                    # Instances of this particular incumbent
                    individual_incumbent_isb_keys = rh.get_instance_seed_budget_keys(incumbent)
                    incumbent_hash = get_config_hash(incumbent)

                    # We don't want to intensify an incumbent which is either still running or rejected
                    if incumbent in running_configs:
                        logger.debug(
                            f"--- Skipping intensifying incumbent {incumbent_hash} because it has trials pending."
                        )
                        continue

                    if incumbent in rejected_configs:
                        # This should actually not happen because if a config is rejected the incumbent should
                        # have changed
                        # However, we just keep it here as sanity check
                        logger.debug(f"--- Skipping intensifying incumbent {incumbent_hash} because it was rejected.")
                        continue

                    # If incumbent was evaluated on all incumbent instance intersections but was not evaluated on
                    # the differences, we have to add it here
                    incumbent_isb_key_differences = self.get_incumbent_instance_seed_budget_key_differences()

                    # We set shuffle to false because we first want to evaluate the incumbent instances, then the
                    # differences (to make the instance-seed keys for the incumbents equal again)
                    trials = self._get_next_trials(
                        incumbent,
                        from_keys=incumbent_isb_keys + incumbent_isb_key_differences,
                        shuffle=False,
                    )

                    # If we don't receive any trials, then we try it randomly with any other because we want to
                    # intensify for sure
                    if len(trials) == 0:
                        logger.debug(
                            f"--- Incumbent {incumbent_hash} was already evaluated on all incumbent instances "
                            "and incumbent instance differences so far. Looking for new instances..."
                        )
                        trials = self._get_next_trials(incumbent)
                        logger.debug(f"--- Randomly found {len(trials)} new trials.")

                    if len(trials) > 0:
                        fails = -1
                        logger.debug(
                            f"--- Yielding trial {len(individual_incumbent_isb_keys)+1} of "
                            f"{self._max_config_calls} from incumbent {incumbent_hash}..."
                        )
                        yield trials[0]
                        logger.debug(f"--- Finished yielding for config {incumbent_hash}.")

                        # We break here because we only want to intensify one more trial of one incumbent
                        break
                    else:
                        # assert len(incumbent_isb_keys) == self._max_config_calls
                        logger.debug(
                            f"--- Skipped intensifying incumbent {incumbent_hash} because no new trials have "
                            "been found. Evaluated "
                            f"{len(individual_incumbent_isb_keys)}/{self._max_config_calls} trials."
                        )

                # For each intensification of the incumbent, we also want to intensify the next configuration
                # We simply add it to the queue and intensify it in the next iteration
                try:
                    config = next(self.config_generator)
                except StopIteration:
                    # We stop if we don't find any configuration anymore
                    return

                config_hash = get_config_hash(config)
                self._queue.append((config, 1))
                logger.debug(f"--- Added a new config {config_hash} to the queue.")

                # If we added a new config, then we did something in this iteration
                fails = -1
            else:
                logger.debug("Start finding a new challenger in the queue:")
                # Iterate over a copy because entries are removed from the queue inside the loop
                for config, N in self._queue.copy():
                    config_hash = get_config_hash(config)

                    # If the config is still running, we ignore it and head to the next config
                    if config in running_configs:
                        logger.debug(f"--- Config {config_hash} is still running. Skipping this config in the queue...")
                        continue

                    # We want to get rid of configs in the queue which are rejected
                    if config in rejected_configs:
                        logger.debug(f"--- Config {config_hash} was removed from the queue because it was rejected.")
                        self._queue.remove((config, N))
                        continue

                    # We don't want to intensify an incumbent here
                    if config in incumbents:
                        logger.debug(f"--- Config {config_hash} was removed from the queue because it is an incumbent.")
                        self._queue.remove((config, N))
                        continue

                    # And then we yield as many trials as we specified N
                    # However, only the same instances as the incumbents are used
                    isk_keys: list[InstanceSeedBudgetKey] | None = None
                    if len(incumbent_isb_keys) > 0:
                        isk_keys = incumbent_isb_keys

                    # TODO: What to do if there are no incumbent instances? (Use-case: call multiple asks)

                    trials = self._get_next_trials(config, N=N, from_keys=isk_keys)
                    logger.debug(f"--- Yielding {len(trials)} trials to evaluate config {config_hash}...")
                    for trial in trials:
                        fails = -1
                        yield trial

                    logger.debug(f"--- Finished yielding for config {config_hash}.")

                    # Now we have to remove the config
                    self._queue.remove((config, N))
                    logger.debug(f"--- Removed config {config_hash} with N={N} from queue.")

                    # Finally, we add the same config to the queue with a higher N
                    # If the config was rejected by the runhistory, then it's been removed in the next iteration
                    if N < self._max_config_calls:
                        new_pair = (config, N * 2)
                        if new_pair not in self._queue:
                            logger.debug(
                                f"--- Doubled trials of config {config_hash} to N={N*2} and added it to the queue "
                                "again."
                            )
                            self._queue.append(new_pair)

                            # Also reset fails here
                            fails = -1
                        else:
                            logger.debug(f"--- Config {config_hash} with N={N*2} is already in the queue.")

                    # If we are at this point, it really is important to break because otherwise, we would intensify
                    # all configs in the queue in one iteration
                    break

    def _get_next_trials(
        self,
        config: Configuration,
        *,
        N: int | None = None,
        from_keys: list[InstanceSeedBudgetKey] | None = None,
        shuffle: bool = True,
    ) -> list[TrialInfo]:
        """Returns the next trials of the configuration based on ``get_instance_seed_keys_of_interest``. If N is
        specified, at most N trials are returned but not necessarily all of them (depending on evaluated already or
        still running).

        Parameters
        ----------
        config : Configuration
            The configuration for which the next trials are determined.
        N : int | None, defaults to None
            The maximum number of trials to return. If None, all remaining trials of interest are returned.
            Running and evaluated trials are counted in, so no trial is returned if N or more trials of interest
            are already evaluated or running.
        from_keys : list[InstanceSeedBudgetKey], defaults to None
            Only instances from the list are considered for the trials.
        shuffle : bool, defaults to True
            Whether the remaining instance-seed keys are reordered via ``_reorder_instance_seed_keys``.

        Returns
        -------
        trials : list[TrialInfo]
            Trials of ``config`` which are neither evaluated nor running yet.
        """
        rh = self.runhistory

        # Work on our own list so that the removals below never touch the list handed out by the parent class
        is_keys = list(self.get_instance_seed_keys_of_interest())

        # Keep ``from_keys`` trials only
        if from_keys is not None:
            valid_is_keys = [key.get_instance_seed_key() for key in from_keys]
            is_keys = [is_key for is_key in is_keys if is_key in valid_is_keys]

        # Counter is important to actually subtract the number of trials that are already evaluated/running
        # Otherwise, evaluated/running trials are not considered
        # Example: max_config_calls=16, N=8, 2 trials are running, 2 trials are evaluated, 4 trials are pending
        # Without a counter, we would return 8 trials because there are still so many trials left open
        # With counter, we would return only 4 trials because 4 trials are already evaluated/running
        counter = 0

        # Now we actually have to check whether the trials have been evaluated already
        evaluated_isb_keys = rh.get_instance_seed_budget_keys(config, highest_observed_budget_only=False)
        for isb_key in evaluated_isb_keys:
            is_key = isb_key.get_instance_seed_key()
            if is_key in is_keys:
                counter += 1
                is_keys.remove(is_key)

        # It's also important to remove running trials from the selection (we don't want to queue them again)
        for running_trial in rh.get_running_trials(config):
            is_key = running_trial.get_instance_seed_key()
            if is_key in is_keys:
                counter += 1
                is_keys.remove(is_key)

        if shuffle:
            is_keys = self._reorder_instance_seed_keys(is_keys)

        # Return only N trials
        if N is not None:
            # Clamp at zero: a negative slice bound would drop keys from the END of the list and thereby return
            # trials although the budget of N is already used up by evaluated/running trials
            remaining = max(N - counter, 0)
            is_keys = is_keys[:remaining]

        # Now we convert to trials
        return [TrialInfo(config=config, instance=is_key.instance, seed=is_key.seed) for is_key in is_keys]