from typing import Callable, Dict, List, Optional, Tuple, Union
from smac.configspace import Configuration
from smac.runhistory.runhistory import RunInfo, RunValue
from smac.stats.stats import Stats
from smac.tae import StatusType
from smac.tae.base import BaseRunner
from smac.utils.constants import MAXINT
__copyright__ = "Copyright 2021, AutoML.org Freiburg-Hannover"
__license__ = "3-clause BSD"
class SerialRunner(BaseRunner):
    """Interface to submit and collect a job in a serial fashion.

    It dictates what a worker should do to convert a
    configuration/instance/seed to a result.

    This class is expected to be extended via the implementation of
    a run() method for the desired task.

    Attributes
    ----------
    results
    ta
    stats
    run_obj
    par_factor
    cost_for_crash
    abort_on_first_run_crash

    Parameters
    ----------
    ta : list
        target algorithm command line as list of arguments
    stats: Stats()
        stats object to collect statistics about runtime and so on
    multi_objectives: List[str], optional
        names of the objectives. If omitted (``None``), a single objective
        named "cost" is used.
    run_obj: str
        run objective of SMAC
    par_factor: int
        penalization factor
    cost_for_crash : float
        cost that is used in case of crashed runs (including runs
        that returned NaN or inf)
    abort_on_first_run_crash: bool
        if true and first run crashes, raise FirstRunCrashedException
    """

    def __init__(
        self,
        ta: Union[List[str], Callable],
        stats: Stats,
        multi_objectives: Optional[List[str]] = None,
        run_obj: str = "runtime",
        par_factor: int = 1,
        cost_for_crash: Union[float, List[float]] = float(MAXINT),
        abort_on_first_run_crash: bool = True,
    ):
        # A list literal used as a default value is created once and shared by
        # every call; build a fresh default per instance instead.
        if multi_objectives is None:
            multi_objectives = ["cost"]

        super().__init__(
            ta=ta,
            stats=stats,
            multi_objectives=multi_objectives,
            run_obj=run_obj,
            par_factor=par_factor,
            cost_for_crash=cost_for_crash,
            abort_on_first_run_crash=abort_on_first_run_crash,
        )

    def submit_run(self, run_info: RunInfo) -> None:
        """This function submits a run_info object in a serial fashion.

        As there is a single worker for this task, this
        interface can be considered a wrapper over the run()
        method.

        Both result/exceptions can be completely determined in this
        step so both lists are properly filled.

        Parameters
        ----------
        run_info: RunInfo
            An object containing the configuration and the necessary data to run it
        """
        # The run is executed right here, in the calling process; its outcome
        # is stored until get_finished_runs() collects it.
        self.results.append(self.run_wrapper(run_info))

    def get_finished_runs(self) -> List[Tuple[RunInfo, RunValue]]:
        """This method returns any finished configuration, and returns a list with the results of
        exercising the configurations. This class keeps populating results to self.results until a
        call to get_finished_runs is done. In this case, the self.results list is emptied and all
        RunValues produced by running self.run() are returned.

        Returns
        -------
        List[Tuple[RunInfo, RunValue]]
            A list of RunInfo/RunValue pairs, one per submitted configuration
        """
        results_list: List[Tuple[RunInfo, RunValue]] = []
        # Drain self.results in place. pop() takes the most recently appended
        # entry first, so (assuming self.results is a plain list) the returned
        # list is in reverse order of submission.
        while self.results:
            results_list.append(self.results.pop())
        return results_list

    def wait(self) -> None:
        """SMBO/intensifier might need to wait for runs to finish before making a decision.

        For serial runs, no wait is needed as the result is immediately available.
        """
        # There is no need to wait in serial runs.
        # When launching a run via submit, as the serial run
        # uses the same process to run, the result is always available
        # immediately after. This method is just an implementation of the
        # abstract method via a simple return, again, because there is
        # no need to wait (as in distributed runs)
        return

    def pending_runs(self) -> bool:
        """Whether or not there are configs still running.

        Generally if the runner is serial, launching a run instantly returns its result. On
        parallel runners, there might be pending configurations to complete.
        """
        # No pending runs in a serial run. Execution is blocking
        return False

    def run(
        self,
        config: Configuration,
        instance: str,
        cutoff: Optional[float] = None,
        seed: int = 12345,
        budget: Optional[float] = None,
        instance_specific: str = "0",
    ) -> Tuple[StatusType, float, float, Dict]:
        """Runs target algorithm <self.ta> with configuration <config> on instance <instance> with
        instance specifics <instance_specific> for at most <cutoff> seconds and random seed <seed>.

        This method exemplifies how to define the run() method. The implementation
        here is an empty placeholder (it does nothing and returns None); subclasses
        are expected to override it.

        Parameters
        ----------
        config : Configuration
            dictionary param -> value
        instance : string
            problem instance
        cutoff : float, optional
            Wallclock time limit of the target algorithm. If no value is
            provided no limit will be enforced.
        seed : int
            random seed
        budget : float, optional
            A positive, real-valued number representing an arbitrary limit to the target
            algorithm. Handled by the target algorithm internally
        instance_specific: str
            instance specific information (e.g., domain file or solution)

        Returns
        -------
        status: enum of StatusType (int)
            {SUCCESS, TIMEOUT, CRASHED, ABORT}
        cost: float
            cost/regret/quality (float) (None, if not returned by TA)
        runtime: float
            runtime (None if not returned by TA)
        additional_info: dict
            all further additional run information
        """
        pass

    def num_workers(self) -> int:
        """Total number of workers available."""
        # Any serial runner supports only 1 worker
        return 1