Bases: AbstractSerialRunner
Class to execute target functions from scripts. Uses Popen
to execute the script in a
subprocess.
The following example shows how the script is called:
target_function --instance=test --instance_features=test --seed=0 --hyperparameter1=5323
The script must return an echo in the following form (white-spaces are removed):
cost=0.5; runtime=0.01; status=SUCCESS; additional_info=test
(single-objective)
cost=0.5, 0.4; runtime=0.01; status=SUCCESS; additional_info=test
(multi-objective)
The status must be a string and must be one of the StatusType values.
However, runtime, status, and additional_info are optional.
Note
Every time an instance is passed, an instance feature in the form of a comma-separated list
(no spaces) of floats is also passed. If no instance feature for the instance is given,
an empty list is passed.
Parameters
target_function : str
The target function script (a path or command string); the constructor raises a TypeError for non-string values.
scenario : Scenario
required_arguments : list[str]
A list of required arguments, which are passed to the target function.
Source code in smac/runner/target_function_script_runner.py
def __init__(
    self,
    target_function: str,
    scenario: Scenario,
    required_arguments: list[str] | None = None,
):
    """Runner that evaluates configurations by invoking a target function script.

    Parameters
    ----------
    target_function : str
        Path to (or command for) the script that evaluates a configuration.
    scenario : Scenario
    required_arguments : list[str] | None, defaults to None
        A list of required arguments, which are passed to the target function.
        Treated as an empty list when None.
    """
    # Normalize None to a fresh list (avoids a shared mutable default).
    if required_arguments is None:
        required_arguments = []
    super().__init__(scenario=scenario, required_arguments=required_arguments)

    self._target_function = target_function

    # This runner only supports script commands; reject callables early with
    # a clear error. (Note the trailing space so the message reads correctly.)
    if not isinstance(self._target_function, str):
        raise TypeError(
            "Argument `target_function` must be a string but is type " f"`{type(self._target_function)}`."
        )

    # Resource limits cannot be enforced on an external script subprocess
    # from here, so only warn when the scenario requests them.
    if self._scenario.trial_memory_limit is not None:
        logger.warning("Trial memory limit is not supported for script target functions.")

    if self._scenario.trial_walltime_limit is not None:
        logger.warning("Trial walltime limit is not supported for script target functions.")
