Skip to content

Target function script runner

smac.runner.target_function_script_runner #

TargetFunctionScriptRunner #

TargetFunctionScriptRunner(
    target_function: str,
    scenario: Scenario,
    required_arguments: list[str] = None,
)

Bases: AbstractSerialRunner

Class to execute target functions from scripts. Uses Popen to execute the script in a subprocess.

The following example shows how the script is called: target_function --instance=test --instance_features=test --seed=0 --hyperparameter1=5323

The script must return an echo in one of the following forms (white-spaces are removed):

Single-objective: cost=0.5; runtime=0.01; status=SUCCESS; additional_info=test

Multi-objective: cost=0.5, 0.4; runtime=0.01; status=SUCCESS; additional_info=test

The status must be a string and must be one of the StatusType values. Only cost is required: runtime, cpu_time, status and additional_info are optional. If cost is missing, the trial is marked as CRASHED.

Note#

Every time an instance is passed, its instance features are passed as well, in the form of a comma-separated list (no spaces) of floats. If no instance features are given for the instance, an empty list is passed.

Parameters#

target_function : str The script that is executed as the target function (must be a string, not a callable).

scenario : Scenario

required_arguments : list[str] A list of required arguments, which are passed to the target function.

Source code in smac/runner/target_function_script_runner.py
def __init__(
    self,
    target_function: str,
    scenario: Scenario,
    required_arguments: list[str] | None = None,
):
    """Set up a runner that executes a script as the target function.

    Parameters
    ----------
    target_function : str
        The script that is launched in a subprocess for every trial.
    scenario : Scenario
        The scenario, forwarded to the parent runner.
    required_arguments : list[str] | None, defaults to None
        Names of the arguments which are passed to the target function. ``None`` is treated
        as an empty list.

    Raises
    ------
    TypeError
        If ``target_function`` is not a string.
    """
    # Validate first so that a wrong argument fails before the parent runner is initialised.
    # Unlike the callable-based runner, the target function has to be a string here.
    if not isinstance(target_function, str):
        raise TypeError(
            f"Argument `target_function` must be a string but is type `{type(target_function)}`."
        )

    if required_arguments is None:
        required_arguments = []
    super().__init__(scenario=scenario, required_arguments=required_arguments)
    self._target_function = target_function

    # Limits are only reported, not enforced, for script target functions.
    if self._scenario.trial_memory_limit is not None:
        logger.warning("Trial memory limit is not supported for script target functions.")

    if self._scenario.trial_walltime_limit is not None:
        logger.warning("Trial walltime limit is not supported for script target functions.")

__call__ #

__call__(
    algorithm_kwargs: dict[str, Any]
) -> tuple[str, str]

Calls the algorithm, which is processed in the run method.

