Target function runner

smac.runner.target_function_runner #

TargetFunctionRunner #

TargetFunctionRunner(
    scenario: Scenario,
    target_function: Callable,
    required_arguments: list[str] | None = None,
)

Bases: AbstractSerialRunner

Class to execute target functions that are Python functions. Evaluates the function for a given configuration and resource limit.

The target function can either return a float (the loss), or a tuple with the first element being a float and the second being additional run information. In a multi-objective setting, the float value is replaced by a list of floats.
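As an illustration of the accepted return shapes, here is a standalone sketch (the `Configuration` argument is stubbed as a plain dict, and the function names are hypothetical):

```python
from typing import Any


def single_objective(config: dict[str, Any], seed: int = 0) -> float:
    # Return a single float: the loss to minimize.
    return (config["x"] - 2.0) ** 2


def with_run_info(config: dict[str, Any], seed: int = 0) -> tuple[float, dict]:
    # Return (loss, additional_info); the dict is stored with the trial.
    loss = (config["x"] - 2.0) ** 2
    return loss, {"seed_used": seed}


def multi_objective(config: dict[str, Any], seed: int = 0) -> list[float]:
    # Multi-objective setting: one float per objective.
    x = config["x"]
    return [x**2, (x - 1.0) ** 2]
```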

PARAMETER DESCRIPTION
target_function

The target function.

TYPE: Callable

scenario

TYPE: Scenario

required_arguments

A list of required arguments, which are passed to the target function.

TYPE: list[str] | None DEFAULT: None (treated as an empty list)

Source code in smac/runner/target_function_runner.py
def __init__(
    self,
    scenario: Scenario,
    target_function: Callable,
    required_arguments: list[str] | None = None,
):
    if required_arguments is None:
        required_arguments = []
    super().__init__(scenario=scenario, required_arguments=required_arguments)
    self._target_function = target_function

    # Check if target function is callable
    if not callable(self._target_function):
        raise TypeError(
            f"Argument `target_function` must be a callable but is of type `{type(self._target_function)}`."
        )

    # Check that all required arguments appear in the target function's signature
    signature = inspect.signature(self._target_function).parameters
    for argument in required_arguments:
        if argument not in signature.keys():
            raise RuntimeError(
                f"Target function needs to have the arguments {required_arguments} "
                f"but could not find {argument}."
            )

    # Now we check for additional arguments which are not used by SMAC.
    # However, we only want to warn the user, not raise an error.
    for key in list(signature.keys())[1:]:
        if key not in required_arguments:
            logger.warning(f"The argument {key} is not set by SMAC: Consider removing it from the target function.")

    # Pynisher limitations
    if (memory := self._scenario.trial_memory_limit) is not None:
        unit = None
        if isinstance(memory, (tuple, list)):
            memory, unit = memory
        memory = int(math.ceil(memory))
        if unit is not None:
            memory = (memory, unit)

    if (time := self._scenario.trial_walltime_limit) is not None:
        time = int(math.ceil(time))

    self._memory_limit = memory
    self._algorithm_walltime_limit = time
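The pynisher-limit handling above rounds fractional limits up to whole integers. A minimal standalone mirror of the memory normalization (`normalize_memory` is a hypothetical helper name, not part of SMAC's API; the `(value, unit)` tuple form follows the constructor code above):

```python
import math
from typing import Union


def normalize_memory(
    memory: Union[int, float, tuple, list, None],
) -> Union[int, tuple, None]:
    # Mirrors the constructor logic: accept a bare number or a (value, unit)
    # pair, round the numeric part up to an integer, and reattach the unit.
    if memory is None:
        return None
    unit = None
    if isinstance(memory, (tuple, list)):
        memory, unit = memory
    memory = int(math.ceil(memory))
    return memory if unit is None else (memory, unit)
```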

__call__ #

__call__(
    config: Configuration,
    algorithm: Callable,
    algorithm_kwargs: dict[str, Any],
) -> (
    float
    | list[float]
    | dict[str, float]
    | tuple[float, dict]
    | tuple[list[float], dict]
    | tuple[dict[str, float], dict]
)

Calls the algorithm, which is processed in the run method.

Source code in smac/runner/target_function_runner.py
def __call__(
    self,
    config: Configuration,
    algorithm: Callable,
    algorithm_kwargs: dict[str, Any],
) -> (
    float
    | list[float]
    | dict[str, float]
    | tuple[float, dict]
    | tuple[list[float], dict]
    | tuple[dict[str, float], dict]
):
    """Calls the algorithm, which is processed in the ``run`` method."""
    return algorithm(config, **algorithm_kwargs)

count_available_workers #

count_available_workers() -> int

Returns the number of available workers. Serial workers only have one worker.

Source code in smac/runner/abstract_serial_runner.py
def count_available_workers(self) -> int:
    """Returns the number of available workers. Serial workers only have one worker."""
    return 1

run #

run(
    config: Configuration,
    instance: Optional[str] = None,
    budget: Optional[float] = None,
    seed: Optional[int] = None,
    additional_info: Optional[dict[str, Any]] = None,
    **dask_data_to_scatter: dict[str, Any]
) -> tuple[
    StatusType,
    Union[float, list[float]],
    float,
    float,
    dict[str, Any],
]

Calls the target function with pynisher if algorithm wall time limit or memory limit is set. Otherwise, the function is called directly.

PARAMETER DESCRIPTION
config

Configuration to be passed to the target function.

TYPE: Configuration

instance

The problem instance.

TYPE: str | None, defaults to None DEFAULT: None

budget

A positive, real-valued number representing an arbitrary limit to the target function handled by the target function internally.

TYPE: float | None, defaults to None DEFAULT: None

seed

TYPE: int | None, defaults to None DEFAULT: None

additional_info

Additional information to be passed to the target function.

TYPE: dict[str, Any] | None, defaults to None DEFAULT: None

dask_data_to_scatter

This argument must be empty unless Dask is used. When a user scatters data from their local process to the distributed network, the data is distributed in a round-robin fashion, grouped by number of cores. Roughly speaking, the data can be kept in memory, so it does not have to be (de-)serialized every time the target function is executed. This is useful, for example, when the target function shares a large dataset across all of its calls.

TYPE: dict[str, Any] DEFAULT: {}

RETURNS DESCRIPTION
status

Status of the trial.

TYPE: StatusType

cost

Resulting cost(s) of the trial.

TYPE: float | list[float]

runtime

The time the target function took to run.

TYPE: float

cpu_time

The time the target function took on the hardware to run.

TYPE: float

val_additional_info

All further additional trial information.

TYPE: dict

Source code in smac/runner/target_function_runner.py
def run(
    self,
    config: Configuration,
    instance: Optional[str] = None,
    budget: Optional[float] = None,
    seed: Optional[int] = None,
    additional_info: Optional[dict[str, Any]] = None,
    **dask_data_to_scatter: dict[str, Any],
) -> tuple[StatusType, Union[float, list[float]], float, float, dict[str, Any]]:
    """Calls the target function with pynisher if algorithm wall time limit or memory limit is
    set. Otherwise, the function is called directly.

    Parameters
    ----------
    config : Configuration
        Configuration to be passed to the target function.
    instance : str | None, defaults to None
        The problem instance.
    budget : float | None, defaults to None
        A positive, real-valued number representing an arbitrary limit to the target function
        handled by the target function internally.
    seed : int | None, defaults to None
    additional_info : dict[str, Any] | None, defaults to None
        Additional information to be passed to the target function.
    dask_data_to_scatter: dict[str, Any]
        This argument must be empty unless Dask is used.
        When a user scatters data from their local process to the distributed network,
        the data is distributed in a round-robin fashion, grouped by number of cores.
        Roughly speaking, the data can be kept in memory, so it does not have to be
        (de-)serialized every time the target function is executed.
        This is useful, for example, when the target function shares a large dataset
        across all of its calls.

    Returns
    -------
    status : StatusType
        Status of the trial.
    cost : float | list[float]
        Resulting cost(s) of the trial.
    runtime : float
        The time the target function took to run.
    cpu_time : float
        The time the target function took on the hardware to run.
    val_additional_info : dict
        All further additional trial information.
    """
    # The kwargs are passed to the target function.
    kwargs: dict[str, Any] = {}
    kwargs.update(dask_data_to_scatter)

    if "seed" in self._required_arguments:
        kwargs["seed"] = seed

    if "instance" in self._required_arguments:
        kwargs["instance"] = instance

    if "budget" in self._required_arguments:
        kwargs["budget"] = budget

    if "cutoff" in self._required_arguments and additional_info is not None:
        kwargs["cutoff"] = additional_info["cutoff"]

    # Presetting
    cost: float | list[float] = self._crash_cost
    runtime = 0.0
    cpu_time = runtime
    val_additional_info = {}
    status = StatusType.CRASHED

    # If a memory limit or walltime limit is set, we want to use pynisher
    target_function: Callable
    if self._memory_limit is not None or self._algorithm_walltime_limit is not None:
        target_function = limit(
            self._target_function,
            memory=self._memory_limit,
            wall_time=self._algorithm_walltime_limit,
            wrap_errors=True,  # Hard to describe; see https://github.com/automl/pynisher
        )
    else:
        target_function = self._target_function

    # We don't want the user to change the configuration
    config_copy = copy.deepcopy(config)

    # Call target function
    try:
        start_time = time.time()
        cpu_time = time.process_time()
        rval = self(config_copy, target_function, kwargs)
        cpu_time = time.process_time() - cpu_time
        runtime = time.time() - start_time
        status = StatusType.SUCCESS
    except WallTimeoutException:
        status = StatusType.TIMEOUT
    except MemoryLimitException:
        status = StatusType.MEMORYOUT
    except Exception as e:
        cost = np.asarray(cost).squeeze().tolist()
        val_additional_info = {
            "traceback": traceback.format_exc(),
            "error": repr(e),
        }
        status = StatusType.CRASHED

    if status != StatusType.SUCCESS:
        return status, cost, runtime, cpu_time, val_additional_info

    if isinstance(rval, tuple):
        result, val_additional_info = rval
    else:
        result, val_additional_info = rval, {}

    # Do some sanity checking (for multi objective)
    error = f"Returned costs {result} do not match the number of objectives {self._objectives}."

    # If dict convert to array and make sure the order is correct
    if isinstance(result, dict):
        if len(result) != len(self._objectives):
            raise RuntimeError(error)

        ordered_cost: list[float] = []
        for name in self._objectives:
            if name not in result:
                raise RuntimeError(f"Objective {name} was not found in the returned costs.")  # noqa: E713

            ordered_cost.append(result[name])

        result = ordered_cost

    if isinstance(result, list):
        if len(result) != len(self._objectives):
            raise RuntimeError(error)

    if isinstance(result, float):
        if isinstance(self._objectives, list) and len(self._objectives) != 1:
            raise RuntimeError(error)

    cost = result

    if cost is None:
        status = StatusType.CRASHED
        cost = self._crash_cost

    # We want to get either a float or a list of floats.
    cost = np.asarray(cost).squeeze().tolist()

    return status, cost, runtime, cpu_time, val_additional_info
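The multi-objective sanity checks in `run` can be summarized by this standalone sketch (`order_costs` is a hypothetical helper, not part of SMAC's API): a dict result is reordered to match the scenario's objective order, while list and float results are checked against the number of objectives.

```python
from typing import Union


def order_costs(
    result: Union[float, list, dict], objectives: list
) -> Union[float, list]:
    # Mirror of the dict/list/float handling in TargetFunctionRunner.run.
    error = f"Returned costs {result} do not match the number of objectives {objectives}."
    if isinstance(result, dict):
        if len(result) != len(objectives):
            raise RuntimeError(error)
        # Reorder by objective name so costs line up with the scenario's objectives.
        ordered = []
        for name in objectives:
            if name not in result:
                raise RuntimeError(f"Objective {name} was not found in the returned costs.")
            ordered.append(result[name])
        return ordered
    if isinstance(result, list) and len(result) != len(objectives):
        raise RuntimeError(error)
    if isinstance(result, float) and len(objectives) != 1:
        raise RuntimeError(error)
    return result
```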

run_wrapper #

run_wrapper(
    trial_info: TrialInfo,
    **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]

Wrapper around run() to execute and check the execution of a given config. This function encapsulates common handling/processing so that the run() implementation is simplified.

PARAMETER DESCRIPTION
trial_info

Object that contains enough information to execute a configuration run in isolation.

TYPE: RunInfo

dask_data_to_scatter

When a user scatters data from their local process to the distributed network, the data is distributed in a round-robin fashion, grouped by number of cores. Roughly speaking, the data can be kept in memory, so it does not have to be (de-)serialized every time the target function is executed. This is useful, for example, when the target function shares a large dataset across all of its calls.

TYPE: dict[str, Any] DEFAULT: {}

RETURNS DESCRIPTION
info

An object containing the configuration launched.

TYPE: TrialInfo

value

Contains information about the status/performance of config.

TYPE: TrialValue

Source code in smac/runner/abstract_runner.py
def run_wrapper(
    self, trial_info: TrialInfo, **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]:
    """Wrapper around run() to execute and check the execution of a given config.
    This function encapsulates common handling/processing so that the run()
    implementation is simplified.

    Parameters
    ----------
    trial_info : RunInfo
        Object that contains enough information to execute a configuration run in isolation.
    dask_data_to_scatter: dict[str, Any]
        When a user scatters data from their local process to the distributed network,
        the data is distributed in a round-robin fashion, grouped by number of cores.
        Roughly speaking, the data can be kept in memory, so it does not have to be
        (de-)serialized every time the target function is executed.
        This is useful, for example, when the target function shares a large dataset
        across all of its calls.

    Returns
    -------
    info : TrialInfo
        An object containing the configuration launched.
    value : TrialValue
        Contains information about the status/performance of config.
    """
    start = time.time()
    cpu_time = time.process_time()
    try:
        status, cost, runtime, cpu_time, additional_info = self.run(
            config=trial_info.config,
            instance=trial_info.instance,
            budget=trial_info.budget,
            seed=trial_info.seed,
            additional_info=trial_info.additional_info,
            **dask_data_to_scatter,
        )
    except Exception as e:
        status = StatusType.CRASHED
        cost = self._crash_cost
        cpu_time = time.process_time() - cpu_time
        runtime = time.time() - start

        # Add context information to the error message
        exception_traceback = traceback.format_exc()
        error_message = repr(e)
        additional_info = {
            "traceback": exception_traceback,
            "error": error_message,
        }

    end = time.time()

    # Catch NaN or inf
    if not np.all(np.isfinite(cost)):
        logger.warning(
            "Target function returned infinity or nothing at all. Result is treated as CRASHED"
            f" and cost is set to {self._crash_cost}."
        )

        if "traceback" in additional_info:
            logger.warning(f"Traceback: {additional_info['traceback']}\n")

        status = StatusType.CRASHED

    if status == StatusType.CRASHED:
        cost = self._crash_cost

    trial_value = TrialValue(
        status=status,
        cost=cost,
        time=runtime,
        cpu_time=cpu_time,
        additional_info=additional_info,
        starttime=start,
        endtime=end,
    )

    return trial_info, trial_value
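The NaN/inf guard above can be sketched standalone (`sanitize_cost` and the status strings are illustrative only; SMAC uses the `StatusType` enum and the scenario's configured crash cost):

```python
import numpy as np


def sanitize_cost(cost, crash_cost):
    # Any non-finite cost (NaN or +/-inf) marks the trial as crashed and
    # substitutes the configured crash cost, mirroring run_wrapper above.
    if not np.all(np.isfinite(cost)):
        return "CRASHED", crash_cost
    return "SUCCESS", cost
```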

submit_trial #

submit_trial(trial_info: TrialInfo) -> None

This function submits a trial_info object in a serial fashion. As there is a single worker for this task, this interface can be considered a wrapper over the run method.

Both results and exceptions can be completely determined in this step, so both lists are properly filled.

PARAMETER DESCRIPTION
trial_info

An object containing the configuration launched.

TYPE: TrialInfo

Source code in smac/runner/abstract_serial_runner.py
def submit_trial(self, trial_info: TrialInfo) -> None:
    """This function submits a trial_info object in a serial fashion. As there is a single
    worker for this task, this interface can be considered a wrapper over the `run` method.

    Both results and exceptions can be completely determined in this step, so both lists
    are properly filled.

    Parameters
    ----------
    trial_info : TrialInfo
        An object containing the configuration launched.
    """
    self._results_queue.append(self.run_wrapper(trial_info))

wait #

wait() -> None

The SMBO/intensifier might need to wait for trials to finish before making a decision. For serial runners, no wait is needed as the result is immediately available.

Source code in smac/runner/abstract_serial_runner.py
def wait(self) -> None:
    """The SMBO/intensifier might need to wait for trials to finish before making a decision.
    For serial runners, no wait is needed as the result is immediately available.
    """
    # There is no need to wait in serial runners. When launching a trial via submit,
    # the serial runner uses the same process to run, so the result is always
    # available immediately afterwards. This is just an implementation of the
    # abstract method via a simple return, because there is no need to wait.
    return