__call__
Calls the algorithm, which is processed in the run
method.
Source code in smac/runner/target_function_script_runner.py
def __call__(
    self,
    algorithm_kwargs: dict[str, Any],
) -> tuple[str, str]:
    """Calls the algorithm, which is processed in the ``run`` method."""
    # Each kwarg becomes a ``--key=value`` argument; blanks are stripped from
    # the value so the script receives a single token per argument.
    arguments = [f"--{key}={str(value).replace(' ', '')}" for key, value in algorithm_kwargs.items()]
    cmd = [self._target_function, *arguments]
    logger.debug(f"Calling: {' '.join(cmd)}")

    # Run the script without a shell and capture both streams as text.
    process = Popen(cmd, shell=False, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    output, error = process.communicate()

    logger.debug("Stdout: %s" % output)
    logger.debug("Stderr: %s" % error)

    return output, error
count_available_workers
count_available_workers() -> int
Returns the number of available workers. Serial workers only have one worker.
Source code in smac/runner/abstract_serial_runner.py
def count_available_workers(self) -> int:
    """Returns the number of available workers. Serial workers only have one worker."""
    # A serial runner executes trials in-process, hence exactly one worker.
    return 1
run
Calls the target function.
Parameters
config : Configuration
Configuration to be passed to the target function.
instance : str | None, defaults to None
The Problem instance.
budget : float | None, defaults to None
A positive, real-valued number representing an arbitrary limit to the target function
handled by the target function internally.
seed : int | None, defaults to None
Returns
status : StatusType
Status of the trial.
cost : float | list[float]
Resulting cost(s) of the trial.
runtime : float
The time the target function took to run.
cpu_time : float
The time the target function took on the hardware to run.
additional_info : dict
All further additional trial information.
Source code in smac/runner/target_function_script_runner.py
def run(
    self,
    config: Configuration,
    instance: str | None = None,
    budget: float | None = None,
    seed: int | None = None,
) -> tuple[StatusType, float | list[float], float, float, dict]:
    """Calls the target function.

    Parameters
    ----------
    config : Configuration
        Configuration to be passed to the target function.
    instance : str | None, defaults to None
        The Problem instance.
    budget : float | None, defaults to None
        A positive, real-valued number representing an arbitrary limit to the target function
        handled by the target function internally.
    seed : int | None, defaults to None

    Returns
    -------
    status : StatusType
        Status of the trial.
    cost : float | list[float]
        Resulting cost(s) of the trial.
    runtime : float
        The time the target function took to run.
    cpu_time : float
        The time the target function took on the hardware to run.
    additional_info : dict
        All further additional trial information.
    """
    # The kwargs are passed to the target function.
    kwargs: dict[str, Any] = {}
    if "seed" in self._required_arguments:
        kwargs["seed"] = seed
    if "instance" in self._required_arguments:
        kwargs["instance"] = instance

        # In contrast to the normal target function runner, we also add the instance features here.
        if self._scenario.instance_features is not None and instance in self._scenario.instance_features:
            kwargs["instance_features"] = self._scenario.instance_features[instance]
        else:
            kwargs["instance_features"] = []
    if "budget" in self._required_arguments:
        kwargs["budget"] = budget

    # Presetting: assume success until parsing proves otherwise.
    cost: float | list[float] = self._crash_cost
    runtime = 0.0
    cpu_time = runtime
    additional_info: dict[str, Any] = {}
    status = StatusType.SUCCESS

    # Add config arguments to the kwargs
    for k, v in dict(config).items():
        if k in kwargs:
            raise RuntimeError(f"The key {k} is already in use. Please use a different one.")
        kwargs[k] = v

    # Call target function
    start_time = time.time()
    cpu_time = time.process_time()
    output, error = self(kwargs)
    cpu_time = time.process_time() - cpu_time
    runtime = time.time() - start_time

    # Now we have to parse the std output
    # First remove white-spaces
    output = output.replace(" ", "")

    outputs: dict[str, str] = {}
    for pair in output.split(";"):
        try:
            # Split on the first "=" only so that values may themselves
            # contain "=" (e.g. ``additional_info=a=b``); previously the
            # value was silently truncated at the second "=".
            k, v = pair.split("=", 1)

            # Get rid of the trailing newline
            v = v.strip()

            outputs[k] = v
        except Exception:
            # Pairs without "=" (e.g. empty trailing segments) are ignored.
            pass

    # Parse status
    if "status" in outputs:
        status = getattr(StatusType, outputs["status"])

    # Parse costs (depends on the number of objectives)
    if "cost" in outputs:
        if self._n_objectives == 1:
            cost = float(outputs["cost"])
        else:
            costs = outputs["cost"].split(",")
            costs = [float(c) for c in costs]
            cost = costs

            if len(costs) != self._n_objectives:
                raise RuntimeError("The number of costs does not match the number of objectives.")
    else:
        # No cost reported at all: the trial is treated as crashed.
        status = StatusType.CRASHED

    # Overwrite runtime if the script reported its own measurement
    if "runtime" in outputs:
        runtime = float(outputs["runtime"])

    # Overwrite CPU time if the script reported its own measurement
    if "cpu_time" in outputs:
        cpu_time = float(outputs["cpu_time"])

    # Add additional info
    if "additional_info" in outputs:
        additional_info["additional_info"] = outputs["additional_info"]

    if status != StatusType.SUCCESS:
        additional_info["error"] = error

        # A crashed trial must not contribute a real cost to the model.
        if cost != self._crash_cost:
            cost = self._crash_cost
            logger.info(
                "The target function crashed but returned a cost. The cost is ignored and replaced by crash cost."
            )

    return status, cost, runtime, cpu_time, additional_info
run_wrapper
Wrapper around run() to execute and check the execution of a given config.
This function encapsulates common
handling/processing, so that run() implementation is simplified.
Parameters
trial_info : TrialInfo
Object that contains enough information to execute a configuration run in isolation.
dask_data_to_scatter: dict[str, Any]
When a user scatters data from their local process to the distributed network,
this data is distributed in a round-robin fashion grouping by number of cores.
Roughly speaking, we can keep this data in memory and then we do not have to (de-)serialize the data
every time we would like to execute a target function with a big dataset.
For example, when your target function has a big dataset shared across all the target function,
this argument is very useful.
Returns
info : TrialInfo
An object containing the configuration launched.
value : TrialValue
Contains information about the status/performance of config.
Source code in smac/runner/abstract_runner.py
def run_wrapper(
    self, trial_info: TrialInfo, **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]:
    """Wrapper around run() to execute and check the execution of a given config.

    This function encapsulates common handling/processing, so that run() implementation is simplified.

    Parameters
    ----------
    trial_info : TrialInfo
        Object that contains enough information to execute a configuration run in isolation.
    dask_data_to_scatter: dict[str, Any]
        When a user scatters data from their local process to the distributed network,
        this data is distributed in a round-robin fashion grouping by number of cores.
        Roughly speaking, we can keep this data in memory and then we do not have to (de-)serialize the data
        every time we would like to execute a target function with a big dataset.
        For example, when your target function has a big dataset shared across all the target function,
        this argument is very useful.

    Returns
    -------
    info : TrialInfo
        An object containing the configuration launched.
    value : TrialValue
        Contains information about the status/performance of config.
    """
    wall_start = time.time()
    cpu_start = time.process_time()

    try:
        status, cost, runtime, cpu_time, additional_info = self.run(
            config=trial_info.config,
            instance=trial_info.instance,
            budget=trial_info.budget,
            seed=trial_info.seed,
            **dask_data_to_scatter,
        )
    except Exception as exc:
        # Any exception from run() is converted into a CRASHED result with
        # the traceback attached, so the optimization loop keeps going.
        status = StatusType.CRASHED
        cost = self._crash_cost
        cpu_time = time.process_time() - cpu_start
        runtime = time.time() - wall_start
        additional_info = {
            "traceback": traceback.format_exc(),
            "error": repr(exc),
        }

    wall_end = time.time()

    # Non-finite costs (NaN/inf) cannot be used by the model; downgrade to CRASHED.
    if not np.all(np.isfinite(cost)):
        logger.warning(
            "Target function returned infinity or nothing at all. Result is treated as CRASHED"
            f" and cost is set to {self._crash_cost}."
        )

        if "traceback" in additional_info:
            logger.warning(f"Traceback: {additional_info['traceback']}\n")

        status = StatusType.CRASHED

    # A crashed trial always carries the crash cost, whatever run() returned.
    if status == StatusType.CRASHED:
        cost = self._crash_cost

    return trial_info, TrialValue(
        status=status,
        cost=cost,
        time=runtime,
        cpu_time=cpu_time,
        additional_info=additional_info,
        starttime=wall_start,
        endtime=wall_end,
    )
submit_trial
This function submits a trial_info object in a serial fashion. As there is a single
worker for this task, this interface can be considered a wrapper over the run
method.
Both result/exceptions can be completely determined in this step so both lists
are properly filled.
Parameters
trial_info : TrialInfo
An object containing the configuration launched.
Source code in smac/runner/abstract_serial_runner.py
def submit_trial(self, trial_info: TrialInfo) -> None:
    """This function submits a trial_info object in a serial fashion. As there is a single
    worker for this task, this interface can be considered a wrapper over the `run` method.

    Both result/exceptions can be completely determined in this step so both lists
    are properly filled.

    Parameters
    ----------
    trial_info : TrialInfo
        An object containing the configuration launched.
    """
    # Execute synchronously and keep the (info, value) pair for later retrieval.
    result = self.run_wrapper(trial_info)
    self._results_queue.append(result)
wait
The SMBO/intensifier might need to wait for trials to finish before making a decision.
For serial runners, no wait is needed as the result is immediately available.
Source code in smac/runner/abstract_serial_runner.py
def wait(self) -> None:
    """The SMBO/intensifier might need to wait for trials to finish before making a decision.
    For serial runners, no wait is needed as the result is immediately available.
    """
    # Serial execution runs the trial in the same process, so by the time
    # submit_trial returns the result already exists; this abstract-method
    # implementation therefore simply returns.
    return None