Source code for smac.runner.abstract_runner

from __future__ import annotations

__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"


from abc import ABC, abstractmethod
from typing import Any, Iterator

import time
import traceback

import numpy as np
from ConfigSpace import Configuration

from smac.runhistory import StatusType, TrialInfo, TrialValue
from smac.scenario import Scenario
from smac.utils.logging import get_logger

logger = get_logger(__name__)


class AbstractRunner(ABC):
    """Interface class to handle the execution of SMAC configurations.

    This interface defines how to interact with the SMBO loop. The complexity of running a
    configuration as well as handling the results is abstracted to the SMBO via an
    AbstractRunner.

    From SMBO perspective, launching a configuration follows a submit/collect scheme as
    follows:

    1. A trial is launched via ``submit_trial()``

       * ``submit_trial`` is expected to call ``run_wrapper()``, a method that contains
         common processing functions among different runners.
       * A class that implements AbstractRunner defines ``run()`` which is really the
         algorithm to translate a ``TrialInfo`` to a ``TrialValue``, i.e. a configuration
         to an actual result.

    2. A completed trial is collected via ``iter_results()``, which iterates and consumes
       any finished trials, if any.
    3. This interface also offers the method ``wait()`` as a mechanism to make sure we have
       enough data in the next iteration to make a decision. For example, the intensifier
       might not be able to select the next challenger until more results are available.

    Parameters
    ----------
    scenario : Scenario
    required_arguments : list[str] | None, defaults to None
        A list of required arguments, which are passed to the target function. ``None``
        (the default) is treated as an empty list.
    """

    def __init__(
        self,
        scenario: Scenario,
        required_arguments: list[str] | None = None,
    ):
        self._scenario = scenario

        # A ``[]`` default would be created once and shared by every instance, so each
        # runner gets its own fresh list when the caller passes nothing.
        self._required_arguments: list[str] = [] if required_arguments is None else required_arguments

        # The results are kept in a FIFO structure, implemented via a list (because the
        # Queue lock is not picklable). Finished trials are put in this list until they
        # are collected.
        self._results_queue: list[tuple[TrialInfo, TrialValue]] = []
        self._crash_cost = scenario.crash_cost
        self._supports_memory_limit = False

        # Normalise a single objective name into a one-element list.
        if isinstance(scenario.objectives, str):
            objectives = [scenario.objectives]
        else:
            objectives = scenario.objectives

        self._objectives = objectives
        self._n_objectives = scenario.count_objectives()

        # We need to expand the crash cost if the user did not do it
        if self._n_objectives > 1:
            if not isinstance(scenario.crash_cost, list):
                assert isinstance(scenario.crash_cost, float)
                self._crash_cost = [scenario.crash_cost for _ in range(self._n_objectives)]

    def run_wrapper(self, trial_info: TrialInfo) -> tuple[TrialInfo, TrialValue]:
        """Wrapper around run() to execute and check the execution of a given config.

        This function encapsulates common handling/processing, so that run() implementation
        is simplified.

        Parameters
        ----------
        trial_info : TrialInfo
            Object that contains enough information to execute a configuration run in
            isolation.

        Returns
        -------
        info : TrialInfo
            An object containing the configuration launched.
        value : TrialValue
            Contains information about the status/performance of config.

        Raises
        ------
        ValueError
            If the trial has a budget of 0 and ``run()`` reported
            ``StatusType.DONOTADVANCE``.
        """
        start = time.time()

        try:
            status, cost, runtime, additional_info = self.run(
                config=trial_info.config,
                instance=trial_info.instance,
                budget=trial_info.budget,
                seed=trial_info.seed,
            )
        except Exception as e:
            # Any exception raised by run() is converted into a CRASHED trial instead of
            # being propagated to the caller.
            status = StatusType.CRASHED
            cost = self._crash_cost
            runtime = time.time() - start

            # Add context information to the error message
            exception_traceback = traceback.format_exc()
            error_message = repr(e)
            additional_info = {
                "traceback": exception_traceback,
                "error": error_message,
            }

        end = time.time()

        if trial_info.budget == 0 and status == StatusType.DONOTADVANCE:
            raise ValueError("Cannot handle DONOTADVANCE state when using intensify or SH/HB on instances.")

        # Catch NaN or inf
        if not np.all(np.isfinite(cost)):
            logger.warning(
                "Target function returned infinity or nothing at all. Result is treated as CRASHED"
                f" and cost is set to {self._crash_cost}."
            )

            if "traceback" in additional_info:
                logger.warning(f"Traceback: {additional_info['traceback']}\n")

            status = StatusType.CRASHED

        # A crashed trial always reports the crash cost, whatever run() returned.
        if status == StatusType.CRASHED:
            cost = self._crash_cost

        trial_value = TrialValue(
            status=status,
            cost=cost,
            time=runtime,
            additional_info=additional_info,
            starttime=start,
            endtime=end,
        )

        return trial_info, trial_value

    @property
    def meta(self) -> dict[str, Any]:
        """Returns the meta data of the created object."""
        return {"name": self.__class__.__name__}

    @abstractmethod
    def submit_trial(self, trial_info: TrialInfo) -> None:
        """This function submits a configuration embedded in a TrialInfo object, and uses
        one of the workers to produce a result (such result will eventually be available on
        the ``self._results_queue`` FIFO).

        This interface method will be called by SMBO, with the expectation that a function
        will be executed by a worker. What will be executed is dictated by ``trial_info``,
        and "how" it will be executed is decided via the child class that implements a
        ``run()`` method.

        Because config submission can be a serial/parallel endeavor, it is expected to be
        implemented by a child class.

        Parameters
        ----------
        trial_info : TrialInfo
            An object containing the configuration launched.
        """
        raise NotImplementedError

    @abstractmethod
    def run(
        self,
        config: Configuration,
        instance: str | None = None,
        budget: float | None = None,
        seed: int | None = None,
    ) -> tuple[StatusType, float | list[float], float, dict]:
        """Runs the target function with a configuration on a single instance-budget-seed
        combination (aka trial).

        Parameters
        ----------
        config : Configuration
            Configuration to be passed to the target function.
        instance : str | None, defaults to None
            The Problem instance.
        budget : float | None, defaults to None
            A positive, real-valued number representing an arbitrary limit to the target
            function handled by the target function internally.
        seed : int | None, defaults to None

        Returns
        -------
        status : StatusType
            Status of the trial.
        cost : float | list[float]
            Resulting cost(s) of the trial.
        runtime : float
            The time the target function took to run.
        additional_info : dict
            All further additional trial information.
        """
        raise NotImplementedError

    @abstractmethod
    def iter_results(self) -> Iterator[tuple[TrialInfo, TrialValue]]:
        """This method returns any finished configuration, together with the results of
        exercising the configurations.

        Results are expected to accumulate in ``self._results_queue`` until this method is
        called. At that point the ``self._results_queue`` list is emptied and all trial
        values produced by running ``run`` are returned.

        Returns
        -------
        Iterator[tuple[TrialInfo, TrialValue]]:
            An iterator over TrialInfo/TrialValue tuples, all of which have been finished.
        """
        raise NotImplementedError

    @abstractmethod
    def wait(self) -> None:
        """The SMBO/intensifier might need to wait for trials to finish before making a
        decision.
        """
        raise NotImplementedError

    @abstractmethod
    def is_running(self) -> bool:
        """Whether or not there are trials still running.

        Generally, if the runner is serial, launching a trial instantly returns its result.
        On parallel runners, there might be pending configurations to complete.
        """
        raise NotImplementedError

    @abstractmethod
    def count_available_workers(self) -> int:
        """Returns the number of available workers."""
        raise NotImplementedError