Source code for smac.intensifier.hyperband_worker

from __future__ import annotations

from typing import Callable, Iterator

import numpy as np
from ConfigSpace import Configuration

from smac.intensifier.hyperband import Hyperband
from smac.intensifier.successive_halving_worker import SuccessiveHalvingWorker
from smac.runhistory import TrialInfo, TrialInfoIntent, TrialValue
from smac.runhistory.runhistory import RunHistory
from smac.utils.logging import get_logger

__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"


class HyperbandWorker(SuccessiveHalvingWorker):
    """This is the worker class for Hyperband.

    The worker walks through the Hyperband brackets ``s = s_max, ..., 0`` and, for
    each bracket, delegates the actual work to a freshly created
    ``SuccessiveHalvingWorker`` stored in ``self._sh_intensifier``.

    Warning
    -------
    Do not use this class as stand-alone.

    Parameters
    ----------
    hyperband : Hyperband
        The controller of the instance.
    identifier : int, defaults to 0
        Adds a numerical identifier on the instance. Used for debug and tagging logger messages properly.
    """

    def __init__(
        self,
        hyperband: Hyperband,
        identifier: int = 0,
    ) -> None:
        super().__init__(
            successive_halving=hyperband,
            identifier=identifier,
        )

        min_budget = hyperband._min_budget
        max_budget = hyperband._max_budget
        eta = hyperband._eta

        # Overwrite logger
        self._identifier = identifier
        self._logger = get_logger(f"{__name__}.{identifier}")

        # To track completed hyperband iterations
        self._hyperband = hyperband
        self._hb_iters = 0
        self._sh_intensifier: SuccessiveHalvingWorker | None = None

        # Setting initial running budget for future iterations (s & s_max from Algorithm 1)
        # NOTE(review): flooring a floating-point log ratio can come out one below the exact
        # value when max_budget / min_budget is an exact power of eta. Left unchanged because
        # `self._all_budgets` (not set in this class, presumably by the parent constructor) is
        # sliced with `self._s` in `_update_worker` and must stay consistent - TODO confirm.
        self._s_max = int(np.floor(np.log(max_budget / min_budget) / np.log(eta)))
        self._s = self._s_max

        # We update our sh intensifier directly
        self._update_worker()

    @property
    def uses_seeds(self) -> bool:
        """Forwards ``uses_seeds`` of the Hyperband controller."""
        return self._hyperband.uses_seeds

    @property
    def uses_budgets(self) -> bool:
        """Forwards ``uses_budgets`` of the Hyperband controller."""
        return self._hyperband.uses_budgets

    @property
    def uses_instances(self) -> bool:
        """Forwards ``uses_instances`` of the Hyperband controller."""
        return self._hyperband.uses_instances

    def process_results(
        self,
        trial_info: TrialInfo,
        trial_value: TrialValue,
        incumbent: Configuration | None,
        runhistory: RunHistory,
        time_bound: float,
        log_trajectory: bool = True,
    ) -> tuple[Configuration, float]:
        """Passes the result of a trial to the current successive halving worker.

        All arguments are forwarded unchanged to ``process_results`` of the inner
        worker. Afterwards the trial counter is incremented and, if the inner worker
        reports that its iteration is done, the next Hyperband stage is set up.

        Returns
        -------
        incumbent : Configuration
            The incumbent returned by the inner successive halving worker.
        inc_perf : float
            The incumbent performance returned by the inner successive halving worker.
        """
        # Only used to narrow the optional type; the worker is created in `__init__`.
        assert self._sh_intensifier

        # run 1 iteration of successive halving
        incumbent, inc_perf = self._sh_intensifier.process_results(
            trial_info=trial_info,
            trial_value=trial_value,
            incumbent=incumbent,
            runhistory=runhistory,
            time_bound=time_bound,
            log_trajectory=log_trajectory,
        )
        self._num_trials += 1

        # reset if SH iteration is over, else update for next iteration
        if self._sh_intensifier._iteration_done:
            self._update_stage()

        return incumbent, inc_perf

    def get_next_trial(
        self,
        challengers: list[Configuration] | None,
        incumbent: Configuration,
        get_next_configurations: Callable[[], Iterator[Configuration]] | None,
        runhistory: RunHistory,
        repeat_configs: bool = True,
        n_workers: int = 1,
    ) -> tuple[TrialInfoIntent, TrialInfo]:
        """Requests the next trial from the current successive halving worker.

        Note that ``repeat_configs`` is accepted for interface compatibility only:
        the value forwarded to the inner worker is the inner worker's own
        ``_repeat_configs`` attribute. ``n_workers`` is validated but not forwarded.

        Returns
        -------
        intent : TrialInfoIntent
            The intent returned by the inner successive halving worker.
        trial_info : TrialInfo
            The trial information returned by the inner successive halving worker.

        Raises
        ------
        ValueError
            If ``n_workers`` is greater than 1.
        """
        if n_workers > 1:
            raise ValueError(
                "HyperBand does not support more than 1 worker, yet "
                f"the argument n_workers to get_next_trial is {n_workers}"
            )

        # Sampling from next challenger marks the beginning of a new iteration
        self._iteration_done = False

        # Only used to narrow the optional type; the worker is created in `__init__`.
        assert self._sh_intensifier
        intent, trial_info = self._sh_intensifier.get_next_trial(
            challengers=challengers,
            incumbent=incumbent,
            get_next_configurations=get_next_configurations,
            runhistory=runhistory,
            repeat_configs=self._sh_intensifier._repeat_configs,
        )

        # For testing purposes, this attribute highlights whether a new challenger is proposed or not
        # Not required from a functional perspective
        self._new_challenger = self._sh_intensifier._new_challenger

        return intent, trial_info

    def _update_stage(self, runhistory: RunHistory | None = None) -> None:
        """Advances to the next bracket and creates the matching successive halving worker.

        If the last bracket (``s == 0``) has just finished, the bracket index is reset to
        ``s_max``, the Hyperband iteration counter is incremented, the iteration is marked
        as done and the trial counter is cleared. Otherwise the bracket index is decreased
        by one.

        Parameters
        ----------
        runhistory : RunHistory | None, defaults to None
            Not used by this implementation.
        """
        if self._s == 0:
            # Reset if HB iteration is over
            self._s = self._s_max
            self._hb_iters += 1
            self._iteration_done = True
            self._num_trials = 0
        else:
            # Update for next iteration
            self._s -= 1

        # A new worker is needed in both cases because `self._s` has changed.
        self._update_worker()

    def _update_worker(self) -> None:
        """Updates the successive halving worker based on `self._s`."""
        max_budget = self._hyperband._max_budget
        eta = self._hyperband._eta

        # Compute min budget for new SH run
        sh_min_budget = eta**-self._s * max_budget

        # Sample challengers for next iteration (based on HpBandster package)
        n_challengers = int(np.floor((self._s_max + 1) / (self._s + 1)) * eta**self._s)

        # Compute this for the next round: stage i gets n_challengers * eta**-i configurations
        n_configs_in_stage = n_challengers * np.power(eta, -np.linspace(0, self._s, self._s + 1))
        n_configs_in_stage = np.array(np.round(n_configs_in_stage), dtype=int).tolist()

        # Arguments are handed to the logger so formatting only happens if the record is emitted.
        self._logger.info(
            "Finished Hyperband iteration-step %d-%d with initial budget %d.",
            self._hb_iters + 1,
            self._s_max - self._s + 1,
            sh_min_budget,
        )

        # Creating a new Successive Halving intensifier with the current running budget
        self._sh_intensifier = SuccessiveHalvingWorker(
            successive_halving=self._hyperband,
            identifier=self._identifier,
            # Keep only the last `s + 1` budgets for this bracket
            _all_budgets=self._all_budgets[(-self._s - 1) :],
            _n_configs_in_stage=n_configs_in_stage,
            _min_budget=sh_min_budget,
            _max_budget=max_budget,
        )
        # Share the statistics object with the inner worker
        self._sh_intensifier._stats = self._stats