Source code for smac.intensification.hyperband

from typing import List, Mapping, Optional, Tuple

import logging

import numpy as np

from smac.configspace import Configuration
from smac.intensification.abstract_racer import AbstractRacer, RunInfoIntent
from smac.intensification.parallel_scheduling import ParallelScheduler
from smac.intensification.successive_halving import _SuccessiveHalving
from smac.optimizer.configuration_chooser.epm_chooser import EPMChooser
from smac.runhistory.runhistory import (  # noqa: F401
    RunHistory,
    RunInfo,
    RunValue,
    StatusType,
)
from smac.stats.stats import Stats
from smac.utils.io.traj_logging import TrajLogger

__author__ = "Ashwin Raaghav Narayanan"
__copyright__ = "Copyright 2019, ML4AAD"
__license__ = "3-clause BSD"


class _Hyperband(_SuccessiveHalving):
    """Races multiple challengers against an incumbent using Hyperband method.

    This class contains the logic to implement:
    "BOHB: Robust and Efficient Hyperparameter Optimization at Scale" (Falkner et al. 2018)

    Objects from this class are meant to run on a single worker. `Hyperband` method,
    creates multiple _Hyperband instances to allow parallelism, and for this reason
    `Hyperband` should be considered the user interface whereas `_Hyperband` a private
    class with the actual implementation of the method.

    Parameters
    ----------
    stats: smac.stats.stats.Stats
        stats object
    traj_logger: smac.utils.io.traj_logging.TrajLogger
        TrajLogger object to log all new incumbents
    rng : np.random.RandomState
    instances : List[str]
        list of all instance ids
    instance_specifics : Mapping[str, str]
        mapping from instance name to instance specific string
    cutoff : Optional[int]
        runtime cutoff of TA runs
    deterministic : bool
        whether the TA is deterministic or not
    initial_budget : Optional[float]
        minimum budget allowed for 1 run of successive halving
    max_budget : Optional[float]
        maximum budget allowed for 1 run of successive halving
    eta : float
        'halving' factor after each iteration in a successive halving run. Defaults to 3
    run_obj_time : bool
        whether the run objective is runtime or not (if true, apply adaptive capping)
    n_seeds : Optional[int]
        Number of seeds to use, if TA is not deterministic. Defaults to None, i.e., seed is set as 0
    instance_order : Optional[str]
        how to order instances. Can be set to: [None, shuffle_once, shuffle]
        * None - use as is given by the user
        * shuffle_once - shuffle once and use across all SH run (default)
        * shuffle - shuffle before every SH run
    adaptive_capping_slackfactor : float
        slack factor of adpative capping (factor * adpative cutoff)
    min_chall: int
        minimal number of challengers to be considered (even if time_bound is exhausted earlier). This class will
        raise an exception if a value larger than 1 is passed.
    incumbent_selection: str
        How to select incumbent in successive halving. Only active for real-valued budgets.
        Can be set to: [highest_executed_budget, highest_budget, any_budget]
        * highest_executed_budget - incumbent is the best in the highest budget run so far (default)
        * highest_budget - incumbent is selected only based on the highest budget
        * any_budget - incumbent is the best on any budget i.e., best performance regardless of budget
    identifier: int
        Allows to identify the _Hyperband instance in case of multiple ones
    """

    def __init__(
        self,
        stats: Stats,
        traj_logger: TrajLogger,
        rng: np.random.RandomState,
        instances: List[str],
        instance_specifics: Mapping[str, str] = None,
        cutoff: Optional[float] = None,
        deterministic: bool = False,
        initial_budget: Optional[float] = None,
        max_budget: Optional[float] = None,
        eta: float = 3,
        run_obj_time: bool = True,
        n_seeds: Optional[int] = None,
        instance_order: str = "shuffle_once",
        adaptive_capping_slackfactor: float = 1.2,
        min_chall: int = 1,
        incumbent_selection: str = "highest_executed_budget",
        identifier: int = 0,
    ) -> None:

        super().__init__(
            stats=stats,
            traj_logger=traj_logger,
            rng=rng,
            instances=instances,
            instance_specifics=instance_specifics,
            cutoff=cutoff,
            deterministic=deterministic,
            initial_budget=initial_budget,
            max_budget=max_budget,
            eta=eta,
            num_initial_challengers=None,  # initial challengers passed as None
            run_obj_time=run_obj_time,
            n_seeds=n_seeds,
            instance_order=instance_order,
            adaptive_capping_slackfactor=adaptive_capping_slackfactor,
            min_chall=min_chall,
            incumbent_selection=incumbent_selection,
        )

        self.identifier = identifier

        self.logger = logging.getLogger(self.__module__ + "." + str(self.identifier) + "." + self.__class__.__name__)

        # to track completed hyperband iterations
        self.hb_iters = 0
        self.sh_intensifier = None  # type: _SuccessiveHalving # type: ignore[assignment]

    def process_results(
        self,
        run_info: RunInfo,
        incumbent: Optional[Configuration],
        run_history: RunHistory,
        time_bound: float,
        result: RunValue,
        log_traj: bool = True,
    ) -> Tuple[Configuration, float]:
        """The intensifier stage will be updated based on the results/status of a configuration
        execution. Also, a incumbent will be determined.

        Parameters
        ----------
        run_info : RunInfo
               A RunInfo containing the configuration that was evaluated
        incumbent : Optional[Configuration]
            Best configuration seen so far
        run_history : RunHistory
            stores all runs we ran so far
            if False, an evaluated configuration will not be generated again
        time_bound : float
            time in [sec] available to perform intensify
        result: RunValue
            Contain the result (status and other methadata) of exercising
            a challenger/incumbent.
        log_traj: bool
            Whether to log changes of incumbents in trajectory

        Returns
        -------
        incumbent: Configuration
            current (maybe new) incumbent configuration
        inc_perf: float
            empirical performance of incumbent configuration
        """
        # run 1 iteration of successive halving
        incumbent, inc_perf = self.sh_intensifier.process_results(
            run_info=run_info,
            incumbent=incumbent,
            run_history=run_history,
            time_bound=time_bound,
            result=result,
            log_traj=log_traj,
        )
        self.num_run += 1

        # reset if SH iteration is over, else update for next iteration
        if self.sh_intensifier.iteration_done:
            self._update_stage()

        return incumbent, inc_perf

    def get_next_run(
        self,
        challengers: Optional[List[Configuration]],
        incumbent: Configuration,
        chooser: Optional[EPMChooser],
        run_history: RunHistory,
        repeat_configs: bool = True,
        num_workers: int = 1,
    ) -> Tuple[RunInfoIntent, RunInfo]:
        """Selects which challenger to use based on the iteration stage and set the iteration
        parameters. First iteration will choose configurations from the ``chooser`` or input
        challengers, while the later iterations pick top configurations from the previously selected
        challengers in that iteration.

        If no new run is available, the method returns a configuration of None.

        Parameters
        ----------
        challengers : List[Configuration]
            promising configurations
        incumbent: Configuration
               incumbent configuration
        chooser : smac.optimizer.epm_configuration_chooser.EPMChooser
            optimizer that generates next configurations to use for racing
        run_history : smac.runhistory.runhistory.RunHistory
            stores all runs we ran so far
        repeat_configs : bool
            if False, an evaluated configuration will not be generated again
        num_workers: int
            the maximum number of workers available
            at a given time.

        Returns
        -------
        intent: RunInfoIntent
               Indicator of how to consume the RunInfo object
        run_info: RunInfo
               An object that encapsulates necessary information for a config run
        """
        if num_workers > 1:
            raise ValueError(
                "HyperBand does not support more than 1 worker, yet "
                "the argument num_workers to get_next_run is {}".format(num_workers)
            )

        if not hasattr(self, "s"):
            # initialize tracking variables
            self._update_stage()

        # sampling from next challenger marks the beginning of a new iteration
        self.iteration_done = False

        intent, run_info = self.sh_intensifier.get_next_run(
            challengers=challengers,
            incumbent=incumbent,
            chooser=chooser,
            run_history=run_history,
            repeat_configs=self.sh_intensifier.repeat_configs,
        )

        # For testing purposes, this attribute highlights whether a
        # new challenger is proposed or not. Not required from a functional
        # perspective
        self.new_challenger = self.sh_intensifier.new_challenger

        return intent, run_info

    def _update_stage(self, run_history: RunHistory = None) -> None:
        """Update tracking information for a new stage/iteration and update statistics. This method
        is called to initialize stage variables and after all configurations of a successive halving
        stage are completed.

        Parameters
        ----------
         run_history : smac.runhistory.runhistory.RunHistory
            stores all runs we ran so far
        """
        if not hasattr(self, "s"):
            # setting initial running budget for future iterations (s & s_max from Algorithm 1)
            self.s_max = int(np.floor(np.log(self.max_budget / self.initial_budget) / np.log(self.eta)))
            self.s = self.s_max
        elif self.s == 0:
            # reset if HB iteration is over
            self.s = self.s_max
            self.hb_iters += 1
            self.iteration_done = True
            self.num_run = 0
        else:
            # update for next iteration
            self.s -= 1

        # compute min budget for new SH run
        sh_initial_budget = self.eta**-self.s * self.max_budget
        # sample challengers for next iteration (based on HpBandster package)
        n_challengers = int(np.floor((self.s_max + 1) / (self.s + 1)) * self.eta**self.s)

        # Compute this for the next round
        n_configs_in_stage = n_challengers * np.power(self.eta, -np.linspace(0, self.s, self.s + 1))
        n_configs_in_stage = np.array(np.round(n_configs_in_stage), dtype=int).tolist()

        self.logger.info(
            "Hyperband iteration-step: %d-%d  with initial budget: %d"
            % (self.hb_iters + 1, self.s_max - self.s + 1, sh_initial_budget)
        )

        # creating a new Successive Halving intensifier with the current running budget
        self.sh_intensifier = _SuccessiveHalving(
            stats=self.stats,
            traj_logger=self.traj_logger,
            rng=self.rs,
            instances=self.instances,
            instance_specifics=self.instance_specifics,
            cutoff=self.cutoff,
            deterministic=self.deterministic,
            initial_budget=sh_initial_budget,
            max_budget=self.max_budget,
            eta=self.eta,
            _all_budgets=self.all_budgets[(-self.s - 1) :],
            _n_configs_in_stage=n_configs_in_stage,
            num_initial_challengers=n_challengers,
            run_obj_time=self.run_obj_time,
            n_seeds=self.n_seeds,
            instance_order=self.instance_order,
            adaptive_capping_slackfactor=self.adaptive_capping_slackfactor,
            inst_seed_pairs=self.inst_seed_pairs,  # additional argument to avoid
            identifier=self.identifier,
        )  # processing instances & seeds again


