Skip to content

Api

neps.api #

API for the neps package.

run #

run(
    run_pipeline: Callable | None = None,
    root_directory: str | Path | None = None,
    pipeline_space: (
        dict[str, Parameter | ConfigurationSpace]
        | str
        | Path
        | ConfigurationSpace
        | None
    ) = None,
    run_args: str | Path | None = None,
    overwrite_working_directory: bool = False,
    post_run_summary: bool = False,
    development_stage_id=None,
    task_id=None,
    max_evaluations_total: int | None = None,
    max_evaluations_per_run: int | None = None,
    continue_until_max_evaluation_completed: bool = False,
    max_cost_total: int | float | None = None,
    ignore_errors: bool = False,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    pre_load_hooks: Iterable | None = None,
    searcher: (
        Literal[
            "default",
            "bayesian_optimization",
            "random_search",
            "hyperband",
            "priorband",
            "mobster",
            "asha",
            "regularized_evolution",
        ]
        | BaseOptimizer
    ) = "default",
    searcher_path: Path | str | None = None,
    **searcher_kwargs
) -> None

Run a neural pipeline search.

To parallelize

To run a neural pipeline search with multiple processes or machines, simply call run(.) multiple times (optionally on different machines). Make sure that root_directory points to the same folder on the same filesystem, otherwise, the multiple calls to run(.) will be independent.

PARAMETER DESCRIPTION
run_pipeline

The objective function to minimize.

TYPE: Callable | None DEFAULT: None

pipeline_space

The search space to minimize over.

TYPE: dict[str, Parameter | ConfigurationSpace] | str | Path | ConfigurationSpace | None DEFAULT: None

root_directory

The directory to save progress to. This is also used to synchronize multiple calls to run(.) for parallelization.

TYPE: str | Path | None DEFAULT: None

run_args

An option for providing the optimization settings e.g. max_evaluations_total in a YAML file.

TYPE: str | Path | None DEFAULT: None

overwrite_working_directory

If true, delete the working directory at the start of the run. This is, e.g., useful when debugging a run_pipeline function.

TYPE: bool DEFAULT: False

post_run_summary

If True, creates a csv file after each worker is done, holding summary information about the configs and results.

TYPE: bool DEFAULT: False

development_stage_id

ID for the current development stage. Only needed if you work with multiple development stages.

DEFAULT: None

task_id

ID for the current task. Only needed if you work with multiple tasks.

DEFAULT: None

max_evaluations_total

Number of evaluations after which to terminate.

TYPE: int | None DEFAULT: None

max_evaluations_per_run

Number of evaluations the specific call to run(.) should maximally do.

TYPE: int | None DEFAULT: None

continue_until_max_evaluation_completed

If true, only stop after max_evaluations_total have been completed. This is only relevant in the parallel setting.

TYPE: bool DEFAULT: False

max_cost_total

No new evaluations will start when this cost is exceeded. Requires returning a cost in the run_pipeline function, e.g., return dict(loss=loss, cost=cost).

TYPE: int | float | None DEFAULT: None

ignore_errors

Ignore hyperparameter settings that threw an error and do not raise an error. Error configs still count towards max_evaluations_total.

TYPE: bool DEFAULT: False

loss_value_on_error

Setting this and cost_value_on_error to any float will suppress any error and will use given loss value instead. default: None

TYPE: None | float DEFAULT: None

cost_value_on_error

Setting this and loss_value_on_error to any float will suppress any error and will use given cost value instead. default: None

TYPE: None | float DEFAULT: None

pre_load_hooks

List of functions that will be called before load_results().

TYPE: Iterable | None DEFAULT: None

searcher

Which optimizer to use. This is usually only needed by neps developers.

TYPE: Literal['default', 'bayesian_optimization', 'random_search', 'hyperband', 'priorband', 'mobster', 'asha', 'regularized_evolution'] | BaseOptimizer DEFAULT: 'default'

searcher_path

