Source code for smac.intensifier.abstract_intensifier

from __future__ import annotations

from abc import abstractmethod
from typing import Any, Callable, Iterator

import dataclasses
import json
from collections import defaultdict
from pathlib import Path

import numpy as np
from ConfigSpace import Configuration

import smac
from smac.callback.callback import Callback
from smac.constants import MAXINT
from smac.main.config_selector import ConfigSelector
from smac.runhistory import TrialInfo
from smac.runhistory.dataclasses import (
    InstanceSeedBudgetKey,
    InstanceSeedKey,
    TrajectoryItem,
    TrialValue,
)
from smac.runhistory.runhistory import RunHistory
from smac.scenario import Scenario
from smac.utils.configspace import get_config_hash, print_config_changes
from smac.utils.logging import get_logger
from smac.utils.pareto_front import calculate_pareto_front, sort_by_crowding_distance

__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"

logger = get_logger(__name__)


class AbstractIntensifier:
    """Abstract implementation of an intensifier supporting multi-fidelity, multi-objective, and
    multi-threading. The abstract intensifier keeps track of the incumbent, which is updated every time
    the runhistory changes.

    Parameters
    ----------
    n_seeds : int | None, defaults to None
        How many seeds to use for each instance. It is used in the abstract intensifier to determine
        validation trials.
    max_config_calls : int, defaults to None
        Maximum number of configuration evaluations, i.e., how many instance-seed keys should be evaluated
        at most for a configuration. It is used in the abstract intensifier to determine validation trials.
    max_incumbents : int, defaults to 10
        How many incumbents to keep track of in the case of multi-objective optimization.
    seed : int, defaults to None
        Internal seed used for random events like shuffling seeds.
    """

    def __init__(
        self,
        scenario: Scenario,
        n_seeds: int | None = None,
        max_config_calls: int | None = None,
        max_incumbents: int = 10,
        seed: int | None = None,
    ):
        self._scenario = scenario
        self._config_selector: ConfigSelector | None = None
        self._config_generator: Iterator[Configuration] | None = None
        self._runhistory: RunHistory | None = None

        if seed is None:
            seed = self._scenario.seed

        self._seed = seed
        self._rng = np.random.RandomState(seed)

        # Internal variables
        self._n_seeds = n_seeds
        self._max_config_calls = max_config_calls
        self._max_incumbents = max_incumbents
        self._used_walltime_func: Callable | None = None

        # Reset everything
        self.reset()
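
    # Illustrative usage sketch (not part of the class): how these parameters are typically
    # passed to a concrete intensifier. ``MyIntensifier`` is a hypothetical subclass and the
    # ``Scenario`` arguments shown here are assumptions for the example.
    #
    #     scenario = Scenario(configspace, instances=["i1", "i2"], n_trials=100, seed=0)
    #     intensifier = MyIntensifier(scenario, n_seeds=2, max_config_calls=16, max_incumbents=5)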

    def reset(self) -> None:
        """Reset the internal variables of the intensifier."""
        self._tf_seeds: list[int] = []
        self._tf_instances: list[str | None] = []
        self._tf_budgets: list[float | None] = []
        self._instance_seed_keys: list[InstanceSeedKey] | None = None
        self._instance_seed_keys_validation: list[InstanceSeedKey] | None = None

        # Incumbent variables
        self._incumbents: list[Configuration] = []
        self._incumbents_changed = 0
        self._rejected_config_ids: list[int] = []
        self._trajectory: list[TrajectoryItem] = []

    @property
    def meta(self) -> dict[str, Any]:
        """Returns the meta data of the created object."""
        return {
            "name": self.__class__.__name__,
            "max_incumbents": self._max_incumbents,
            "seed": self._seed,
        }

    @property
    def trajectory(self) -> list[TrajectoryItem]:
        """Returns the trajectory (changes of incumbents) of the optimization run."""
        return self._trajectory

    @property
    def runhistory(self) -> RunHistory:
        """Runhistory of the intensifier."""
        assert self._runhistory is not None
        return self._runhistory

    @runhistory.setter
    def runhistory(self, runhistory: RunHistory) -> None:
        """Sets the runhistory."""
        self._runhistory = runhistory

    @property
    def used_walltime(self) -> float:
        """Returns used wallclock time."""
        if self._used_walltime_func is None:
            return 0.0

        return self._used_walltime_func()

    @used_walltime.setter
    def used_walltime(self, func: Callable) -> None:
        """Sets the function used to query the used wallclock time."""
        self._used_walltime_func = func
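
    # Wiring sketch (illustrative; the exact call order is handled by the SMAC main loop and
    # is an assumption here): the runhistory and the walltime callable are set from the
    # outside before the intensifier is used.
    #
    #     intensifier.runhistory = runhistory
    #     intensifier.used_walltime = lambda: time.time() - start_time  # requires ``import time``
    #     intensifier.__post_init__()  # reads seeds/instances/budgets from the runhistory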

    def __post_init__(self) -> None:
        """Fills ``self._tf_seeds`` and ``self._tf_instances``. Moreover, the incumbents are updated."""
        rh = self.runhistory

        # Validate runhistory: Are seeds/instances/budgets used?
        # Add seed/instance/budget to the cache
        for k in rh.keys():
            if self.uses_seeds:
                if k.seed is None:
                    raise ValueError("Trial contains no seed information but intensifier expects seeds to be used.")

                if k.seed not in self._tf_seeds:
                    self._tf_seeds.append(k.seed)

            if self.uses_instances:
                if self._scenario.instances is None and k.instance is not None:
                    raise ValueError(
                        "Scenario does not specify any instances but found instance information in runhistory."
                    )

                if self._scenario.instances is not None and k.instance not in self._scenario.instances:
                    raise ValueError(
                        "Instance information in runhistory is not part of the defined instances in scenario."
                    )

                if k.instance not in self._tf_instances:
                    self._tf_instances.append(k.instance)

            if self.uses_budgets:
                if k.budget is None:
                    raise ValueError(
                        "Trial contains no budget information but intensifier expects budgets to be used."
                    )

                if k.budget not in self._tf_budgets:
                    self._tf_budgets.append(k.budget)

        # Add all other instances to ``_tf_instances``
        # Idea behind this: Prioritize instances that are found in the runhistory
        if (instances := self._scenario.instances) is not None:
            for inst in instances:
                if inst not in self._tf_instances:
                    self._tf_instances.append(inst)

        if len(self._tf_instances) == 0:
            self._tf_instances = [None]

        if len(self._tf_budgets) == 0:
            self._tf_budgets = [None]

        # Update our incumbents here
        for config in rh.get_configs():
            self.update_incumbents(config)
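
    # Example of the caches after ``__post_init__`` (illustrative values): given a runhistory
    # containing seed 42 and instance "i1", and a scenario defining instances
    # ["i1", "i2", "i3"] without budgets, the caches would look like
    #
    #     self._tf_seeds     == [42]
    #     self._tf_instances == ["i1", "i2", "i3"]  # runhistory instances come first
    #     self._tf_budgets   == [None]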

    @property
    def config_generator(self) -> Iterator[Configuration]:
        """Based on the configuration selector, an iterator is returned that generates configurations."""
        assert self._config_generator is not None
        return self._config_generator

    @property
    def config_selector(self) -> ConfigSelector:
        """The configuration selector for the intensifier."""
        assert self._config_selector is not None
        return self._config_selector

    @config_selector.setter
    def config_selector(self, config_selector: ConfigSelector) -> None:
        # Set it globally
        self._config_selector = config_selector
        self._config_generator = iter(config_selector)

    @property
    @abstractmethod
    def uses_seeds(self) -> bool:
        """If the intensifier needs to make use of seeds."""
        raise NotImplementedError

    @property
    @abstractmethod
    def uses_budgets(self) -> bool:
        """If the intensifier needs to make use of budgets."""
        raise NotImplementedError

    @property
    @abstractmethod
    def uses_instances(self) -> bool:
        """If the intensifier needs to make use of instances."""
        raise NotImplementedError

    @property
    def incumbents_changed(self) -> int:
        """How often the incumbents have changed."""
        return self._incumbents_changed

    def get_instance_seed_keys_of_interest(
        self,
        *,
        validate: bool = False,
        seed: int | None = None,
    ) -> list[InstanceSeedKey]:
        """Returns a list of instance-seed keys. Considers seeds and instances from the runhistory
        (``self._tf_seeds`` and ``self._tf_instances``). If no seeds or instances were found, new seeds and
        instances are generated based on the global intensifier seed.

        Warning
        -------
        The passed seed is only used for validation. For training, the global intensifier seed is used.

        Parameters
        ----------
        validate : bool, defaults to False
            Whether to get validation trials or training trials. The only difference lies in different seeds.
        seed : int | None, defaults to None
            The seed used for the validation trials.

        Returns
        -------
        instance_seed_keys : list[InstanceSeedKey]
            Instance-seed keys of interest.
        """
        if self._runhistory is None:
            raise RuntimeError("Please set the runhistory before calling this method.")

        if len(self._tf_instances) == 0:
            raise RuntimeError("Please call __post_init__ before calling this method.")

        if seed is None:
            seed = 0

        # We cache the instance-seed keys for efficiency and consistency reasons
        if (self._instance_seed_keys is None and not validate) or (
            self._instance_seed_keys_validation is None and validate
        ):
            instance_seed_keys: list[InstanceSeedKey] = []
            if validate:
                rng = np.random.RandomState(seed)
            else:
                rng = self._rng

            i = 0
            while True:
                found_enough_configs = (
                    self._max_config_calls is not None and len(instance_seed_keys) >= self._max_config_calls
                )
                used_enough_seeds = self._n_seeds is not None and i >= self._n_seeds

                if found_enough_configs or used_enough_seeds:
                    break

                if validate:
                    next_seed = int(rng.randint(low=0, high=MAXINT, size=1)[0])
                else:
                    try:
                        next_seed = self._tf_seeds[i]
                        logger.info(f"Added existing seed {next_seed} from runhistory to the intensifier.")
                    except IndexError:
                        # Use the global random generator for a new seed and mark it so it will be reused
                        # for another config
                        next_seed = int(rng.randint(low=0, high=MAXINT, size=1)[0])

                        # This line here is really important because we don't want to add the same seed twice
                        if next_seed in self._tf_seeds:
                            continue

                        self._tf_seeds.append(next_seed)
                        logger.debug(f"Added a new random seed {next_seed} to the intensifier.")

                # If no instances are used, tf_instances includes None
                for instance in self._tf_instances:
                    instance_seed_keys.append(InstanceSeedKey(instance, next_seed))

                # Only use one seed in the deterministic case
                if self._scenario.deterministic:
                    logger.info("Using only one seed for deterministic scenario.")
                    break

                # Seed counter
                i += 1

            # Now we cut so that we only have max_config_calls instance_seed_keys
            # We favor instances over seeds here: That makes sure we always work with the same instance/seed pairs
            if self._max_config_calls is not None:
                if len(instance_seed_keys) > self._max_config_calls:
                    instance_seed_keys = instance_seed_keys[: self._max_config_calls]
                    logger.info(f"Cut instance-seed keys to {self._max_config_calls} entries.")

            # Set it globally
            if not validate:
                self._instance_seed_keys = instance_seed_keys
            else:
                self._instance_seed_keys_validation = instance_seed_keys

        if not validate:
            assert self._instance_seed_keys is not None
            instance_seed_keys = self._instance_seed_keys
        else:
            assert self._instance_seed_keys_validation is not None
            instance_seed_keys = self._instance_seed_keys_validation

        return instance_seed_keys.copy()
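
    # Illustrative result (assumed values): with two instances, a non-deterministic scenario
    # and ``max_config_calls=4``, the returned keys pair every instance with a seed before the
    # next seed is drawn, e.g.
    #
    #     [InstanceSeedKey("i1", 42), InstanceSeedKey("i2", 42),
    #      InstanceSeedKey("i1", 1337), InstanceSeedKey("i2", 1337)]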

    def get_trials_of_interest(
        self,
        config: Configuration,
        *,
        validate: bool = False,
        seed: int | None = None,
    ) -> list[TrialInfo]:
        """Returns the trials of interest for a given configuration.

        Expands the keys from ``get_instance_seed_keys_of_interest`` with the config.
        """
        is_keys = self.get_instance_seed_keys_of_interest(validate=validate, seed=seed)

        trials = []
        for key in is_keys:
            trials.append(TrialInfo(config=config, instance=key.instance, seed=key.seed))

        return trials

    def get_incumbent(self) -> Configuration | None:
        """Returns the current incumbent in a single-objective setting."""
        if self._scenario.count_objectives() > 1:
            raise ValueError("Cannot get a single incumbent for multi-objective optimization.")

        if len(self._incumbents) == 0:
            return None

        assert len(self._incumbents) == 1
        return self._incumbents[0]

    def get_incumbents(self, sort_by: str | None = None) -> list[Configuration]:
        """Returns the incumbents (points on the Pareto front) of the runhistory as a copy. In case of
        single-objective optimization, only one incumbent (if any) is returned.

        Parameters
        ----------
        sort_by : str, defaults to None
            Sort the incumbents by ``cost`` (lowest cost first) or ``num_trials`` (config with the lowest
            number of trials first).

        Returns
        -------
        configs : list[Configuration]
            The configs of the Pareto front.
        """
        rh = self.runhistory

        if sort_by == "cost":
            return list(sorted(self._incumbents, key=lambda config: rh._cost_per_config[rh.get_config_id(config)]))
        elif sort_by == "num_trials":
            return list(sorted(self._incumbents, key=lambda config: len(rh.get_trials(config))))
        elif sort_by is None:
            return list(self._incumbents)
        else:
            raise ValueError(f"Unknown sort_by value: {sort_by}.")
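
    # Usage sketch: ``intensifier.get_incumbents(sort_by="cost")`` returns the Pareto front
    # ordered by average cost (lowest first), while ``sort_by="num_trials"`` puts the least
    # evaluated incumbent first, which is useful when deciding which incumbent to intensify next.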

    def get_instance_seed_budget_keys(
        self, config: Configuration, compare: bool = False
    ) -> list[InstanceSeedBudgetKey]:
        """Returns the instance-seed-budget keys for a given configuration. This method is *used for
        updating the incumbents* and might differ for different intensifiers. For example, if incumbents
        should only be compared on the highest observed budgets.
        """
        return self.runhistory.get_instance_seed_budget_keys(config, highest_observed_budget_only=False)

    def get_incumbent_instance_seed_budget_keys(self, compare: bool = False) -> list[InstanceSeedBudgetKey]:
        """Find the lowest intersection of instance-seed-budget keys for all incumbents."""
        incumbents = self.get_incumbents()

        if len(incumbents) > 0:
            # We want to calculate the smallest set of trials that is used by all incumbents
            # Reason: We can not fairly compare otherwise
            incumbent_isb_keys = [self.get_instance_seed_budget_keys(incumbent, compare) for incumbent in incumbents]
            instances = list(set.intersection(*map(set, incumbent_isb_keys)))  # type: ignore

            return instances  # type: ignore

        return []

    def get_incumbent_instance_seed_budget_key_differences(self, compare: bool = False) -> list[InstanceSeedBudgetKey]:
        """There are situations in which incumbents are evaluated on more trials than others. This method
        returns the instances that are not part of the lowest intersection of instances for all incumbents.
        """
        incumbents = self.get_incumbents()

        if len(incumbents) > 0:
            # We want to calculate the differences so that we can evaluate the other incumbents on the same instances
            incumbent_isb_keys = [self.get_instance_seed_budget_keys(incumbent, compare) for incumbent in incumbents]

            if len(incumbent_isb_keys) <= 1:
                return []

            # Compute the actual differences
            intersection_isb_keys = set.intersection(*map(set, incumbent_isb_keys))  # type: ignore
            union_isb_keys = set.union(*map(set, incumbent_isb_keys))  # type: ignore
            incumbent_isb_keys = list(union_isb_keys - intersection_isb_keys)  # type: ignore

            if len(incumbent_isb_keys) == 0:
                return []

            return incumbent_isb_keys  # type: ignore

        return []
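
    # Worked example (illustrative): if incumbent A was evaluated on keys {k1, k2, k3} and
    # incumbent B on {k1, k2}, ``get_incumbent_instance_seed_budget_keys`` returns [k1, k2]
    # (the shared trials), while this method returns [k3] (the trials B is still missing
    # before the two can be compared fairly).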

    def get_rejected_configs(self) -> list[Configuration]:
        """Returns rejected configurations when racing against the incumbent failed."""
        configs = []
        for rejected_config_id in self._rejected_config_ids:
            configs.append(self.runhistory._ids_config[rejected_config_id])

        return configs

    def get_callback(self) -> Callback:
        """The intensifier makes use of a callback to efficiently update the incumbent based on the runhistory
        (every time new information is available). Moreover, incorporating the callback here allows developers
        more options in the future.
        """

        class RunHistoryCallback(Callback):
            def __init__(self, intensifier: AbstractIntensifier):
                self.intensifier = intensifier

            def on_tell_end(self, smbo: smac.main.smbo.SMBO, info: TrialInfo, value: TrialValue) -> None:
                self.intensifier.update_incumbents(info.config)

        return RunHistoryCallback(self)
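
    # Registration sketch (the exact wiring inside SMBO is an assumption here): the callback
    # returned above is added to the optimizer's callbacks so that every ``tell`` triggers an
    # incumbent update.
    #
    #     callbacks = [intensifier.get_callback()] + user_callbacks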

    def update_incumbents(self, config: Configuration) -> None:
        """Updates the incumbents. This method is called every time a trial is added to the runhistory. Since only
        the affected config and the current incumbents are used, this method is very efficient. Furthermore, a
        configuration is only considered incumbent if it has a better performance on all incumbent instances.

        Crucially, if there is no incumbent (at the start), the first configuration assumes incumbent status.
        For the next configuration, we need to check if the configuration is better on all instances that have
        been evaluated for the incumbent. If this is the case, we can replace the incumbent. Otherwise, we
        either a) need to requeue the config to obtain the missing instance-seed-budget combinations or b) mark
        this configuration as inferior ("rejected") to not consider it again. The comparison behaviour is
        controlled by ``self.get_instance_seed_budget_keys()`` and
        ``self.get_incumbent_instance_seed_budget_keys()``.

        Notably, this method is written to support both multi-fidelity and multi-objective optimization. While
        ``get_instance_seed_budget_keys()`` and ``get_incumbent_instance_seed_budget_keys()`` implement the
        multi-fidelity behaviour, ``calculate_pareto_front()`` is used as a hard-coded way to support
        multi-objective optimization, including single-objective optimization as a special case.
        ``calculate_pareto_front()`` is called on the set of all (in case of MO) incumbents amended with the
        challenger configuration, provided it has a sufficient overlap in instance-seed-budget combinations.

        Lastly, if the Pareto front contains more than ``self._max_incumbents`` configurations, the incumbents
        are cut using the crowding distance.
        """
        rh = self.runhistory

        # What happens if a config was rejected, but it appears again? Give it another try even if it
        # has already been evaluated? Yes!

        # Associated trials and id
        config_isb_keys = self.get_instance_seed_budget_keys(config)
        config_id = rh.get_config_id(config)
        config_hash = get_config_hash(config)

        # We skip updating incumbents if no instances are available
        # Note: This is especially the case if trials of a config are still running
        # because if trials are running, the runhistory does not update the trials in the fast data structure
        if len(config_isb_keys) == 0:
            logger.debug(f"No relevant instances evaluated for config {config_hash}. Updating incumbents is skipped.")
            return

        # Now we get the incumbents and see which trials have been used
        incumbents = self.get_incumbents()
        incumbent_ids = [rh.get_config_id(c) for c in incumbents]
        # Find the lowest intersection of instance-seed-budget keys for all incumbents.
        incumbent_isb_keys = self.get_incumbent_instance_seed_budget_keys()

        # Save for later
        previous_incumbents = incumbents.copy()
        previous_incumbent_ids = incumbent_ids.copy()

        # Little sanity check here for consistency
        if len(incumbents) > 0:
            assert incumbent_isb_keys is not None
            assert len(incumbent_isb_keys) > 0

        # If there are no incumbents at all, we just use the new config as the new incumbent
        # Problem: We can add running incumbents
        if len(incumbents) == 0:  # incumbent_isb_keys is None and len(incumbents) == 0:
            logger.info(f"Added config {config_hash} as new incumbent because there are no incumbents yet.")
            self._update_trajectory([config])

            # Nothing else to do
            return

        # Comparison keys
        # This one is a bit tricky: We would have problems if we compared with budgets because we might have
        # different scenarios (depending on the incumbent selection specified in Successive Halving).
        # 1) Any budget/highest observed budget: We want to get rid of the budgets because if we know a config is
        #    already evaluated on the same instance-seed, we are ready to go. If we checked for the same budgets,
        #    the configs could not be compared although the user does not care on which budgets configurations
        #    have been evaluated.
        # 2) Highest budget: We only want to compare the configs if they are evaluated on the highest budget.
        #    Here we do actually care about the budgets. Please see the ``get_instance_seed_budget_keys`` method
        #    of Successive Halving for more information.
        # Notice: compare=True only takes effect if the subclass implements it, e.g., in SH it will remove the
        # budgets from the keys.
        config_isb_comparison_keys = self.get_instance_seed_budget_keys(config, compare=True)
        # Find the lowest intersection of instance-seed-budget keys for all incumbents.
        config_incumbent_isb_comparison_keys = self.get_incumbent_instance_seed_budget_keys(compare=True)

        # Now we have to check if the new config has been evaluated on the same keys as the incumbents
        if not all([key in config_isb_comparison_keys for key in config_incumbent_isb_comparison_keys]):
            # We can not tell if the new config is better/worse than the incumbents because it has not been
            # evaluated on the necessary trials
            logger.debug(
                f"Could not compare config {config_hash} with incumbents because it's evaluated on "
                f"different trials."
            )

            # The config has to go to a queue now as it is a challenger and a potential incumbent
            return
        else:
            # If all instances are available and the config is incumbent and even evaluated on more trials
            # then there's nothing we can do
            if config in incumbents and len(config_isb_keys) > len(incumbent_isb_keys):
                logger.debug(
                    "Config is already an incumbent but can not be compared to other incumbents because "
                    "the others are missing trials."
                )
                return

        # Add config to incumbents so that we compare only the new config and existing incumbents
        if config not in incumbents:
            incumbents.append(config)
            incumbent_ids.append(config_id)

        # Now we get all instance-seed-budget keys for each incumbent (they might be different when using budgets)
        all_incumbent_isb_keys = []
        for incumbent in incumbents:
            all_incumbent_isb_keys.append(self.get_instance_seed_budget_keys(incumbent))

        # We compare the incumbents now and only return the ones on the Pareto front
        new_incumbents = calculate_pareto_front(rh, incumbents, all_incumbent_isb_keys)
        new_incumbent_ids = [rh.get_config_id(c) for c in new_incumbents]

        if len(previous_incumbents) == len(new_incumbents):
            if previous_incumbents == new_incumbents:
                # No changes in the incumbents
                self._remove_rejected_config(config_id)
                return
            else:
                # In this case, we have to determine which config replaced which incumbent and reject it
                removed_incumbent_id = list(set(previous_incumbent_ids) - set(new_incumbent_ids))[0]
                removed_incumbent_hash = get_config_hash(rh.get_config(removed_incumbent_id))
                self._add_rejected_config(removed_incumbent_id)

                if removed_incumbent_id == config_id:
                    logger.debug(
                        f"Rejected config {config_hash} because it is not better than the incumbents on "
                        f"{len(config_isb_keys)} instances."
                    )
                else:
                    self._remove_rejected_config(config_id)
                    logger.info(
                        f"Added config {config_hash} as incumbent and rejected config {removed_incumbent_hash} "
                        f"because the latter is not better on {len(config_isb_keys)} instances:"
                    )
                    print_config_changes(rh.get_config(removed_incumbent_id), config, logger=logger)
        elif len(previous_incumbents) < len(new_incumbents):
            # Config becomes a new incumbent; nothing is rejected in this case
            self._remove_rejected_config(config_id)
            logger.info(
                f"Config {config_hash} is a new incumbent. Total number of incumbents: {len(new_incumbents)}."
            )
        else:
            # There might be situations in which incumbents are removed because of updated cost information
            # of the config
            for incumbent in previous_incumbents:
                if incumbent not in new_incumbents:
                    self._add_rejected_config(incumbent)
                    logger.debug(
                        f"Removed incumbent {get_config_hash(incumbent)} because of the updated costs from config "
                        f"{config_hash}."
                    )

        # Cut incumbents: We only want to keep a specific number of incumbents
        # We use the crowding distance for that
        if len(new_incumbents) > self._max_incumbents:
            new_incumbents = sort_by_crowding_distance(rh, new_incumbents, all_incumbent_isb_keys)
            new_incumbents = new_incumbents[: self._max_incumbents]

            # or random?
            # idx = self._rng.randint(0, len(new_incumbents))
            # del new_incumbents[idx]
            # del new_incumbent_ids[idx]

            logger.info(
                f"Removed one incumbent using crowding distance because more than {self._max_incumbents} are "
                "available."
            )

        self._update_trajectory(new_incumbents)
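
    # Worked single-objective example (illustrative): incumbent A has cost 0.30 on the shared
    # instance-seed keys and challenger B has cost 0.25 on a superset of them. B is added to
    # the comparison set, ``calculate_pareto_front`` returns only B, so A is rejected, B
    # becomes the sole incumbent, and a new trajectory entry is appended.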

    @abstractmethod
    def __iter__(self) -> Iterator[TrialInfo]:
        """Main loop of the intensifier. This method always returns a TrialInfo object, although the intensifier
        algorithm may need to wait for the result of the trial. Please refer to a specific intensifier to get
        more information.
        """
        raise NotImplementedError
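
    # Minimal sketch of a concrete ``__iter__`` (illustrative only, not an actual SMAC
    # intensifier): draw a new configuration and yield one trial per instance-seed key.
    #
    #     def __iter__(self) -> Iterator[TrialInfo]:
    #         while True:
    #             config = next(self.config_generator)
    #             for trial in self.get_trials_of_interest(config):
    #                 yield trial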

    def get_state(self) -> dict[str, Any]:
        """The current state of the intensifier. Used to restore the state of the intensifier when continuing a run."""
        return {}

    def set_state(self, state: dict[str, Any]) -> None:
        """Sets the state of the intensifier. Used to restore the state of the intensifier when continuing a run."""
        pass

    def save(self, filename: str | Path) -> None:
        """Saves the current state of the intensifier. In addition to the state (retrieved by ``get_state``),
        this method also saves the incumbents and trajectory.
        """
        if isinstance(filename, str):
            filename = Path(filename)

        assert str(filename).endswith(".json")
        filename.parent.mkdir(parents=True, exist_ok=True)

        data = {
            "incumbent_ids": [self.runhistory.get_config_id(config) for config in self._incumbents],
            "rejected_config_ids": self._rejected_config_ids,
            "incumbents_changed": self._incumbents_changed,
            "trajectory": [dataclasses.asdict(item) for item in self._trajectory],
            "state": self.get_state(),
        }

        with open(filename, "w") as fp:
            json.dump(data, fp, indent=2)
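
    # The written JSON looks roughly like this (illustrative values):
    #
    #     {
    #       "incumbent_ids": [3],
    #       "rejected_config_ids": [1, 2],
    #       "incumbents_changed": 2,
    #       "trajectory": [{"config_ids": [3], "costs": [0.42], "trial": 17, "walltime": 12.3}],
    #       "state": {}
    #     }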

    def load(self, filename: str | Path) -> None:
        """Loads the latest state of the intensifier including the incumbents and trajectory."""
        if isinstance(filename, str):
            filename = Path(filename)

        try:
            with open(filename) as fp:
                data = json.load(fp)
        except Exception as e:
            logger.warning(
                f"Encountered exception {e} while reading runhistory from {filename}. Not adding any trials!"
            )
            return

        # We reset the intensifier and then re-set the runhistory to trigger its post-processing again
        self.reset()
        if self._runhistory is not None:
            self.runhistory = self._runhistory

        self._incumbents = [self.runhistory.get_config(config_id) for config_id in data["incumbent_ids"]]
        self._incumbents_changed = data["incumbents_changed"]
        self._rejected_config_ids = data["rejected_config_ids"]
        self._trajectory = [TrajectoryItem(**item) for item in data["trajectory"]]
        self.set_state(data["state"])
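
    # Checkpoint/resume sketch (paths are illustrative): the runhistory must be set before
    # ``load`` so that config ids can be resolved back to configurations.
    #
    #     intensifier.save("output/intensifier.json")
    #     ...
    #     intensifier.runhistory = runhistory
    #     intensifier.load("output/intensifier.json")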

    def _update_trajectory(self, configs: list[Configuration]) -> None:
        rh = self.runhistory
        config_ids = [rh.get_config_id(c) for c in configs]
        costs = [rh.average_cost(c, normalize=False) for c in configs]

        self._incumbents = configs
        self._incumbents_changed += 1
        self._trajectory.append(
            TrajectoryItem(
                config_ids=config_ids,
                costs=costs,
                trial=rh.finished,
                walltime=self.used_walltime,
            )
        )
        logger.debug("Updated trajectory.")

    def _add_rejected_config(self, config: Configuration | int) -> None:
        if isinstance(config, Configuration):
            config_id = self.runhistory.get_config_id(config)
        else:
            config_id = config

        if config_id not in self._rejected_config_ids:
            self._rejected_config_ids.append(config_id)

    def _remove_rejected_config(self, config: Configuration | int) -> None:
        if isinstance(config, Configuration):
            config_id = self.runhistory.get_config_id(config)
        else:
            config_id = config

        if config_id in self._rejected_config_ids:
            self._rejected_config_ids.remove(config_id)

    def _reorder_instance_seed_keys(
        self,
        instance_seed_keys: list[InstanceSeedKey],
        *,
        seed: int | None = None,
    ) -> list[InstanceSeedKey]:
        """Shuffles the instance-seed keys by groups (first all instances, then all seeds). The following is done:

        - Group by seeds
        - Shuffle instances within each seed group
        - Attach the groups together
        """
        if seed is None:
            rng = self._rng
        else:
            rng = np.random.RandomState(seed)

        groups = defaultdict(list)
        for key in instance_seed_keys:
            groups[key.seed].append(key)
            assert key.seed in self._tf_seeds

        # Shuffle groups + attach groups together
        shuffled_keys: list[InstanceSeedKey] = []
        for seed in self._tf_seeds:
            if seed in groups and len(groups[seed]) > 0:
                # Shuffle pairs in the group and add to shuffled pairs
                shuffled = rng.choice(groups[seed], size=len(groups[seed]), replace=False)  # type: ignore
                shuffled_keys += [pair for pair in shuffled]  # type: ignore

        # Small sanity check
        assert len(shuffled_keys) == len(instance_seed_keys)

        return shuffled_keys
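
    # Illustrative example for ``_reorder_instance_seed_keys``: given keys
    # [(i1, s1), (i2, s1), (i1, s2), (i2, s2)], the result keeps the seed groups in
    # ``self._tf_seeds`` order but shuffles the instances within each group, e.g.
    # [(i2, s1), (i1, s1), (i1, s2), (i2, s2)].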