[docs]class Hyperband(ParallelScheduler): """Races multiple challengers against an incumbent using Hyperband method. Implementation from "BOHB: Robust and Efficient Hyperparameter Optimization at Scale" (Falkner et al. 2018) Hyperband is an extension of the Successive Halving intensifier. Please refer to `SuccessiveHalving` documentation for more detailed information about the different types of budgets possible and the way instances are handled. Internally, this class uses the _Hyperband private class which actually implements the hyperband logic. To allow for parallelism, Hyperband can create multiple _Hyperband instances, based on the number of idle workers available. Parameters ---------- stats: smac.stats.stats.Stats stats object traj_logger: smac.utils.io.traj_logging.TrajLogger TrajLogger object to log all new incumbents rng : np.random.RandomState instances : List[str] list of all instance ids instance_specifics : Mapping[str, str] mapping from instance name to instance specific string cutoff : Optional[int] runtime cutoff of TA runs deterministic : bool whether the TA is deterministic or not initial_budget : Optional[float] minimum budget allowed for 1 run of successive halving max_budget : Optional[float] maximum budget allowed for 1 run of successive halving eta : float 'halving' factor after each iteration in a successive halving run. Defaults to 3 run_obj_time : bool whether the run objective is runtime or not (if true, apply adaptive capping) n_seeds : Optional[int] Number of seeds to use, if TA is not deterministic. Defaults to None, i.e., seed is set as 0 instance_order : Optional[str] how to order instances. Can be set to: [None, shuffle_once, shuffle] * None - use as is given by the user * shuffle_once - shuffle once and use across all SH run (default) * shuffle - shuffle before every SH run adaptive_capping_slackfactor : float slack factor of adpative capping (factor * adpative cutoff) min_chall: int minimal number of challengers to be considered (even if time_bound is exhausted earlier). This class will raise an exception if a value larger than 1 is passed. incumbent_selection: str How to select incumbent in successive halving. Only active for real-valued budgets. Can be set to: [highest_executed_budget, highest_budget, any_budget] * highest_executed_budget - incumbent is the best in the highest budget run so far (default) * highest_budget - incumbent is selected only based on the highest budget * any_budget - incumbent is the best on any budget i.e., best performance regardless of budget """ def __init__( self, stats: Stats, traj_logger: TrajLogger, rng: np.random.RandomState, instances: List[str], instance_specifics: Mapping[str, str] = None, cutoff: Optional[float] = None, deterministic: bool = False, initial_budget: Optional[float] = None, max_budget: Optional[float] = None, eta: float = 3, run_obj_time: bool = True, n_seeds: Optional[int] = None, instance_order: str = "shuffle_once", adaptive_capping_slackfactor: float = 1.2, min_chall: int = 1, incumbent_selection: str = "highest_executed_budget", ) -> None: super().__init__( stats=stats, traj_logger=traj_logger, rng=rng, instances=instances, instance_specifics=instance_specifics, cutoff=cutoff, deterministic=deterministic, run_obj_time=run_obj_time, adaptive_capping_slackfactor=adaptive_capping_slackfactor, min_chall=min_chall, ) self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__) # Parameters for a new hyperband self.n_seeds = n_seeds self.instance_order = instance_order self.incumbent_selection = incumbent_selection self._instances = instances self._instance_specifics = instance_specifics self.initial_budget = initial_budget self.max_budget = max_budget self.eta = eta def _get_intensifier_ranking(self, intensifier: AbstractRacer) -> Tuple[int, int]: """Given a intensifier, returns how advance it is. This metric will be used to determine what priority to assign to the intensifier. Parameters ---------- intensifier: AbstractRacer Intensifier to rank based on run progress Returns ------- ranking: int the higher this number, the faster the intensifier will get the running resources. For hyperband we can use the sh_intensifier stage, for example tie_breaker: int The configurations that have been launched to break ties. For example, in the case of Successive Halving it can be the number of configurations launched """ # For mypy -- we expect to work with _Hyperband instances assert isinstance(intensifier, _Hyperband) # For hyperband, we use the internal successive halving as a criteria # to see how advanced this intensifier is stage = 0 if hasattr(intensifier.sh_intensifier, "stage"): # Newly created SuccessiveHalving objects have no stage stage = intensifier.sh_intensifier.stage return stage, len(intensifier.sh_intensifier.run_tracker) def _add_new_instance(self, num_workers: int) -> bool: """Decides if it is possible to add a new intensifier instance, and adds it. If a new intensifier instance is added, True is returned, else False. Parameters ---------- num_workers: int the maximum number of workers available at a given time. Returns ------- Whether or not a new instance was added. """ if len(self.intensifier_instances) >= num_workers: return False self.intensifier_instances[len(self.intensifier_instances)] = _Hyperband( stats=self.stats, traj_logger=self.traj_logger, rng=self.rs, instances=self._instances, instance_specifics=self._instance_specifics, cutoff=self.cutoff, deterministic=self.deterministic, initial_budget=self.initial_budget, max_budget=self.max_budget, eta=self.eta, run_obj_time=self.run_obj_time, n_seeds=self.n_seeds, instance_order=self.instance_order, adaptive_capping_slackfactor=self.adaptive_capping_slackfactor, min_chall=self.min_chall, incumbent_selection=self.incumbent_selection, identifier=len(self.intensifier_instances), ) return True