from __future__ import annotations
__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"
from abc import ABC, abstractmethod
from typing import Any, Iterator
import time
import traceback
import numpy as np
from ConfigSpace import Configuration
from smac.runhistory import StatusType, TrialInfo, TrialValue
from smac.scenario import Scenario
from smac.utils.logging import get_logger
# Module-level logger, obtained from SMAC's logging helper and named after this module.
logger = get_logger(__name__)
class AbstractRunner(ABC):
    """Interface class to handle the execution of SMAC configurations.

    This interface defines how to interact with the SMBO loop. The complexity of running a configuration as well as
    handling the results is abstracted to the SMBO via an AbstractRunner.

    From SMBO perspective, launching a configuration follows a submit/collect scheme as follows:

    1. A trial is launched via ``submit_trial()``.

       * ``submit_trial`` internally calls ``run_wrapper()``, a method that contains common processing functions
         among different runners.
       * A class that implements AbstractRunner defines ``run()`` which is really the algorithm to translate a
         ``TrialInfo`` to a ``TrialValue``, i.e. a configuration to an actual result.

    2. A completed trial is collected via ``iter_results()``, which iterates and consumes any finished trials, if
       any.
    3. This interface also offers the method ``wait()`` as a mechanism to make sure we have enough data in the next
       iteration to make a decision. For example, the intensifier might not be able to select the next challenger
       until more results are available.

    Parameters
    ----------
    scenario : Scenario
        The scenario from which the objectives, the number of objectives and the crash cost are read.
    required_arguments : list[str] | None, defaults to None
        A list of required arguments, which are passed to the target function. ``None`` is treated as an empty
        list; every runner created without this argument gets its own fresh list.
    """

    def __init__(
        self,
        scenario: Scenario,
        required_arguments: list[str] | None = None,
    ):
        self._scenario = scenario

        # Never use a mutable default argument here: a shared ``[]`` default would be stored on every instance and
        # a mutation through one runner would leak into all others. A list passed by the caller is kept as is.
        self._required_arguments: list[str] = [] if required_arguments is None else required_arguments

        # The results is a FIFO structure, implemented via a list
        # (because the Queue lock is not picklable). Finished trials are
        # put in this list and handed out via ``iter_results``.
        self._results_queue: list[tuple[TrialInfo, TrialValue]] = []
        self._crash_cost = scenario.crash_cost
        self._supports_memory_limit = False

        # Normalize the objectives to a list, also if only a single objective name was given
        if isinstance(scenario.objectives, str):
            objectives = [scenario.objectives]
        else:
            objectives = scenario.objectives

        self._objectives = objectives
        self._n_objectives = scenario.count_objectives()

        # We need to expand the crash cost if the user did not do it:
        # in the multi-objective case one crash cost per objective is stored.
        if self._n_objectives > 1:
            if not isinstance(scenario.crash_cost, list):
                assert isinstance(scenario.crash_cost, float)
                self._crash_cost = [scenario.crash_cost for _ in range(self._n_objectives)]

    def run_wrapper(self, trial_info: TrialInfo) -> tuple[TrialInfo, TrialValue]:
        """Wrapper around run() to execute and check the execution of a given config.

        This function encapsulates common handling/processing, so that run() implementation is simplified.

        Parameters
        ----------
        trial_info : TrialInfo
            Object that contains enough information to execute a configuration run in isolation.

        Returns
        -------
        info : TrialInfo
            An object containing the configuration launched.
        value : TrialValue
            Contains information about the status/performance of config.

        Raises
        ------
        ValueError
            If the trial has a budget of 0 and ``run()`` reported the status ``DONOTADVANCE``.
        """
        start = time.time()

        try:
            status, cost, runtime, additional_info = self.run(
                config=trial_info.config,
                instance=trial_info.instance,
                budget=trial_info.budget,
                seed=trial_info.seed,
            )
        except Exception as e:
            # Any exception raised by run() is converted into a CRASHED result instead of being propagated
            status = StatusType.CRASHED
            cost = self._crash_cost
            runtime = time.time() - start

            # Add context information to the error message
            exception_traceback = traceback.format_exc()
            error_message = repr(e)
            additional_info = {
                "traceback": exception_traceback,
                "error": error_message,
            }

        end = time.time()

        if trial_info.budget == 0 and status == StatusType.DONOTADVANCE:
            raise ValueError("Cannot handle DONOTADVANCE state when using intensify or SH/HB on instances.")

        # Catch NaN or inf (``np.all`` covers the multi-objective case in which cost is a list)
        if not np.all(np.isfinite(cost)):
            logger.warning(
                "Target function returned infinity or nothing at all. Result is treated as CRASHED"
                f" and cost is set to {self._crash_cost}."
            )

            if "traceback" in additional_info:
                logger.warning(f"Traceback: {additional_info['traceback']}\n")

            status = StatusType.CRASHED

        # A crashed trial always reports the crash cost, no matter what run() returned
        if status == StatusType.CRASHED:
            cost = self._crash_cost

        trial_value = TrialValue(
            status=status,
            cost=cost,
            time=runtime,
            additional_info=additional_info,
            starttime=start,
            endtime=end,
        )

        return trial_info, trial_value

    @property
    def meta(self) -> dict[str, Any]:
        """Returns the meta data of the created object."""
        return {"name": self.__class__.__name__}

    @abstractmethod
    def submit_trial(self, trial_info: TrialInfo) -> None:
        """This function submits a configuration embedded in a TrialInfo object, and uses one of the
        workers to produce a result (such result will eventually be available on the ``self._results_queue``
        FIFO).

        This interface method will be called by SMBO, with the expectation
        that a function will be executed by a worker. What will be executed is dictated by trial_info, and "how" it
        will be executed is decided via the child class that implements a run() method.

        Because config submission can be a serial/parallel endeavor, it is expected to be implemented by a child
        class.

        Parameters
        ----------
        trial_info : TrialInfo
            An object containing the configuration launched.
        """
        raise NotImplementedError

    @abstractmethod
    def run(
        self,
        config: Configuration,
        instance: str | None = None,
        budget: float | None = None,
        seed: int | None = None,
    ) -> tuple[StatusType, float | list[float], float, dict]:
        """Runs the target function with a configuration on a single instance-budget-seed combination (aka trial).

        Parameters
        ----------
        config : Configuration
            Configuration to be passed to the target function.
        instance : str | None, defaults to None
            The Problem instance.
        budget : float | None, defaults to None
            A positive, real-valued number representing an arbitrary limit to the target function handled by the
            target function internally.
        seed : int | None, defaults to None
            The seed of the trial.

        Returns
        -------
        status : StatusType
            Status of the trial.
        cost : float | list[float]
            Resulting cost(s) of the trial.
        runtime : float
            The time the target function took to run.
        additional_info : dict
            All further additional trial information.
        """
        raise NotImplementedError

    @abstractmethod
    def iter_results(self) -> Iterator[tuple[TrialInfo, TrialValue]]:
        """This method returns any finished configuration, and returns an iterator over the
        results of exercising the configurations. This class keeps populating results
        to ``self._results_queue`` until a call to ``iter_results`` is done. In this case,
        the ``self._results_queue`` list is emptied and all trial values produced by running
        ``run`` are returned.

        Returns
        -------
        Iterator[tuple[TrialInfo, TrialValue]]:
            An iterator over TrialInfo/TrialValue tuples, all of which have been finished.
        """
        raise NotImplementedError

    @abstractmethod
    def wait(self) -> None:
        """The SMBO/intensifier might need to wait for trials to finish before making a decision."""
        raise NotImplementedError

    @abstractmethod
    def is_running(self) -> bool:
        """Whether or not there are trials still running.

        Generally, if the runner is serial, launching a trial instantly returns its result. On
        parallel runners, there might be pending configurations to complete.
        """
        raise NotImplementedError

    @abstractmethod
    def count_available_workers(self) -> int:
        """Returns the number of available workers."""
        raise NotImplementedError