Skip to content

Target function runner

smac.runner.target_function_runner #

TargetFunctionRunner #

TargetFunctionRunner(
    scenario: Scenario,
    target_function: Callable,
    required_arguments: list[str] = None,
)

Bases: AbstractSerialRunner

Class to execute target functions which are python functions. Evaluates function for given configuration and resource limit.

The target function can either return a float (the loss), or a tuple with the first element being a float and the second being additional run information. In a multi-objective setting, the float value is replaced by a list of floats.

Parameters#

target_function : Callable The target function. scenario : Scenario required_arguments : list[str], defaults to [] A list of required arguments, which are passed to the target function.

Source code in smac/runner/target_function_runner.py
def __init__(
    self,
    scenario: Scenario,
    target_function: Callable,
    required_arguments: list[str] = None,
):
    if required_arguments is None:
        required_arguments = []
    super().__init__(scenario=scenario, required_arguments=required_arguments)
    self._target_function = target_function

    # Check if target function is callable
    if not callable(self._target_function):
        raise TypeError(
            "Argument `target_function` must be a callable but is type" f"`{type(self._target_function)}`."
        )

    # Signatures here
    signature = inspect.signature(self._target_function).parameters
    for argument in required_arguments:
        if argument not in signature.keys():
            raise RuntimeError(
                f"Target function needs to have the arguments {required_arguments} "
                f"but could not find {argument}."
            )

    # Now we check for additional arguments which are not used by SMAC
    # However, we only want to warn the user and not
    for key in list(signature.keys())[1:]:
        if key not in required_arguments:
            logger.warning(f"The argument {key} is not set by SMAC: Consider removing it from the target function.")

    # Pynisher limitations
    if (memory := self._scenario.trial_memory_limit) is not None:
        unit = None
        if isinstance(memory, (tuple, list)):
            memory, unit = memory
        memory = int(math.ceil(memory))
        if unit is not None:
            memory = (memory, unit)

    if (time := self._scenario.trial_walltime_limit) is not None:
        time = int(math.ceil(time))

    self._memory_limit = memory
    self._algorithm_walltime_limit = time

__call__ #

__call__(
    config: Configuration,
    algorithm: Callable,
    algorithm_kwargs: dict[str, Any],
) -> (
    float
    | list[float]
    | dict[str, float]
    | tuple[float, dict]
    | tuple[list[float], dict]
    | tuple[dict[str, float], dict]
)

Calls the algorithm, which is processed in the run method.

Source code in smac/runner/target_function_runner.py
def __call__(
    self,
    config: Configuration,
    algorithm: Callable,
    algorithm_kwargs: dict[str, Any],
) -> (
    float
    | list[float]
    | dict[str, float]
    | tuple[float, dict]
    | tuple[list[float], dict]
    | tuple[dict[str, float], dict]
):
    """Calls the algorithm, which is processed in the ``run`` method."""
    return algorithm(config, **algorithm_kwargs)

count_available_workers #

count_available_workers() -> int

Returns the number of available workers. Serial workers only have one worker.

Source code in smac/runner/abstract_serial_runner.py
def count_available_workers(self) -> int:
    """Returns the number of available workers. Serial workers only have one worker."""
    return 1

run #

run(
    config: Configuration,
    instance: str | None = None,
    budget: float | None = None,
    seed: int | None = None,
    **dask_data_to_scatter: dict[str, Any]
) -> tuple[
    StatusType, float | list[float], float, float, dict
]

Calls the target function with pynisher if algorithm wall time limit or memory limit is set. Otherwise, the function is called directly.

Parameters#

config : Configuration Configuration to be passed to the target function. instance : str | None, defaults to None The Problem instance. budget : float | None, defaults to None A positive, real-valued number representing an arbitrary limit to the target function handled by the target function internally. seed : int, defaults to None dask_data_to_scatter: dict[str, Any] This kwargs must be empty when we do not use dask! () When a user scatters data from their local process to the distributed network, this data is distributed in a round-robin fashion grouping by number of cores. Roughly speaking, we can keep this data in memory and then we do not have to (de-)serialize the data every time we would like to execute a target function with a big dataset. For example, when your target function has a big dataset shared across all the target function, this argument is very useful.

Returns#

status : StatusType Status of the trial. cost : float | list[float] Resulting cost(s) of the trial. runtime : float The time the target function took to run. cpu_time : float The time the target function took on the hardware to run. additional_info : dict All further additional trial information.