The path to the user created searcher. None when the user is using NePS designed searchers.

TYPE: Path | str | None DEFAULT: None

**searcher_kwargs

Will be passed to the searcher. This is usually only needed by neps developers.

DEFAULT: {}

RAISES DESCRIPTION
ValueError

If deprecated argument working_directory is used.

ValueError

If root_directory is None.

Example

import neps

def run_pipeline(some_parameter: float):
    validation_error = -some_parameter
    return validation_error

pipeline_space = dict(some_parameter=neps.FloatParameter(lower=0, upper=1))

logging.basicConfig(level=logging.INFO)
neps.run(
    run_pipeline=run_pipeline,
    pipeline_space=pipeline_space,
    root_directory="usage_example",
    max_evaluations_total=5,
)

Source code in neps/api.py
def run(
    run_pipeline: Callable | None = None,
    root_directory: str | Path | None = None,
    pipeline_space: (
        dict[str, Parameter | CS.ConfigurationSpace]
        | str
        | Path
        | CS.ConfigurationSpace
        | None
    ) = None,
    run_args: str | Path | None = None,
    overwrite_working_directory: bool = False,
    post_run_summary: bool = False,
    development_stage_id=None,
    task_id=None,
    max_evaluations_total: int | None = None,
    max_evaluations_per_run: int | None = None,
    continue_until_max_evaluation_completed: bool = False,
    max_cost_total: int | float | None = None,
    ignore_errors: bool = False,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    pre_load_hooks: Iterable | None = None,
    searcher: (
        Literal[
            "default",
            "bayesian_optimization",
            "random_search",
            "hyperband",
            "priorband",
            "mobster",
            "asha",
            "regularized_evolution",
        ]
        | BaseOptimizer
    ) = "default",
    searcher_path: Path | str | None = None,
    **searcher_kwargs,
) -> None:
    """Run a neural pipeline search.

    To parallelize:
        To run a neural pipeline search with multiple processes or machines,
        simply call run(.) multiple times (optionally on different machines). Make sure
        that root_directory points to the same folder on the same filesystem, otherwise,
        the multiple calls to run(.) will be independent.

    Args:
        run_pipeline: The objective function to minimize.
        pipeline_space: The search space to minimize over.
        root_directory: The directory to save progress to. This is also used to
            synchronize multiple calls to run(.) for parallelization.
        run_args: An option for providing the optimization settings e.g.
            max_evaluations_total in a YAML file.
        overwrite_working_directory: If true, delete the working directory at the start of
            the run. This is, e.g., useful when debugging a run_pipeline function.
        post_run_summary: If True, creates a csv file after each worker is done,
            holding summary information about the configs and results.
        development_stage_id: ID for the current development stage. Only needed if
            you work with multiple development stages.
        task_id: ID for the current task. Only needed if you work with multiple
            tasks.
        max_evaluations_total: Number of evaluations after which to terminate.
        max_evaluations_per_run: Number of evaluations the specific call to run(.) should
            maximally do.
        continue_until_max_evaluation_completed: If true, only stop after
            max_evaluations_total have been completed. This is only relevant in the
            parallel setting.
        max_cost_total: No new evaluations will start when this cost is exceeded. Requires
            returning a cost in the run_pipeline function, e.g.,
            `return dict(loss=loss, cost=cost)`.
        ignore_errors: Ignore hyperparameter settings that threw an error and do not raise
            an error. Error configs still count towards max_evaluations_total.
        loss_value_on_error: Setting this and cost_value_on_error to any float will
            suppress any error and will use given loss value instead. default: None
        cost_value_on_error: Setting this and loss_value_on_error to any float will
            suppress any error and will use given cost value instead. default: None
        pre_load_hooks: List of functions that will be called before load_results().
        searcher: Which optimizer to use. This is usually only needed by neps developers.
        searcher_path: The path to the user created searcher. None when the user
            is using NePS designed searchers.
        **searcher_kwargs: Will be passed to the searcher. This is usually only needed by
            neps developers.

    Raises:
        ValueError: If deprecated argument working_directory is used.
        ValueError: If root_directory is None.


    Example:
        >>> import neps

        >>> def run_pipeline(some_parameter: float):
        >>>    validation_error = -some_parameter
        >>>    return validation_error

        >>> pipeline_space = dict(some_parameter=neps.FloatParameter(lower=0, upper=1))

        >>> logging.basicConfig(level=logging.INFO)
        >>> neps.run(
        >>>    run_pipeline=run_pipeline,
        >>>    pipeline_space=pipeline_space,
        >>>    root_directory="usage_example",
        >>>    max_evaluations_total=5,
        >>> )
    """
    # 'working_directory' was renamed to 'root_directory'. Since it is no longer a
    # named parameter, the deprecated spelling can only arrive via **searcher_kwargs,
    # so it is rejected explicitly here.
    if "working_directory" in searcher_kwargs:
        raise ValueError(
            "The argument 'working_directory' is deprecated, please use 'root_directory' "
            "instead"
        )

    # 'budget' is the deprecated spelling of 'max_cost_total'; accept it for now with a
    # DeprecationWarning and forward its value, then drop it from the kwargs so it is
    # not passed on to the searcher.
    if "budget" in searcher_kwargs:
        warnings.warn(
            "The argument: 'budget' is deprecated. In the neps.run call, please, use "
            "'max_cost_total' instead. In future versions using `budget` will fail.",
            DeprecationWarning,
            stacklevel=2,
        )
        max_cost_total = searcher_kwargs["budget"]
        del searcher_kwargs["budget"]
    logger = logging.getLogger("neps")

    # If optimization settings are provided via a YAML file (run_args), they take
    # precedence: every direct argument below is replaced by the YAML value or, when a
    # key is absent from the YAML, by the parameter's default.
    if run_args:
        # Check if the user provided other arguments directly to neps.run().
        # If so, raise an error.
        # NOTE: this relies on locals() here reflecting exactly the parameters of
        # run() — do not introduce local variables before this call.
        check_arg_defaults(run, locals())

        # Warning if the user has specified default values for arguments that differ
        # from those specified in 'run_args'. These user-defined changes are not applied.
        warnings.warn(
            "WARNING: Loading arguments from 'run_args'. Arguments directly provided "
            "to neps.run(...) will be not used!"
        )

        optim_settings = get_run_args_from_yaml(run_args)

        # Update each argument based on optim_settings. If a key is not provided in the
        # yaml, the parameter's default value is used. Currently strict but will change
        # in the future.
        run_pipeline = optim_settings.get("run_pipeline", None)
        root_directory = optim_settings.get("root_directory", None)
        pipeline_space = optim_settings.get("pipeline_space", None)
        overwrite_working_directory = optim_settings.get(
            "overwrite_working_directory", False
        )
        post_run_summary = optim_settings.get("post_run_summary", False)
        development_stage_id = optim_settings.get("development_stage_id", None)
        task_id = optim_settings.get("task_id", None)
        max_evaluations_total = optim_settings.get("max_evaluations_total", None)
        max_evaluations_per_run = optim_settings.get("max_evaluations_per_run", None)
        continue_until_max_evaluation_completed = optim_settings.get(
            "continue_until_max_evaluation_completed",
            False,
        )
        max_cost_total = optim_settings.get("max_cost_total", None)
        ignore_errors = optim_settings.get("ignore_errors", False)
        loss_value_on_error = optim_settings.get("loss_value_on_error", None)
        cost_value_on_error = optim_settings.get("cost_value_on_error", None)
        pre_load_hooks = optim_settings.get("pre_load_hooks", None)
        searcher = optim_settings.get("searcher", "default")
        searcher_path = optim_settings.get("searcher_path", None)
        # Extra searcher arguments from the YAML are merged into (and may overwrite)
        # the directly-provided **searcher_kwargs.
        for key, value in optim_settings.get("searcher_kwargs", {}).items():
            searcher_kwargs[key] = value

    # check if necessary arguments are provided.
    check_essential_arguments(
        run_pipeline,
        root_directory,
        pipeline_space,
        max_cost_total,
        max_evaluations_total,
        searcher,
        run_args,
    )

    if pre_load_hooks is None:
        pre_load_hooks = []

    logger.info(f"Starting neps.run using root directory {root_directory}")

    # Used to create the yaml holding information about the searcher.
    # Also important for testing and debugging the api.
    searcher_info = {
        "searcher_name": "",
        "searcher_alg": "",
        "searcher_selection": "",
        "neps_decision_tree": True,
        "searcher_args": {},
    }

    # Special case: a custom optimizer *class* loaded via run_args. It is instantiated
    # here with a SearchSpace built from pipeline_space (which, in this path, is
    # presumably a YAML path accepted by pipeline_space_from_yaml — TODO confirm).
    if inspect.isclass(searcher):
        if issubclass(searcher, BaseOptimizer):
            search_space = pipeline_space_from_yaml(pipeline_space)
            search_space = SearchSpace(**search_space)
            searcher = searcher(search_space)
        else:
            # Raise an error if searcher is not a subclass of BaseOptimizer
            raise TypeError(
                "The provided searcher must be a class that inherits from BaseOptimizer."
            )

    # A pre-instantiated optimizer is used as-is; otherwise the string name is resolved
    # to an optimizer instance (and updated searcher_info) by _run_args.
    if isinstance(searcher, BaseOptimizer):
        searcher_instance = searcher
        searcher_info["searcher_name"] = "baseoptimizer"
        searcher_info["searcher_alg"] = searcher.whoami()
        searcher_info["searcher_selection"] = "user-instantiation"
        searcher_info["neps_decision_tree"] = False
    else:
        (
            searcher_instance,
            searcher_info,
        ) = _run_args(
            searcher_info=searcher_info,
            pipeline_space=pipeline_space,
            max_cost_total=max_cost_total,
            ignore_errors=ignore_errors,
            loss_value_on_error=loss_value_on_error,
            cost_value_on_error=cost_value_on_error,
            logger=logger,
            searcher=searcher,
            searcher_path=searcher_path,
            **searcher_kwargs,
        )

    # Check to verify if the target directory contains history of another optimizer state
    # This check is performed only when the `searcher` is built during the run
    if not isinstance(searcher, (BaseOptimizer, str)):
        raise ValueError(
            f"Unrecognized `searcher` of type {type(searcher)}. Not str or BaseOptimizer."
        )
    elif isinstance(searcher, BaseOptimizer):
        # This check is not strict when a user-defined neps.optimizer is provided
        logger.warning(
            "An instantiated optimizer is provided. The safety checks of NePS will be "
            "skipped. Accurate continuation of runs can no longer be guaranteed!"
        )

    # Multi-task / multi-stage runs get their own subdirectory under root_directory.
    if task_id is not None:
        root_directory = Path(root_directory) / f"task_{task_id}"
    if development_stage_id is not None:
        root_directory = Path(root_directory) / f"dev_{development_stage_id}"

    launch_runtime(
        evaluation_fn=run_pipeline,
        sampler=searcher_instance,
        optimizer_info=searcher_info,
        optimization_dir=root_directory,
        max_evaluations_total=max_evaluations_total,
        max_evaluations_per_run=max_evaluations_per_run,
        continue_until_max_evaluation_completed=continue_until_max_evaluation_completed,
        logger=logger,
        post_evaluation_hook=_post_evaluation_hook_function(
            loss_value_on_error, ignore_errors
        ),
        overwrite_optimization_dir=overwrite_working_directory,
        pre_load_hooks=pre_load_hooks,
    )

    # Optionally write a csv summary of all configs and results after the worker is done.
    if post_run_summary:
        assert root_directory is not None
        post_run_csv(root_directory)