Source code for smac.tae.base

from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Optional, Tuple, Union

import math
import time
import traceback

import numpy as np

from smac.configspace import Configuration
from smac.runhistory.runhistory import RunInfo, RunValue
from smac.stats.stats import Stats
from smac.tae import StatusType
from smac.utils.constants import MAXINT
from smac.utils.logging import PickableLoggerAdapter

__copyright__ = "Copyright 2021, AutoML.org Freiburg-Hannover"
__license__ = "3-clause BSD"


[docs]class BaseRunner(ABC): """Interface class to handle the execution of SMAC' configurations. This interface defines how to interact with the SMBO loop. The complexity of running a configuration as well as handling the results is abstracted to the SMBO via a BaseRunner. From SMBO perspective, launching a configuration follows a submit/collect scheme as follows: 1. A run is launched via submit_run() 1. Submit_run internally calls run_wrapper(), a method that contains common processing functions among different runners, for example, handling capping and stats checking. 2. A class that implements BaseRunner defines run() which is really the algorithm to translate a RunInfo to a RunValue, i.e. a configuration to an actual result. 2. A completed run is collected via get_finished_runs(), which returns any finished runs, if any. 3. This interface also offers the method wait() as a mechanism to make sure we have enough data in the next iteration to make a decision. For example, the intensifier might not be able to select the next challenger until more results are available. Parameters ---------- ta : Union[List[str], Callable] target algorithm stats: Stats stats object to collect statistics about runtime/additional info multi_objectives: List[str] names of the objectives, by default it is a single objective parameter "cost" run_obj: str run objective of SMAC par_factor: int penalization factor cost_for_crash : float cost that is used in case of crashed runs (including runs that returned NaN or inf) abort_on_first_run_crash: bool if true and first run crashes, raise FirstRunCrashedException Attributes ---------- results ta stats run_obj par_factor cost_for_crash abort_on_first_run_crash """ def __init__( self, ta: Union[List[str], Callable], stats: Stats, multi_objectives: List[str] = ["cost"], run_obj: str = "runtime", par_factor: int = 1, cost_for_crash: Union[float, List[float]] = float(MAXINT), abort_on_first_run_crash: bool = True, ): # The results is a FIFO structure, implemented via a list # (because the Queue lock is not pickable). Finished runs are # put in this list and collected via process_finished_runs self.results = [] # type: List[Tuple[RunInfo, RunValue]] # Below state the support for a Runner algorithm that # implements a ta self.ta = ta self.stats = stats self.multi_objectives = multi_objectives self.run_obj = run_obj self.par_factor = par_factor self.cost_for_crash = cost_for_crash self.abort_on_first_run_crash = abort_on_first_run_crash self.logger = PickableLoggerAdapter(self.__module__ + "." + self.__class__.__name__) self._supports_memory_limit = False super().__init__()
[docs] @abstractmethod def submit_run(self, run_info: RunInfo) -> None: """This function submits a configuration embedded in a RunInfo object, and uses one of the workers to produce a result (such result will eventually be available on the self.results FIFO). This interface method will be called by SMBO, with the expectation that a function will be executed by a worker. What will be executed is dictated by run_info, and "how" will it be executed is decided via the child class that implements a run() method. Because config submission can be a serial/parallel endeavor, it is expected to be implemented by a child class. Parameters ---------- run_info: RunInfo An object containing the configuration and the necessary data to run it """ pass
[docs] @abstractmethod def run( self, config: Configuration, instance: str, cutoff: Optional[float] = None, seed: int = 12345, budget: Optional[float] = None, instance_specific: str = "0", ) -> Tuple[StatusType, float, float, Dict]: """Runs target algorithm <self.ta> with configuration <config> on instance <instance> with instance specifics. <specifics> for at most. <cutoff> seconds and random seed <seed> This method exemplifies how to defined the run() method Parameters ---------- config : Configuration dictionary param -> value instance : string problem instance cutoff : float, optional Wallclock time limit of the target algorithm. If no value is provided no limit will be enforced. seed : int random seed budget : float, optional A positive, real-valued number representing an arbitrary limit to the target algorithm. Handled by the target algorithm internally instance_specific: str instance specific information (e.g., domain file or solution) Returns ------- status: enum of StatusType (int) {SUCCESS, TIMEOUT, CRASHED, ABORT} cost: float cost/regret/quality (float) (None, if not returned by TA) runtime: float runtime (None if not returned by TA) additional_info: dict all further additional run information """ pass
[docs] def run_wrapper( self, run_info: RunInfo, ) -> Tuple[RunInfo, RunValue]: """Wrapper around run() to exec and check the execution of a given config file. This function encapsulates common handling/processing, so that run() implementation is simplified. Parameters ---------- run_info : RunInfo Object that contains enough information to execute a configuration run in isolation. Returns ------- RunInfo: an object containing the configuration launched RunValue: Contains information about the status/performance of config """ start = time.time() if run_info.cutoff is None and self.run_obj == "runtime": if self.logger: self.logger.critical( "For scenarios optimizing running time " "(run objective), a cutoff time is required, " "but not given to this call." ) raise ValueError( "For scenarios optimizing running time " "(run objective), a cutoff time is required, " "but not given to this call." ) cutoff = None if run_info.cutoff is not None: cutoff = int(math.ceil(run_info.cutoff)) try: status, cost, runtime, additional_info = self.run( config=run_info.config, instance=run_info.instance, cutoff=cutoff, seed=run_info.seed, budget=run_info.budget, instance_specific=run_info.instance_specific, ) except Exception as e: status = StatusType.CRASHED cost = self.cost_for_crash # type: ignore runtime = time.time() - start # Add context information to the error message exception_traceback = traceback.format_exc() error_message = repr(e) additional_info = {"traceback": exception_traceback, "error": error_message} end = time.time() if run_info.budget == 0 and status == StatusType.DONOTADVANCE: raise ValueError("Cannot handle DONOTADVANCE state when using intensify or SH/HB on " "instances.") # Catch NaN or inf. if (self.run_obj == "runtime" and not np.isfinite(runtime)) or ( self.run_obj == "quality" and not np.all(np.isfinite(cost)) ): if self.logger: self.logger.warning( "Target Algorithm returned NaN or inf as {}. " "Algorithm run is treated as CRASHED, cost " "is set to {} for quality scenarios. " '(Change value through "cost_for_crash"' "-option.)".format(self.run_obj, self.cost_for_crash) ) status = StatusType.CRASHED if self.run_obj == "runtime": # The following line pleases mypy - we already check for cutoff not being none above, # prior to calling run. However, mypy assumes that the data type of cutoff # is still Optional[int] assert cutoff is not None if runtime > self.par_factor * cutoff: self.logger.warning( "Returned running time is larger " "than {0} times the passed cutoff time. " "Clamping to {0} x cutoff.".format(self.par_factor) ) runtime = cutoff * self.par_factor status = StatusType.TIMEOUT if status == StatusType.SUCCESS: cost = runtime else: cost = cutoff * self.par_factor if status == StatusType.TIMEOUT and run_info.capped: status = StatusType.CAPPED else: if status == StatusType.CRASHED: cost = self.cost_for_crash # type: ignore return run_info, RunValue( status=status, cost=cost, time=runtime, additional_info=additional_info, starttime=start, endtime=end, )
[docs] @abstractmethod def get_finished_runs(self) -> List[Tuple[RunInfo, RunValue]]: """This method returns any finished configuration, and returns a list with the results of exercising the configurations. This class keeps populating results to self.results until a call to get_finished runs is done. In this case, the self.results list is emptied and all RunValues produced by running run() are returned. Returns ------- List[RunInfo, RunValue]: A list of pais RunInfo/RunValues a submitted configuration """ raise NotImplementedError()
[docs] @abstractmethod def wait(self) -> None: """SMBO/intensifier might need to wait for runs to finish before making a decision. This method waits until 1 run completes """ pass
[docs] @abstractmethod def pending_runs(self) -> bool: """Whether or not there are configs still running. Generally if the runner is serial, launching a run instantly returns it's result. On parallel runners, there might be pending configurations to complete. """ pass
[docs] @abstractmethod def num_workers(self) -> int: """Return the active number of workers that will execute tae runs.""" pass