Source code in smac/runner/target_function_runner.py
def run(
    self,
    config: Configuration,
    instance: str | None = None,
    budget: float | None = None,
    seed: int | None = None,
    **dask_data_to_scatter: dict[str, Any],
) -> tuple[StatusType, float | list[float], float, float, dict]:
    """Calls the target function with pynisher if algorithm wall time limit or memory limit is
    set. Otherwise, the function is called directly.

    Parameters
    ----------
    config : Configuration
        Configuration to be passed to the target function.
    instance : str | None, defaults to None
        The Problem instance.
    budget : float | None, defaults to None
        A positive, real-valued number representing an arbitrary limit to the target function
        handled by the target function internally.
    seed : int, defaults to None
    dask_data_to_scatter: dict[str, Any]
        This kwargs must be empty when we do not use dask! ()
        When a user scatters data from their local process to the distributed network,
        this data is distributed in a round-robin fashion grouping by number of cores.
        Roughly speaking, we can keep this data in memory and then we do not have to (de-)serialize the data
        every time we would like to execute a target function with a big dataset.
        For example, when your target function has a big dataset shared across all the target function,
        this argument is very useful.

    Returns
    -------
    status : StatusType
        Status of the trial.
    cost : float | list[float]
        Resulting cost(s) of the trial.
    runtime : float
        The time the target function took to run.
    cpu_time : float
        The time the target function took on the hardware to run.
    additional_info : dict
        All further additional trial information.
    """
    # The kwargs are passed to the target function.
    kwargs: dict[str, Any] = {}
    kwargs.update(dask_data_to_scatter)

    if "seed" in self._required_arguments:
        kwargs["seed"] = seed

    if "instance" in self._required_arguments:
        kwargs["instance"] = instance

    if "budget" in self._required_arguments:
        kwargs["budget"] = budget

    # Presetting
    cost: float | list[float] = self._crash_cost
    runtime = 0.0
    cpu_time = runtime
    additional_info = {}
    status = StatusType.CRASHED

    # If memory limit or walltime limit is set, we wanna use pynisher
    target_function: Callable
    if self._memory_limit is not None or self._algorithm_walltime_limit is not None:
        target_function = limit(
            self._target_function,
            memory=self._memory_limit,
            wall_time=self._algorithm_walltime_limit,
            wrap_errors=True,  # Hard to describe; see https://github.com/automl/pynisher
        )
    else:
        target_function = self._target_function

    # We don't want the user to change the configuration
    config_copy = copy.deepcopy(config)

    # Call target function
    try:
        start_time = time.time()
        cpu_time = time.process_time()
        rval = self(config_copy, target_function, kwargs)
        cpu_time = time.process_time() - cpu_time
        runtime = time.time() - start_time
        status = StatusType.SUCCESS
    except WallTimeoutException:
        status = StatusType.TIMEOUT
    except MemoryLimitException:
        status = StatusType.MEMORYOUT
    except Exception as e:
        cost = np.asarray(cost).squeeze().tolist()
        additional_info = {
            "traceback": traceback.format_exc(),
            "error": repr(e),
        }
        status = StatusType.CRASHED

    if status != StatusType.SUCCESS:
        return status, cost, runtime, cpu_time, additional_info

    if isinstance(rval, tuple):
        result, additional_info = rval
    else:
        result, additional_info = rval, {}

    # Do some sanity checking (for multi objective)
    error = f"Returned costs {result} does not match the number of objectives {self._objectives}."

    # If dict convert to array and make sure the order is correct
    if isinstance(result, dict):
        if len(result) != len(self._objectives):
            raise RuntimeError(error)

        ordered_cost: list[float] = []
        for name in self._objectives:
            if name not in result:
                raise RuntimeError(f"Objective {name} was not found in the returned costs.")  # noqa: E713

            ordered_cost.append(result[name])

        result = ordered_cost

    if isinstance(result, list):
        if len(result) != len(self._objectives):
            raise RuntimeError(error)

    if isinstance(result, float):
        if isinstance(self._objectives, list) and len(self._objectives) != 1:
            raise RuntimeError(error)

    cost = result

    if cost is None:
        status = StatusType.CRASHED
        cost = self._crash_cost

    # We want to get either a float or a list of floats.
    cost = np.asarray(cost).squeeze().tolist()

    return status, cost, runtime, cpu_time, additional_info

run_wrapper #

run_wrapper(
    trial_info: TrialInfo,
    **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]

