Bases: AbstractSerialRunner
        Class to execute target functions from scripts. Uses Popen to execute the script in a subprocess.
The following example shows how the script is called:
target_function --instance=test --instance_features=test --seed=0 --hyperparameter1=5323
The script must echo its result to stdout in the following form (white-spaces are removed):
cost=0.5; runtime=0.01; status=SUCCESS; additional_info=test (single-objective)
cost=0.5, 0.4; runtime=0.01; status=SUCCESS; additional_info=test (multi-objective)
The status must be a string matching one of the StatusType values. The fields runtime,
status, and additional_info are optional.
Note
Every time an instance is passed, an instance feature is also passed, in the form of a
comma-separated list (no spaces) of floats. If no instance features are given for the
instance, an empty list is passed.
Parameters
target_function : str
    Path to the script that is executed.
scenario : Scenario
required_arguments : list[str]
    A list of required arguments, which are passed to the target function.
                  
                    Source code in smac/runner/target_function_script_runner.py
def __init__(
    self,
    target_function: str,
    scenario: Scenario,
    required_arguments: list[str] | None = None,
):
    if required_arguments is None:
        required_arguments = []
    super().__init__(scenario=scenario, required_arguments=required_arguments)
    self._target_function = target_function
    # Check that the target function is a string (i.e., a path to a script)
    if not isinstance(self._target_function, str):
        raise TypeError(
            f"Argument `target_function` must be a string but is of type `{type(self._target_function)}`."
        )
    if self._scenario.trial_memory_limit is not None:
        logger.warning("Trial memory limit is not supported for script target functions.")
    if self._scenario.trial_walltime_limit is not None:
        logger.warning("Trial walltime limit is not supported for script target functions.")
        Calls the algorithm, which is processed in the run method.
            
              Source code in smac/runner/target_function_script_runner.py
def __call__(
    self,
    algorithm_kwargs: dict[str, Any],
) -> tuple[str, str]:
    """Calls the algorithm, which is processed in the ``run`` method."""
    cmd = [self._target_function]
    for k, v in algorithm_kwargs.items():
        v = str(v)
        k = str(k)
        # Let's remove some spaces
        v = v.replace(" ", "")
        cmd += [f"--{k}={v}"]
    logger.debug(f"Calling: {' '.join(cmd)}")
    p = Popen(cmd, shell=False, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    output, error = p.communicate()
    logger.debug("Stdout: %s" % output)
    logger.debug("Stderr: %s" % error)
    return output, error
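The flag construction performed in `__call__` above can be sketched in isolation. The helper name `build_cmd` is ours, not part of SMAC:

```python
# Hedged sketch of how keyword arguments become CLI flags, mirroring __call__.
def build_cmd(script: str, **kwargs) -> list[str]:
    cmd = [script]
    for k, v in kwargs.items():
        # Spaces are removed so each "--key=value" stays a single argv token
        cmd.append(f"--{k}={str(v).replace(' ', '')}")
    return cmd
```

For example, `build_cmd("target_function", seed=0, instance="test")` yields `["target_function", "--seed=0", "--instance=test"]`, which is then handed to `Popen`.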
count_available_workers() -> int
        Returns the number of available workers. Serial workers only have one worker.
            
              Source code in smac/runner/abstract_serial_runner.py
def count_available_workers(self) -> int:
    """Returns the number of available workers. Serial workers only have one worker."""
    return 1
 
     
 
    
        Calls the target function.
Parameters
config : Configuration
    Configuration to be passed to the target function.
instance : str | None, defaults to None
    The Problem instance.
budget : float | None, defaults to None
    A positive, real-valued number representing an arbitrary limit to the target function
    handled by the target function internally.
seed : int | None, defaults to None
    Seed passed to the target function.
Returns
status : StatusType
    Status of the trial.
cost : float | list[float]
    Resulting cost(s) of the trial.
runtime : float
    The time the target function took to run.
cpu_time : float
    The time the target function took on the hardware to run.
additional_info : dict
    All further additional trial information.
            
              Source code in smac/runner/target_function_script_runner.py
def run(
    self,
    config: Configuration,
    instance: str | None = None,
    budget: float | None = None,
    seed: int | None = None,
) -> tuple[StatusType, float | list[float], float, float, dict]:
    """Calls the target function.
    Parameters
    ----------
    config : Configuration
        Configuration to be passed to the target function.
    instance : str | None, defaults to None
        The Problem instance.
    budget : float | None, defaults to None
        A positive, real-valued number representing an arbitrary limit to the target function
        handled by the target function internally.
    seed : int | None, defaults to None
        Seed passed to the target function.
    Returns
    -------
    status : StatusType
        Status of the trial.
    cost : float | list[float]
        Resulting cost(s) of the trial.
    runtime : float
        The time the target function took to run.
    cpu_time : float
        The time the target function took on the hardware to run.
    additional_info : dict
        All further additional trial information.
    """
    # The kwargs are passed to the target function.
    kwargs: dict[str, Any] = {}
    if "seed" in self._required_arguments:
        kwargs["seed"] = seed
    if "instance" in self._required_arguments:
        kwargs["instance"] = instance
        # In contrast to the normal target function runner, we also add the instance features here.
        if self._scenario.instance_features is not None and instance in self._scenario.instance_features:
            kwargs["instance_features"] = self._scenario.instance_features[instance]
        else:
            kwargs["instance_features"] = []
    if "budget" in self._required_arguments:
        kwargs["budget"] = budget
    # Presetting
    cost: float | list[float] = self._crash_cost
    runtime = 0.0
    cpu_time = runtime
    additional_info = {}
    status = StatusType.SUCCESS
    # Add config arguments to the kwargs
    for k, v in dict(config).items():
        if k in kwargs:
            raise RuntimeError(f"The key {k} is already in use. Please use a different one.")
        kwargs[k] = v
    # Call target function
    start_time = time.time()
    cpu_time = time.process_time()
    output, error = self(kwargs)
    cpu_time = time.process_time() - cpu_time
    runtime = time.time() - start_time
    # Now we have to parse the std output
    # First remove white-spaces
    output = output.replace(" ", "")
    outputs = {}
    for pair in output.split(";"):
        try:
            kv = pair.split("=")
            k, v = kv[0], kv[1]
            # Get rid of the trailing newline
            v = v.strip()
            outputs[k] = v
        except Exception:
            pass
    # Parse status
    if "status" in outputs:
        status = getattr(StatusType, outputs["status"])
    # Parse costs (depends on the number of objectives)
    if "cost" in outputs:
        if self._n_objectives == 1:
            cost = float(outputs["cost"])
        else:
            costs = outputs["cost"].split(",")
            costs = [float(c) for c in costs]
            cost = costs
            if len(costs) != self._n_objectives:
                raise RuntimeError("The number of costs does not match the number of objectives.")
    else:
        status = StatusType.CRASHED
    # Overwrite runtime
    if "runtime" in outputs:
        runtime = float(outputs["runtime"])
    # Overwrite CPU time
    if "cpu_time" in outputs:
        cpu_time = float(outputs["cpu_time"])
    # Add additional info
    if "additional_info" in outputs:
        additional_info["additional_info"] = outputs["additional_info"]
    if status != StatusType.SUCCESS:
        additional_info["error"] = error
        if cost != self._crash_cost:
            cost = self._crash_cost
            logger.info(
                "The target function crashed but returned a cost. The cost is ignored and replaced by crash cost."
            )
    return status, cost, runtime, cpu_time, additional_info
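The stdout parsing in `run` above can be sketched as a standalone helper. The name `parse_output` is illustrative, not part of SMAC:

```python
# Hedged sketch of the stdout parsing in ``run``: strip spaces, split on ';'
# into key=value pairs, and silently skip fragments that are not pairs.
def parse_output(output: str) -> dict[str, str]:
    output = output.replace(" ", "")
    outputs: dict[str, str] = {}
    for pair in output.split(";"):
        if "=" in pair:
            k, v = pair.split("=", 1)
            outputs[k] = v.strip()  # get rid of the trailing newline
    return outputs
```

A single-objective echo like `cost=0.5; runtime=0.01; status=SUCCESS` thus becomes `{"cost": "0.5", "runtime": "0.01", "status": "SUCCESS"}`; in the multi-objective case the `cost` value is further split on commas.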
 
     
 
    
        Wrapper around run() to execute and check the execution of a given config.
This function encapsulates common handling/processing so that the run() implementation
is simplified.
Parameters
trial_info : TrialInfo
    Object that contains enough information to execute a configuration run in isolation.
dask_data_to_scatter : dict[str, Any]
    When a user scatters data from their local process to the distributed network,
    this data is distributed in a round-robin fashion, grouped by number of cores.
    Roughly speaking, we can keep this data in memory, so we do not have to
    (de-)serialize it every time we execute a target function with a big dataset.
    For example, this argument is very useful when your target function uses a big
    dataset shared across all target function invocations.
Returns
info : TrialInfo
    An object containing the configuration launched.
value : TrialValue
    Contains information about the status/performance of config.
            
              Source code in smac/runner/abstract_runner.py
def run_wrapper(
    self, trial_info: TrialInfo, **dask_data_to_scatter: dict[str, Any]
) -> tuple[TrialInfo, TrialValue]:
    """Wrapper around run() to execute and check the execution of a given config.
    This function encapsulates common
    handling/processing, so that run() implementation is simplified.
    Parameters
    ----------
    trial_info : TrialInfo
        Object that contains enough information to execute a configuration run in isolation.
    dask_data_to_scatter : dict[str, Any]
        When a user scatters data from their local process to the distributed network,
        this data is distributed in a round-robin fashion, grouped by number of cores.
        Roughly speaking, we can keep this data in memory, so we do not have to
        (de-)serialize it every time we execute a target function with a big dataset.
        For example, this argument is very useful when your target function uses a big
        dataset shared across all target function invocations.
    Returns
    -------
    info : TrialInfo
        An object containing the configuration launched.
    value : TrialValue
        Contains information about the status/performance of config.
    """
    start = time.time()
    cpu_time = time.process_time()
    try:
        status, cost, runtime, cpu_time, additional_info = self.run(
            config=trial_info.config,
            instance=trial_info.instance,
            budget=trial_info.budget,
            seed=trial_info.seed,
            **dask_data_to_scatter,
        )
    except Exception as e:
        status = StatusType.CRASHED
        cost = self._crash_cost
        cpu_time = time.process_time() - cpu_time
        runtime = time.time() - start
        # Add context information to the error message
        exception_traceback = traceback.format_exc()
        error_message = repr(e)
        additional_info = {
            "traceback": exception_traceback,
            "error": error_message,
        }
    end = time.time()
    # Catch NaN or inf
    if not np.all(np.isfinite(cost)):
        logger.warning(
            "Target function returned infinity or nothing at all. Result is treated as CRASHED"
            f" and cost is set to {self._crash_cost}."
        )
        if "traceback" in additional_info:
            logger.warning(f"Traceback: {additional_info['traceback']}\n")
        status = StatusType.CRASHED
    if status == StatusType.CRASHED:
        cost = self._crash_cost
    trial_value = TrialValue(
        status=status,
        cost=cost,
        time=runtime,
        cpu_time=cpu_time,
        additional_info=additional_info,
        starttime=start,
        endtime=end,
    )
    return trial_info, trial_value
 
     
 
    
        This function submits a trial_info object in a serial fashion. As there is a single
worker for this task, this interface can be considered a wrapper over the run method.
Both results and exceptions can be completely determined in this step, so both lists
are properly filled.
Parameters
trial_info : TrialInfo
    An object containing the configuration launched.
            
              Source code in smac/runner/abstract_serial_runner.py
def submit_trial(self, trial_info: TrialInfo) -> None:
    """This function submits a trial_info object in a serial fashion. As there is a single
    worker for this task, this interface can be considered a wrapper over the `run` method.
    Both results and exceptions can be completely determined in this step, so both lists
    are properly filled.
    Parameters
    ----------
    trial_info : TrialInfo
        An object containing the configuration launched.
    """
    self._results_queue.append(self.run_wrapper(trial_info))
 
     
 
    
        The SMBO/intensifier might need to wait for trials to finish before making a decision.
For serial runners, no wait is needed as the result is immediately available.
            
              Source code in smac/runner/abstract_serial_runner.py
def wait(self) -> None:
    """The SMBO/intensifier might need to wait for trials to finish before making a decision.
    For serial runners, no wait is needed as the result is immediately available.
    """
    # There is no need to wait in serial runners. When a trial is launched via submit,
    # the serial trial uses the same process to run, so the result is always available
    # immediately afterwards. This method is just a trivial implementation of the
    # abstract method via a simple return, again, because there is no need to wait.
    return