from typing import Dict, List, Mapping, Optional, Tuple
import warnings
import numpy as np
from smac.configspace import Configuration
from smac.intensification.abstract_racer import AbstractRacer, RunInfoIntent
from smac.optimizer.configuration_chooser.epm_chooser import EPMChooser
from smac.runhistory.runhistory import RunHistory, RunInfo, RunValue
from smac.stats.stats import Stats
from smac.utils.io.traj_logging import TrajLogger
__copyright__ = "Copyright 2021, AutoML.org Freiburg-Hannover"
__license__ = "3-clause BSD"
class ParallelScheduler(AbstractRacer):
    """Common racer base class for intensifiers that schedule configurations in parallel.

    The scheduler keeps a pool of intensifier objects and creates new ones on
    demand (through :meth:`_add_new_instance`) so that workers are not left
    idle. Each pooled intensifier is asked, in priority order, for the next
    configuration to run.

    This class is abstract: subclasses must implement
    :meth:`_add_new_instance` and :meth:`_get_intensifier_ranking`.

    Only ``stats``, ``traj_logger``, ``rng``, ``instances``,
    ``instance_specifics``, ``cutoff``, ``deterministic``, ``run_obj_time``,
    ``adaptive_capping_slackfactor`` and ``min_chall`` are forwarded to
    ``AbstractRacer.__init__``. The remaining arguments are accepted for
    signature compatibility but are neither stored nor used by this class.

    Parameters
    ----------
    stats: smac.stats.stats.Stats
        stats object
    traj_logger: smac.utils.io.traj_logging.TrajLogger
        TrajLogger object to log all new incumbents
    rng : np.random.RandomState
        random number generator, forwarded to the parent class
    instances : List[str]
        list of all instance ids
    instance_specifics : Optional[Mapping[str, str]]
        mapping from instance name to instance specific string
    cutoff : Optional[float]
        cutoff of TA runs
    deterministic : bool
        whether the TA is deterministic or not
    initial_budget : Optional[float]
        minimum budget allowed for 1 run of successive halving
        (not used by this class)
    max_budget : Optional[float]
        maximum budget allowed for 1 run of successive halving
        (not used by this class)
    eta : float
        'halving' factor after each iteration in a successive halving run. Defaults to 3
        (not used by this class)
    num_initial_challengers : Optional[int]
        number of challengers to consider for the initial budget. If None, calculated internally
        (not used by this class)
    run_obj_time : bool
        whether the run objective is runtime or not (if true, apply adaptive capping)
    n_seeds : Optional[int]
        Number of seeds to use, if TA is not deterministic. Defaults to None, i.e., seed is set as 0
        (not used by this class)
    instance_order : Optional[str]
        how to order instances. Can be set to: [None, shuffle_once, shuffle]
        * None - use as is given by the user
        * shuffle_once - shuffle once and use across all SH run (default)
        * shuffle - shuffle before every SH run
        (not used by this class)
    adaptive_capping_slackfactor : float
        slack factor of adaptive capping (factor * adaptive cutoff)
    inst_seed_pairs : List[Tuple[str, int]], optional
        Do not set this argument, it will only be used by hyperband!
        (not used by this class)
    min_chall: int
        minimal number of challengers to be considered (even if time_bound is exhausted earlier).
        The value is forwarded to the parent class unchanged. Earlier documentation states that
        an exception is raised for values larger than 1; no such check exists in this class's
        own code (presumably it lives in the concrete intensifiers -- TODO confirm).
    incumbent_selection: str
        How to select incumbent in successive halving. Only active for real-valued budgets.
        Can be set to: [highest_executed_budget, highest_budget, any_budget]
        * highest_executed_budget - incumbent is the best in the highest budget run so far (default)
        * highest_budget - incumbent is selected only based on the highest budget
        * any_budget - incumbent is the best on any budget i.e., best performance regardless of budget
        (not used by this class)

    Attributes
    ----------
    intensifier_instances : Dict[int, AbstractRacer]
        pool of intensifier objects, keyed by an integer id
    print_worker_warning : bool
        True until the "single worker" warning has been emitted once
    """

    def __init__(
        self,
        stats: Stats,
        traj_logger: TrajLogger,
        rng: np.random.RandomState,
        instances: List[str],
        instance_specifics: Optional[Mapping[str, str]] = None,
        cutoff: Optional[float] = None,
        deterministic: bool = False,
        initial_budget: Optional[float] = None,
        max_budget: Optional[float] = None,
        eta: float = 3,
        num_initial_challengers: Optional[int] = None,
        run_obj_time: bool = True,
        n_seeds: Optional[int] = None,
        instance_order: Optional[str] = "shuffle_once",
        adaptive_capping_slackfactor: float = 1.2,
        inst_seed_pairs: Optional[List[Tuple[str, int]]] = None,
        min_chall: int = 1,
        incumbent_selection: str = "highest_executed_budget",
    ) -> None:
        super().__init__(
            stats=stats,
            traj_logger=traj_logger,
            rng=rng,
            instances=instances,
            instance_specifics=instance_specifics,
            cutoff=cutoff,
            deterministic=deterministic,
            run_obj_time=run_obj_time,
            adaptive_capping_slackfactor=adaptive_capping_slackfactor,
            min_chall=min_chall,
        )

        # Pool of intensifier objects that yield the configurations to run.
        self.intensifier_instances: Dict[int, AbstractRacer] = {}

        # The "only one worker" warning is emitted at most once per scheduler.
        self.print_worker_warning = True

    def get_next_run(
        self,
        challengers: Optional[List[Configuration]],
        incumbent: Configuration,
        chooser: Optional[EPMChooser],
        run_history: RunHistory,
        repeat_configs: bool = False,
        num_workers: int = 1,
    ) -> Tuple[RunInfoIntent, RunInfo]:
        """Decide which pooled intensifier provides the next configuration to run.

        Existing intensifiers are queried in priority order (see
        :meth:`_sort_instances_by_stage`); the first answer that is not a
        WAIT intent is returned. If all of them ask to wait, the scheduler
        tries to add a new intensifier via :meth:`_add_new_instance` and
        returns that intensifier's answer. If no intensifier can be added
        either, a WAIT intent with a placeholder ``RunInfo`` is returned.

        Parameters
        ----------
        challengers : Optional[List[Configuration]]
            promising configurations
        incumbent: Configuration
            incumbent configuration
        chooser : smac.optimizer.configuration_chooser.epm_chooser.EPMChooser
            optimizer that generates next configurations to use for racing
        run_history : smac.runhistory.runhistory.RunHistory
            stores all runs we ran so far
        repeat_configs : bool
            if False, an evaluated configuration will not be generated again.
            True is rejected with a ValueError.
        num_workers: int
            the maximum number of workers available
            at a given time.

        Returns
        -------
        intent: RunInfoIntent
            Indicator of how to consume the RunInfo object
        run_info: RunInfo
            An object that encapsulates the minimum information to
            evaluate a configuration

        Raises
        ------
        ValueError
            If ``repeat_configs`` is True.

        Warns
        -----
        UserWarning
            Once per scheduler object, when ``num_workers`` is 1 or less.
        """
        if num_workers <= 1 and self.print_worker_warning:
            warnings.warn(
                f"{self.__class__.__name__} is executed with {num_workers} workers only. "
                "Consider to use pynisher to use all available workers."
            )
            self.print_worker_warning = False

        # With repeat_configs=True, not only could a single intensifier repeat
        # configurations, but the N pooled intensifiers would also share
        # configurations among each other. The latter is not supported.
        if repeat_configs:
            raise ValueError("repeat_configs==True is not supported for parallel execution")

        # First try to obtain a configuration from an existing intensifier.
        for i in self._sort_instances_by_stage(self.intensifier_instances):
            intent, run_info = self.intensifier_instances[i].get_next_run(
                challengers=challengers,
                incumbent=incumbent,
                chooser=chooser,
                run_history=run_history,
                repeat_configs=repeat_configs,
            )

            # A WAIT intent means this intensifier cannot come up with a new
            # configuration right now, so the next one is asked.
            if intent == RunInfoIntent.WAIT:
                continue

            return intent, run_info

        # Every existing intensifier has to wait (or none exists yet), so
        # check whether a new one may be added.
        if self._add_new_instance(num_workers):
            # NOTE(review): assumes _add_new_instance registers the new
            # intensifier under the next sequential integer key, so that
            # ``len - 1`` addresses it -- confirm against the subclasses.
            return self.intensifier_instances[len(self.intensifier_instances) - 1].get_next_run(
                challengers=challengers,
                incumbent=incumbent,
                chooser=chooser,
                run_history=run_history,
                repeat_configs=repeat_configs,
            )

        # No new intensifier can be added and all running ones have to wait,
        # so a wait intent with a placeholder RunInfo is returned.
        return RunInfoIntent.WAIT, RunInfo(
            config=None,
            instance="0",
            instance_specific="0",
            seed=0,
            cutoff=None,
            capped=False,
            budget=0.0,
        )

    def process_results(
        self,
        run_info: RunInfo,
        incumbent: Optional[Configuration],
        run_history: RunHistory,
        time_bound: float,
        result: RunValue,
        log_traj: bool = True,
    ) -> Tuple[Configuration, float]:
        """Forward the result of a run to the intensifier that generated it.

        The intensifier is looked up in ``self.intensifier_instances`` by
        ``run_info.source_id`` and its ``process_results`` return value is
        returned unchanged. The incumbent is therefore the one chosen by that
        intensifier; the scheduler does not iterate over the other pooled
        intensifiers to determine it.

        Parameters
        ----------
        run_info : RunInfo
            A RunInfo containing the configuration that was evaluated
        incumbent : Optional[Configuration]
            Best configuration seen so far
        run_history : RunHistory
            stores all runs we ran so far
        time_bound : float
            time in [sec] available to perform intensify
        result: RunValue
            Contain the result (status and other metadata) of exercising
            a challenger/incumbent.
        log_traj: bool
            Whether to log changes of incumbents in trajectory

        Returns
        -------
        incumbent: Configuration
            current (maybe new) incumbent configuration
        inc_perf: float
            empirical performance of incumbent configuration

        Raises
        ------
        KeyError
            If ``run_info.source_id`` is not a key of
            ``self.intensifier_instances``.
        """
        source_intensifier = self.intensifier_instances[run_info.source_id]
        return source_intensifier.process_results(
            run_info=run_info,
            incumbent=incumbent,
            run_history=run_history,
            time_bound=time_bound,
            result=result,
            log_traj=log_traj,
        )

    def _add_new_instance(self, num_workers: int) -> bool:
        """Decide whether a new intensifier can be added, and add it if so.

        Abstract hook: this base implementation always raises
        ``NotImplementedError`` and must be overridden by subclasses.

        Parameters
        ----------
        num_workers: int
            the maximum number of workers available
            at a given time.

        Returns
        -------
        bool
            True if a new intensifier instance was added, else False

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError()

    def _get_intensifier_ranking(self, intensifier: AbstractRacer) -> Tuple[int, int]:
        """Return how advanced an intensifier is, to derive its scheduling priority.

        Abstract hook: this base implementation always raises
        ``NotImplementedError`` and must be overridden by subclasses.

        Parameters
        ----------
        intensifier: AbstractRacer
            Intensifier to rank based on run progress

        Returns
        -------
        ranking: int
            the higher this number, the faster the intensifier will get
            the running resources. For hyperband we can use the
            sh_intensifier stage, for example
        tie_breaker: int
            The configurations that have been launched to break ties. For
            example, in the case of Successive Halving it can be the number
            of configurations launched

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError()

    def _sort_instances_by_stage(self, instances: Dict[int, AbstractRacer]) -> List[int]:
        """Return the keys of ``instances`` in the order they should be queried for jobs.

        Intensifiers with a higher ranking (as reported by
        :meth:`_get_intensifier_ranking`) come first; ties are broken by the
        higher tie-breaker value. Entries that are equal in both values keep
        the iteration order of ``instances``.

        Parameters
        ----------
        instances: Dict[int, AbstractRacer]
            Dict with the instances to prioritize

        Returns
        -------
        List[int]
            The order in which to query for new jobs
        """
        # This function may be called before any intensifier exists (first
        # iteration); there is nothing to rank in that case.
        if not instances:
            return []

        # Runs that are close to finishing an iteration are prioritized. In
        # successive halving, for example, an iteration consists of stages,
        # so the ranking is the stage (the higher the stage, the earlier the
        # run should finish) and the tie breaker can be the number of
        # configurations already launched.
        preference = []
        for key, intensifier in instances.items():
            ranking, tie_breaker = self._get_intensifier_ranking(intensifier)
            preference.append((key, ranking, tie_breaker))

        # One descending sort on (ranking, tie_breaker). Python's sort is
        # stable even with reverse=True, so this yields the same order as
        # sorting by tie_breaker first and by ranking afterwards.
        preference.sort(key=lambda entry: (entry[1], entry[2]), reverse=True)
        return [key for key, _, _ in preference]