Wrapper around run() to execute and check the execution of a given config. This function encapsulates common handling/processing, so that run() implementation is simplified.

Parameters#

trial_info : RunInfo Object that contains enough information to execute a configuration run in isolation. dask_data_to_scatter: dict[str, Any] When a user scatters data from their local process to the distributed network, this data is distributed in a round-robin fashion grouping by number of cores. Roughly speaking, we can keep this data in memory and then we do not have to (de-)serialize the data every time we would like to execute a target function with a big dataset. For example, when your target function has a big dataset shared across all the target function, this argument is very useful.

Returns#

info : TrialInfo An object containing the configuration launched. value : TrialValue Contains information about the status/performance of config.

Source code in smac/runner/abstract_runner.py
def run_wrapper(
    self, trial_info: TrialInfo, **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]:
    """Wrapper around run() to execute and check the execution of a given config.
    This function encapsulates common
    handling/processing, so that run() implementation is simplified.

    Parameters
    ----------
    trial_info : RunInfo
        Object that contains enough information to execute a configuration run in isolation.
    dask_data_to_scatter: dict[str, Any]
        When a user scatters data from their local process to the distributed network,
        this data is distributed in a round-robin fashion grouping by number of cores.
        Roughly speaking, we can keep this data in memory and then we do not have to (de-)serialize the data
        every time we would like to execute a target function with a big dataset.
        For example, when your target function has a big dataset shared across all the target function,
        this argument is very useful.

    Returns
    -------
    info : TrialInfo
        An object containing the configuration launched.
    value : TrialValue
        Contains information about the status/performance of config.
    """
    start = time.time()
    cpu_time = time.process_time()
    try:
        status, cost, runtime, cpu_time, additional_info = self.run(
            config=trial_info.config,
            instance=trial_info.instance,
            budget=trial_info.budget,
            seed=trial_info.seed,
            **dask_data_to_scatter,
        )
    except Exception as e:
        status = StatusType.CRASHED
        cost = self._crash_cost
        cpu_time = time.process_time() - cpu_time
        runtime = time.time() - start

        # Add context information to the error message
        exception_traceback = traceback.format_exc()
        error_message = repr(e)
        additional_info = {
            "traceback": exception_traceback,
            "error": error_message,
        }

    end = time.time()

    # Catch NaN or inf
    if not np.all(np.isfinite(cost)):
        logger.warning(
            "Target function returned infinity or nothing at all. Result is treated as CRASHED"
            f" and cost is set to {self._crash_cost}."
        )

        if "traceback" in additional_info:
            logger.warning(f"Traceback: {additional_info['traceback']}\n")

        status = StatusType.CRASHED

    if status == StatusType.CRASHED:
        cost = self._crash_cost

    trial_value = TrialValue(
        status=status,
        cost=cost,
        time=runtime,
        cpu_time=cpu_time,
        additional_info=additional_info,
        starttime=start,
        endtime=end,
    )

    return trial_info, trial_value

submit_trial #

submit_trial(trial_info: TrialInfo) -> None

This function submits a trial_info object in a serial fashion. As there is a single worker for this task, this interface can be considered a wrapper over the run method.

Both result/exceptions can be completely determined in this step so both lists are properly filled.

Parameters#

trial_info : TrialInfo An object containing the configuration launched.

Source code in smac/runner/abstract_serial_runner.py
def submit_trial(self, trial_info: TrialInfo) -> None:
    """This function submits a trial_info object in a serial fashion. As there is a single
     worker for this task, this interface can be considered a wrapper over the `run` method.

    Both result/exceptions can be completely determined in this step so both lists
    are properly filled.

    Parameters
    ----------
    trial_info : TrialInfo
        An object containing the configuration launched.
    """
    self._results_queue.append(self.run_wrapper(trial_info))

wait #

wait() -> None

The SMBO/intensifier might need to wait for trials to finish before making a decision. For serial runners, no wait is needed as the result is immediately available.

Source code in smac/runner/abstract_serial_runner.py
def wait(self) -> None:
    """The SMBO/intensifier might need to wait for trials to finish before making a decision.
    For serial runners, no wait is needed as the result is immediately available.
    """
    # There is no need to wait in serial runners. When launching a trial via submit, as
    # the serial trial uses the same process to run, the result is always available
    # immediately after. This method implements is just an implementation of the
    # abstract method via a simple return, again, because there is no need to wait
    return