Source code in smac/runner/target_function_script_runner.py
def __call__(
    self,
    algorithm_kwargs: dict[str, Any],
) -> tuple[str, str]:
    """Runs the script in a subprocess and hands back what it printed.

    Every entry of ``algorithm_kwargs`` becomes one ``--key=value`` command-line argument, with
    all spaces removed from the value. The captured output is processed in the ``run`` method.

    Parameters
    ----------
    algorithm_kwargs : dict[str, Any]
        The arguments that are forwarded to the script.

    Returns
    -------
    output : str
        Everything the script wrote to stdout.
    error : str
        Everything the script wrote to stderr.
    """
    cmd = [self._target_function]
    for name, raw_value in algorithm_kwargs.items():
        # Spaces are dropped from the value, e.g. the string form of a list becomes `[1,2]`.
        value = str(raw_value).replace(" ", "")
        cmd.append(f"--{name!s}={value}")

    logger.debug(f"Calling: {' '.join(cmd)}")

    # The argument list is handed over as-is (no shell), and both streams are captured as text.
    process = Popen(cmd, shell=False, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    stdout_text, stderr_text = process.communicate()

    logger.debug("Stdout: %s" % stdout_text)
    logger.debug("Stderr: %s" % stderr_text)

    return stdout_text, stderr_text

count_available_workers #

count_available_workers() -> int

Returns the number of available workers. Serial workers only have one worker.

Source code in smac/runner/abstract_serial_runner.py
def count_available_workers(self) -> int:
    """Reports how many workers are available, which is always exactly one for a serial runner."""
    n_workers = 1
    return n_workers

run #

run(
    config: Configuration,
    instance: str | None = None,
    budget: float | None = None,
    seed: int | None = None,
) -> tuple[
    StatusType, float | list[float], float, float, dict
]

Calls the target function.

Parameters#

config : Configuration Configuration to be passed to the target function. instance : str | None, defaults to None The Problem instance. budget : float | None, defaults to None A positive, real-valued number representing an arbitrary limit to the target function handled by the target function internally. seed : int, defaults to None

Returns#

status : StatusType Status of the trial. cost : float | list[float] Resulting cost(s) of the trial. runtime : float The time the target function took to run. cpu_time : float The time the target function took on the hardware to run. additional_info : dict All further additional trial information.

Source code in smac/runner/target_function_script_runner.py
def run(
    self,
    config: Configuration,
    instance: str | None = None,
    budget: float | None = None,
    seed: int | None = None,
) -> tuple[StatusType, float | list[float], float, float, dict]:
    """Calls the target function script and parses the ``key=value`` pairs it prints.

    The standard output is expected to consist of ``;``-separated ``key=value`` pairs. Spaces are
    removed, and keys as well as values are stripped of surrounding whitespace, so the pairs may
    also be printed on separate lines. Recognized keys are ``cost``, ``status``, ``runtime``,
    ``cpu_time`` and ``additional_info``; fragments without a ``=`` are ignored.

    Parameters
    ----------
    config : Configuration
        Configuration to be passed to the target function.
    instance : str | None, defaults to None
        The Problem instance.
    budget : float | None, defaults to None
        A positive, real-valued number representing an arbitrary limit to the target function
        handled by the target function internally.
    seed : int, defaults to None

    Returns
    -------
    status : StatusType
        Status of the trial. ``CRASHED`` if the script did not report a cost.
    cost : float | list[float]
        Resulting cost(s) of the trial. The crash cost if the status is not ``SUCCESS``.
    runtime : float
        The time the target function took to run. Measured around the call unless the script
        reports ``runtime`` itself.
    cpu_time : float
        The time the target function took on the hardware to run. Measured around the call unless
        the script reports ``cpu_time`` itself.
    additional_info : dict
        All further additional trial information. Contains the script's stderr under ``error`` if
        the status is not ``SUCCESS``.

    Raises
    ------
    RuntimeError
        If a hyperparameter name collides with one of the required arguments, or if the number of
        reported costs does not match the number of objectives.
    """
    # The kwargs are passed to the target function.
    kwargs: dict[str, Any] = {}
    if "seed" in self._required_arguments:
        kwargs["seed"] = seed

    if "instance" in self._required_arguments:
        kwargs["instance"] = instance

        # In contrast to the normal target function runner, we also add the instance features here.
        instance_features = self._scenario.instance_features
        if instance_features is not None and instance in instance_features:
            kwargs["instance_features"] = instance_features[instance]
        else:
            kwargs["instance_features"] = []

    if "budget" in self._required_arguments:
        kwargs["budget"] = budget

    # Add config arguments to the kwargs
    for k, v in dict(config).items():
        if k in kwargs:
            raise RuntimeError(f"The key {k} is already in use. Please use a different one.")
        kwargs[k] = v

    # Call target function
    # NOTE(review): `time.process_time` only accounts for the current process, so the CPU time
    # spent inside the script's subprocess is presumably not covered unless the script reports
    # `cpu_time` itself - confirm whether this is intended.
    start_time = time.time()
    cpu_start = time.process_time()
    output, error = self(kwargs)
    cpu_time = time.process_time() - cpu_start
    runtime = time.time() - start_time

    # Now we have to parse the std output. Spaces are removed first; keys and values are
    # additionally stripped so that pairs printed on separate lines are recognized as well.
    outputs: dict[str, str] = {}
    for pair in output.replace(" ", "").split(";"):
        # Only split on the first `=` so that values containing `=` are kept intact.
        key, separator, value = pair.partition("=")
        if not separator:
            # Not a key-value pair (e.g. the remainder after the last `;`)
            continue

        outputs[key.strip()] = value.strip()

    # Presetting: the crash cost stays in place unless the script reports a cost.
    cost: float | list[float] = self._crash_cost
    additional_info: dict[str, Any] = {}
    status = StatusType.SUCCESS

    # Parse status
    if "status" in outputs:
        status = getattr(StatusType, outputs["status"])

    # Parse costs (depends on the number of objectives)
    if "cost" in outputs:
        if self._n_objectives == 1:
            cost = float(outputs["cost"])
        else:
            costs = [float(c) for c in outputs["cost"].split(",")]
            cost = costs

            if len(costs) != self._n_objectives:
                raise RuntimeError("The number of costs does not match the number of objectives.")
    else:
        # Without a cost the trial cannot be evaluated
        status = StatusType.CRASHED

    # Overwrite runtime
    if "runtime" in outputs:
        runtime = float(outputs["runtime"])

    # Overwrite CPU time
    if "cpu_time" in outputs:
        cpu_time = float(outputs["cpu_time"])

    # Add additional info
    if "additional_info" in outputs:
        additional_info["additional_info"] = outputs["additional_info"]

    if status != StatusType.SUCCESS:
        additional_info["error"] = error

        if cost != self._crash_cost:
            cost = self._crash_cost
            logger.info(
                "The target function crashed but returned a cost. The cost is ignored and replaced by crash cost."
            )

    return status, cost, runtime, cpu_time, additional_info

run_wrapper #

run_wrapper(
    trial_info: TrialInfo,
    **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]

Wrapper around run() to execute and check the execution of a given config. This function encapsulates common handling/processing, so that run() implementation is simplified.

Parameters#

trial_info : TrialInfo Object that contains enough information to execute a configuration run in isolation.

dask_data_to_scatter : dict[str, Any] When a user scatters data from their local process to the distributed network, this data is distributed in a round-robin fashion grouping by number of cores. Roughly speaking, we can keep this data in memory and then we do not have to (de-)serialize the data every time we would like to execute a target function with a big dataset. For example, when your target function has a big dataset shared across all the target function, this argument is very useful.

Returns#

info : TrialInfo An object containing the configuration launched. value : TrialValue Contains information about the status/performance of config.

Source code in smac/runner/abstract_runner.py
def run_wrapper(
    self, trial_info: TrialInfo, **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]:
    """Executes one trial via ``run()`` and packages the outcome as a ``TrialValue``.

    Exceptions raised by ``run()`` and non-finite costs are both turned into a ``CRASHED`` trial
    with the crash cost, so that implementations of ``run()`` do not have to handle them.

    Parameters
    ----------
    trial_info : TrialInfo
        Object that contains enough information to execute a configuration run in isolation.
    dask_data_to_scatter: dict[str, Any]
        When a user scatters data from their local process to the distributed network,
        this data is distributed in a round-robin fashion grouping by number of cores.
        Roughly speaking, we can keep this data in memory and then we do not have to (de-)serialize the data
        every time we would like to execute a target function with a big dataset.
        For example, when your target function has a big dataset shared across all the target function,
        this argument is very useful.

    Returns
    -------
    info : TrialInfo
        An object containing the configuration launched.
    value : TrialValue
        Contains information about the status/performance of config.
    """
    start = time.time()
    cpu_start = time.process_time()
    try:
        status, cost, runtime, cpu_time, additional_info = self.run(
            config=trial_info.config,
            instance=trial_info.instance,
            budget=trial_info.budget,
            seed=trial_info.seed,
            **dask_data_to_scatter,
        )
    except Exception as e:
        status = StatusType.CRASHED
        cost = self._crash_cost

        # `run()` did not return any timings, so fall back to what was measured here.
        cpu_time = time.process_time() - cpu_start
        runtime = time.time() - start

        # Keep the context of the failure alongside the trial
        additional_info = {
            "traceback": traceback.format_exc(),
            "error": repr(e),
        }

    end = time.time()

    # NaN or inf costs (also within a list of costs) are treated like a crash
    cost_is_finite = np.all(np.isfinite(cost))
    if not cost_is_finite:
        logger.warning(
            "Target function returned infinity or nothing at all. Result is treated as CRASHED"
            f" and cost is set to {self._crash_cost}."
        )

        if "traceback" in additional_info:
            logger.warning(f"Traceback: {additional_info['traceback']}\n")

        status = StatusType.CRASHED

    # A crashed trial always carries the crash cost, whatever `run()` reported.
    if status == StatusType.CRASHED:
        cost = self._crash_cost

    return trial_info, TrialValue(
        status=status,
        cost=cost,
        time=runtime,
        cpu_time=cpu_time,
        additional_info=additional_info,
        starttime=start,
        endtime=end,
    )

submit_trial #

submit_trial(trial_info: TrialInfo) -> None

This function submits a trial_info object in a serial fashion. As there is a single worker for this task, this interface can be considered a wrapper over the run method.

Both result/exceptions can be completely determined in this step so both lists are properly filled.

Parameters#

trial_info : TrialInfo An object containing the configuration launched.

Source code in smac/runner/abstract_serial_runner.py
def submit_trial(self, trial_info: TrialInfo) -> None:
    """Runs the given trial right away and stores its result in the results queue.

    Since a serial runner has a single worker, submitting is just a wrapper over ``run_wrapper``:
    the outcome is fully determined by the time this method returns.

    Parameters
    ----------
    trial_info : TrialInfo
        An object containing the configuration launched.
    """
    results_queue = self._results_queue
    results_queue.append(self.run_wrapper(trial_info))

wait #

wait() -> None

The SMBO/intensifier might need to wait for trials to finish before making a decision. For serial runners, no wait is needed as the result is immediately available.

Source code in smac/runner/abstract_serial_runner.py
def wait(self) -> None:
    """Returns immediately, because serial runners never have pending trials.

    The SMBO/intensifier might need to wait for trials to finish before making a decision,
    hence this method has to exist.
    """
    # A serial runner executes the trial in the same process when it is submitted, so the
    # result is always available right afterwards. This method only implements the abstract
    # method and has nothing to do.
    return None