Runtime

neps.runtime #

Module for the runtime of a single instance of NePS running.

An important advantage of NePS running one instance per worker, with no multiprocessing, is that we can reliably use globals to store information such as the currently running configuration without interfering with other workers that have been launched.

This allows us to have a global Trial object that can be accessed via import neps.runtime; neps.runtime.get_in_progress_trial().
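
For example, an evaluation function can inspect the trial it is currently being evaluated for. In this sketch, run_pipeline and its parameter are illustrative; only get_in_progress_trial() comes from this module.

import neps.runtime

def run_pipeline(learning_rate: float) -> float:
    # Illustrative evaluation function: the name and parameter are made up.
    trial = neps.runtime.get_in_progress_trial()
    if trial is not None:
        print(f"Evaluating config {trial.id} in {trial.pipeline_dir}")
    return (learning_rate - 0.1) ** 2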


This module primarily handles the worker loop, where the important concepts are:

- State: The state of the optimization is all of the configurations, their results, and the current state of the optimizer.
- Shared State: Whenever a worker wishes to read or write any state, it locks the shared state, declaring itself as operating on it. At that point, no other worker can access the shared state.
- Optimizer Hydration: The process through which an optimizer instance is hydrated with the Shared State so that it can make a decision, e.g. for sampling. Equally, we serialize the optimizer when writing it back to the Shared State.
- Trial Lock: When evaluating a configuration, a worker must lock it to declare itself as evaluating it. This communicates to other workers that this configuration is in progress.

Loop #

Steps marked with + are performed while the worker holds the Shared State lock, and steps marked with ~ while it holds the Trial lock. Acquiring the trial lock ~ is allowed to fail, in which case all steps marked with ~ are skipped and the loop continues. A schematic sketch of one iteration follows the list below.

   1. + Lock the Shared State
   2. + Check exit conditions
   3. + Hydrate the optimizer
   4. + Sample a new Trial
   5. Unlock the Shared State
   6. ~ Obtain a Trial Lock
   7. ~ Set the global trial for this worker to the current trial
   8. ~ Evaluate the trial
   9. ~+ Lock the shared state
  10. ~+ Write the results of the config to disk
  11. ~+ Update the optimizer if required (used budget for evaluating trial)
  12. ~ Unlock the shared state
  13. Unlock Trial Lock
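
A minimal, runnable sketch of one iteration of this loop, using a stand-in lock instead of the real SharedState and Trial objects (all names here are illustrative):

from contextlib import contextmanager

@contextmanager
def lock(name: str):
    # Stand-in for the real file-based Locker; prints instead of locking.
    print(f"lock {name}")
    try:
        yield
    finally:
        print(f"unlock {name}")

def worker_iteration() -> None:
    with lock("shared state"):        # +
        # check exit conditions, hydrate the optimizer, sample a new trial
        trial_id = "config_1"
    with lock(f"trial {trial_id}"):   # ~ (allowed to fail; the loop then just continues)
        result = {"loss": 0.5}        # evaluate the trial
        with lock("shared state"):    # ~+
            print(f"write {trial_id} results {result}, update optimizer budget")

worker_iteration()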

SharedState dataclass #

SharedState(base_dir: Path, create_dirs: bool = False)

The shared state of the optimization process that workers communicate through.

Attributes:
    base_dir (Path): The base directory from which the optimization is running.
    create_dirs (bool): Whether to create the directories if they do not exist.
    lock (Locker): The lock to signify that a worker is operating on the shared state.
    optimizer_state_file (Path): The path to the optimizer's state.
    optimizer_info_file (Path): The path to the file containing information about the optimizer's setup.
    results_dir (Path): Directory where results for configurations are stored.
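
A small usage sketch; the directory path and the poll/timeout values are illustrative.

from pathlib import Path
from neps.runtime import SharedState

state = SharedState(Path("results/my_run"), create_dirs=True)
print(state.results_dir, state.optimizer_state_file, state.optimizer_info_file)

# Any read or write of shared state happens under the lock.
with state.lock(poll=0.3, timeout=None):
    pass  # e.g. read trial references, hydrate the optimizer, sample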

check_optimizer_info_on_disk_matches #

check_optimizer_info_on_disk_matches(
    optimizer_info: dict[str, Any],
    *,
    excluded_keys: Iterable[str] = ("searcher_name",)
) -> None

Sanity check that the provided info matches the one on disk (if any).

Args:
    optimizer_info (dict[str, Any]): The optimizer info to check.
    excluded_keys (Iterable[str]): Any keys to exclude during the comparison. Default: ("searcher_name",)

Raises:
    ValueError: If there is optimizer info on disk and it does not match the provided info.

Source code in neps/runtime.py
def check_optimizer_info_on_disk_matches(
    self,
    optimizer_info: dict[str, Any],
    *,
    excluded_keys: Iterable[str] = ("searcher_name",),
) -> None:
    """Sanity check that the provided info matches the one on disk (if any).

    Args:
        optimizer_info: The optimizer info to check.
        excluded_keys: Any keys to exclude during the comparison.

    Raises:
        ValueError: If there is optimizer info on disk and it does not match the
        provided info.
    """
    optimizer_info_copy = optimizer_info.copy()
    loaded_info = deserialize(self.optimizer_info_file)

    for key in excluded_keys:
        optimizer_info_copy.pop(key, None)
        loaded_info.pop(key, None)

    if optimizer_info_copy != loaded_info:
        raise ValueError(
            f"The sampler_info in the file {self.optimizer_info_file} is not valid. "
            f"Expected: {optimizer_info_copy}, Found: {loaded_info}",
        )
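
For illustration, this mirrors how launch_runtime guards the check; the optimizer_info values are made up.

from pathlib import Path
from neps.runtime import SharedState

state = SharedState(Path("results/my_run"), create_dirs=True)
optimizer_info = {"searcher_name": "my_searcher", "max_fidelity": 27}  # illustrative values

with state.lock(poll=0.3, timeout=None):
    if state.optimizer_info_file.exists():
        state.check_optimizer_info_on_disk_matches(optimizer_info)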

trial_refs #

trial_refs() -> dict[State, list[Disk]]

Get the disk reference of every trial, grouped by their state.

Source code in neps/runtime.py
def trial_refs(self) -> dict[Trial.State, list[Trial.Disk]]:
    """Get the disk reference of every trial, grouped by their state."""
    refs = [
        Trial.Disk.from_dir(pipeline_dir=pipeline_dir)
        for pipeline_dir in self.results_dir.iterdir()
        if pipeline_dir.is_dir()
    ]
    by_state: dict[Trial.State, list[Trial.Disk]] = defaultdict(list)
    for ref in refs:
        by_state[ref.state].append(ref)

    return by_state
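
For example, to count trials per state (the directory path and poll value are illustrative):

from pathlib import Path
from neps.runtime import SharedState

state = SharedState(Path("results/my_run"), create_dirs=True)
with state.lock(poll=0.3, timeout=None):
    refs_by_state = state.trial_refs()

for trial_state, disks in refs_by_state.items():
    print(trial_state.value, len(disks))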

Trial dataclass #

Trial(
    id: str,
    config: ConfigLike,
    pipeline_dir: Path,
    prev_config_id: str | None,
    metadata: dict[str, Any],
    results: dict[str, Any] | ERROR | None = None,
)

A trial is a configuration and its associated data.

The object is considered mutable, and the global trial currently being evaluated can be accessed using get_in_progress_trial().

Attributes:
    id (str): Unique identifier for the configuration.
    config (ConfigLike): The configuration to evaluate.
    pipeline_dir (Path): Directory where the configuration is evaluated.
    prev_config_id (str | None): The id of the previous configuration evaluated for this trial.
    metadata (dict[str, Any]): Additional metadata about the configuration.
    results (dict[str, Any] | ERROR | None): The results of the evaluation, if any.
    disk (Disk): The disk information of this trial, such as paths and locks.

state property #

state: State

The state of the trial on disk.
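
An illustrative construction; trials are normally created by the worker loop rather than by hand, and the path and values below are made up.

import time
from pathlib import Path
from neps.runtime import Trial

pipeline_dir = Path("results/my_run/results/config_7")
pipeline_dir.mkdir(parents=True, exist_ok=True)

trial = Trial(
    id="config_7",
    config={"learning_rate": 0.01},
    pipeline_dir=pipeline_dir,
    prev_config_id=None,
    metadata={"time_sampled": time.time()},
)
disk = trial.write_to_disk()   # serializes config and metadata, returns a Trial.Disk
print(disk.state)              # the state is derived from what exists on disk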

Disk dataclass #

Disk(pipeline_dir: Path)

The disk information of a trial.

Attributes:
    pipeline_dir (Path): The directory where the trial is stored.
    id (str): The unique identifier of the trial.
    config_file (Path): The path to the configuration file.
    result_file (Path): The path to the result file.
    metadata_file (Path): The path to the metadata file.
    optimization_dir (Path): The directory from which the optimization is running.
    previous_config_id_file (Path): The path to the previous config id file.
    previous_pipeline_dir (Path | None): The directory of the previous configuration.
    lock (Locker): The lock for the trial. Obtaining this lock indicates the worker is evaluating this trial.

state property #

state: State

The state of the trial.

config #

config() -> ConfigLike

Deserialize the configuration from disk.

Source code in neps/runtime.py
def config(self) -> ConfigLike:
    """Deserialize the configuration from disk."""
    return deserialize(self.config_file)

from_dir classmethod #

from_dir(pipeline_dir: Path) -> Disk

Create a Trial.Disk object from a directory.

Source code in neps/runtime.py
@classmethod
def from_dir(cls, pipeline_dir: Path) -> Trial.Disk:
    """Create a `Trial.Disk` object from a directory."""
    return cls(pipeline_dir=pipeline_dir)

load #

load() -> Trial

Load the trial from disk.

Source code in neps/runtime.py
def load(self) -> Trial:
    """Load the trial from disk."""
    config = deserialize(self.config_file)
    if not empty_file(self.metadata_file):
        metadata = deserialize(self.metadata_file)
    else:
        metadata = {}

    if not empty_file(self.result_file):
        result = deserialize(self.result_file)
    else:
        result = None

    if not empty_file(self.previous_config_id_file):
        previous_config_id = self.previous_config_id_file.read_text().strip()
    else:
        previous_config_id = None

    return Trial(
        id=self.id,
        config=config,
        pipeline_dir=self.pipeline_dir,
        metadata=metadata,
        prev_config_id=previous_config_id,
        results=result,
    )
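
For illustration, reconstructing a trial from an existing config directory (the path below is made up):

from pathlib import Path
from neps.runtime import Trial

disk = Trial.Disk.from_dir(Path("results/my_run/results/config_7"))
print(disk.id, disk.state)

trial = disk.load()   # deserializes config, metadata and results (if present)
print(trial.config, trial.results)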

to_result #

to_result(
    config_transform: (
        Callable[[ConfigLike], ConfigLike] | None
    ) = None
) -> ConfigResult

Convert the trial to a ConfigResult object.

Args:
    config_transform (Callable[[ConfigLike], ConfigLike] | None): A function to transform the configuration before creating the ConfigResult. Default: None

Returns:
    ConfigResult: A ConfigResult object usable by optimizers.

Source code in neps/runtime.py
def to_result(
    self,
    config_transform: Callable[[ConfigLike], ConfigLike] | None = None,
) -> ConfigResult:
    """Convert the trial to a `ConfigResult` object.

    Args:
        config_transform: A function to transform the configuration before
            creating the `ConfigResult`.

    Returns:
        A `ConfigResult` object usable by optimizers.
    """
    config = deserialize(self.config_file)
    result = deserialize(self.result_file)
    metadata = deserialize(self.metadata_file)
    _config = config_transform(config) if config_transform is not None else config

    return ConfigResult(
        id=self.id,
        config=_config,
        result=result,
        metadata=metadata,
    )
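
This mirrors how launch_runtime feeds completed trials back to the optimizer; the sampler here is assumed to be an already-configured BaseOptimizer, and the directory path is illustrative.

from pathlib import Path
from neps.runtime import SharedState, Trial

state = SharedState(Path("results/my_run"), create_dirs=True)

with state.lock(poll=0.3, timeout=None):
    previous_results = {
        ref.id: ref.to_result(config_transform=sampler.load_config)  # sampler: a BaseOptimizer
        for ref in state.trial_refs()[Trial.State.COMPLETE]
    }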

State #

Bases: str, Enum

The state of a trial.

COMPLETE class-attribute instance-attribute #
COMPLETE = 'evaluated'

The trial has been evaluated and results are available.

CORRUPTED class-attribute instance-attribute #
CORRUPTED = 'corrupted'

The trial is not in one of the previous states and should be removed.

IN_PROGRESS class-attribute instance-attribute #
IN_PROGRESS = 'in_progress'

There is currently a worker evaluating this trial.

PENDING class-attribute instance-attribute #
PENDING = 'pending'

The trial has been sampled but no worker has been assigned to evaluate it.

write_to_disk #

write_to_disk() -> Disk

Serialize the trial to disk.

Source code in neps/runtime.py
def write_to_disk(self) -> Trial.Disk:
    """Serliaze the trial to disk."""
    serialize(self.config, self.disk.config_file)
    serialize(self.metadata, self.disk.metadata_file)

    if self.prev_config_id is not None:
        self.disk.previous_config_id_file.write_text(self.prev_config_id)

    if self.results is not None:
        serialize(self.results, self.disk.result_file)

    return self.disk

get_in_progress_trial #

get_in_progress_trial() -> Trial | None

Get the currently running trial in this process.

Source code in neps/runtime.py
def get_in_progress_trial() -> Trial | None:
    """Get the currently running trial in this process."""
    return _CURRENTLY_RUNNING_TRIAL_IN_PROCESS

launch_runtime #

launch_runtime(
    *,
    evaluation_fn: Callable[..., float | Mapping[str, Any]],
    sampler: BaseOptimizer,
    optimizer_info: dict,
    optimization_dir: Path | str,
    max_evaluations_total: int | None = None,
    max_evaluations_per_run: int | None = None,
    continue_until_max_evaluation_completed: bool = False,
    logger: Logger | None = None,
    post_evaluation_hook: (
        POST_EVAL_HOOK_SIGNATURE | None
    ) = None,
    overwrite_optimization_dir: bool = False,
    pre_load_hooks: (
        Iterable[Callable[[BaseOptimizer], BaseOptimizer]]
        | None
    ) = None
) -> None

Launch the runtime of a single instance of NePS.

Please refer to the module docstring for a detailed explanation of the runtime. Runs until some exit condition is met.

Args:
    evaluation_fn (Callable[..., float | Mapping[str, Any]]): The evaluation function to use.
    sampler (BaseOptimizer): The optimizer to use for sampling configurations.
    optimizer_info (dict): Information about the optimizer.
    optimization_dir (Path | str): The directory where the optimization is running.
    max_evaluations_total (int | None): The maximum number of evaluations to run. Default: None
    max_evaluations_per_run (int | None): The maximum number of evaluations to run in a single run. Default: None
    continue_until_max_evaluation_completed (bool): Whether to continue until the maximum evaluations are completed. Default: False
    logger (Logger | None): The logger to use. Default: None
    post_evaluation_hook (POST_EVAL_HOOK_SIGNATURE | None): A hook to run after the evaluation. Default: None
    overwrite_optimization_dir (bool): Whether to overwrite the optimization directory. Default: False
    pre_load_hooks (Iterable[Callable[[BaseOptimizer], BaseOptimizer]] | None): Hooks to run before loading the results. Default: None

Source code in neps/runtime.py
def launch_runtime(  # noqa: PLR0913, C901, PLR0915
    *,
    evaluation_fn: Callable[..., float | Mapping[str, Any]],
    sampler: BaseOptimizer,
    optimizer_info: dict,
    optimization_dir: Path | str,
    max_evaluations_total: int | None = None,
    max_evaluations_per_run: int | None = None,
    continue_until_max_evaluation_completed: bool = False,
    logger: logging.Logger | None = None,
    post_evaluation_hook: POST_EVAL_HOOK_SIGNATURE | None = None,
    overwrite_optimization_dir: bool = False,
    pre_load_hooks: Iterable[Callable[[BaseOptimizer], BaseOptimizer]] | None = None,
) -> None:
    """Launch the runtime of a single instance of NePS.

    Please refer to the module docstring for a detailed explanation of the runtime.
    Runs until some exit condition is met.

    Args:
        evaluation_fn: The evaluation function to use.
        sampler: The optimizer to use for sampling configurations.
        optimizer_info: Information about the optimizer.
        optimization_dir: The directory where the optimization is running.
        max_evaluations_total: The maximum number of evaluations to run.
        max_evaluations_per_run: The maximum number of evaluations to run in a single run.
        continue_until_max_evaluation_completed: Whether to continue until the maximum
            evaluations are completed.
        logger: The logger to use.
        post_evaluation_hook: A hook to run after the evaluation.
        overwrite_optimization_dir: Whether to overwrite the optimization directory.
        pre_load_hooks: Hooks to run before loading the results.
    """
    # NOTE(eddiebergman): This was deprecated a while ago and called at
    # evaluate, now we just crash immediatly instead. Should probably
    # promote this check closer to the user, i.e. `neps.run()`
    evaluation_fn_params = inspect.signature(evaluation_fn).parameters
    if "previous_working_directory" in evaluation_fn_params:
        raise RuntimeError(
            "the argument: 'previous_working_directory' was deprecated. "
            f"In the function: '{evaluation_fn.__name__}', please,  "
            "use 'previous_pipeline_directory' instead. ",
        )
    if "working_directory" in evaluation_fn_params:
        raise RuntimeError(
            "the argument: 'working_directory' was deprecated. "
            f"In the function: '{evaluation_fn.__name__}', please,  "
            "use 'pipeline_directory' instead. ",
        )

    if logger is None:
        logger = logging.getLogger("neps")

    optimization_dir = Path(optimization_dir)

    # TODO(eddiebergman): Not sure how overwriting works with multiple workers....
    if overwrite_optimization_dir and optimization_dir.exists():
        logger.warning("Overwriting working_directory")
        shutil.rmtree(optimization_dir)

    shared_state = SharedState(optimization_dir, create_dirs=True)

    _poll = float(os.environ.get(ENVIRON_STATE_POLL_KEY, DEFAULT_STATE_POLL))
    _timeout = os.environ.get(ENVIRON_STATE_TIMEOUT_KEY, DEFAULT_STATE_TIMEOUT)
    timeout = float(_timeout) if _timeout is not None else None

    with shared_state.lock(poll=_poll, timeout=timeout):
        if not shared_state.optimizer_info_file.exists():
            serialize(optimizer_info, shared_state.optimizer_info_file, sort_keys=False)
        else:
            shared_state.check_optimizer_info_on_disk_matches(optimizer_info)

    evaluations_in_this_run = 0
    while True:
        if (
            max_evaluations_per_run is not None
            and evaluations_in_this_run >= max_evaluations_per_run
        ):
            logger.info("Maximum evaluations per run is reached, shutting down")
            break

        with shared_state.lock(poll=_poll, timeout=timeout):
            refs = shared_state.trial_refs()

            _try_remove_corrupted_configs(refs[Trial.State.CORRUPTED], logger)

            if not _worker_should_continue(
                max_evaluations_total,
                continue_until_max_evaluation_completed=continue_until_max_evaluation_completed,
                refs=refs,
                logger=logger,
            ):
                logger.info("Maximum total evaluations is reached, shutting down")
                break

            # TODO(eddiebergman): I assume we should skip sampling and just go evaluate
            # pending configs?
            if any(refs[Trial.State.PENDING]):
                logger.warning(
                    f"There are {len(refs[Trial.State.PENDING])} configs that"
                    " were sampled, but have no worker assigned. Sometimes this is due to"
                    " a delay in the filesystem communication, but most likely some"
                    " configs crashed during their execution or a jobtime-limit was"
                    "  reached.",
                )

            # While we have the decision lock, we will now sample with the optimizer in
            # this process
            with sampler.using_state(shared_state.optimizer_state_file):
                if sampler.budget is not None and sampler.used_budget >= sampler.budget:
                    logger.info("Maximum budget reached, shutting down")
                    break

                logger.debug("Sampling a new configuration")
                if pre_load_hooks is not None:
                    for hook in pre_load_hooks:
                        sampler = hook(sampler)

                sampler.load_results(
                    previous_results={
                        ref.id: ref.to_result(config_transform=sampler.load_config)
                        for ref in refs[Trial.State.COMPLETE]
                    },
                    pending_evaluations={
                        ref.id: sampler.load_config(ref.config())
                        for ref in refs[Trial.State.IN_PROGRESS]
                    },
                )

                # TODO(eddiebergman): If we have some unified `Trial` like object,
                # we can just have them return this instead.
                config, config_id, prev_config_id = sampler.get_config_and_ids()

            trial = Trial(
                id=config_id,
                config=config,
                pipeline_dir=shared_state.results_dir / f"config_{config_id}",
                prev_config_id=prev_config_id,
                metadata={"time_sampled": time.time()},
            )
            trial.write_to_disk()
            logger.debug(f"Sampled config {config_id}")

            # Inform the global state of this process that we are evaluating this trial
            _set_in_progress_trial(trial)

        # Obtain the lock on this trial and evaluate it,
        # otherwise continue back to waiting to sampling
        with trial.disk.lock.try_lock() as acquired:
            if not acquired:
                continue

            # NOTE: Bit of an extra safety check but check that the trial is not complete
            if trial.disk.state == Trial.State.COMPLETE:
                continue

            result, time_end = _evaluate_config(trial, evaluation_fn, logger)
            meta: dict[str, Any] = {"time_end": time_end}

            # If this is set, it means we update the optimzier with the used
            # budget once we write the trial to disk and mark it as complete
            account_for_cost: bool = False
            eval_cost: float | None = None

            if result == "error":
                # TODO(eddiebergman): We should probably do something here...
                pass
            elif "cost" not in result and sampler.budget is not None:
                raise ValueError(
                    "The evaluation function result should contain "
                    f"a 'cost' field when used with a budget. Got {result}",
                )
            elif "cost" in result:
                eval_cost = float(result["cost"])
                account_for_cost = result.get("account_for_cost", True)
                meta["budget"] = {
                    "max": sampler.budget,
                    "used": sampler.used_budget,
                    "eval_cost": eval_cost,
                    "account_for_cost": account_for_cost,
                }

            trial.results = result
            trial.metadata.update(meta)

            with shared_state.lock(poll=_poll, timeout=timeout):
                trial.write_to_disk()
                if account_for_cost:
                    assert eval_cost is not None
                    with sampler.using_state(shared_state.optimizer_state_file):
                        sampler.used_budget += eval_cost

            # 3. Anything the user might want to do after the evaluation
            if post_evaluation_hook is not None:
                post_evaluation_hook(
                    trial.config,
                    trial.id,
                    trial.pipeline_dir,
                    trial.results,
                    logger,
                )

            logger.info(f"Finished evaluating config {config_id}")

            evaluations_in_this_run += 1
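
A sketch of invoking the runtime, assuming a configured BaseOptimizer instance sampler and its optimizer_info dict are already available; the evaluation function, its parameter, and the returned keys are illustrative. Note that a "cost" entry is required in the result whenever the optimizer has a budget.

import logging
from neps.runtime import launch_runtime

def run_pipeline(learning_rate: float) -> dict:
    # Illustrative evaluation function; "cost" is only needed when a budget is set.
    return {"loss": (learning_rate - 0.1) ** 2, "cost": 1.0}

launch_runtime(
    evaluation_fn=run_pipeline,
    sampler=sampler,                  # a BaseOptimizer, assumed to be set up elsewhere
    optimizer_info=optimizer_info,    # dict describing that optimizer's setup
    optimization_dir="results/my_run",
    max_evaluations_total=20,
    logger=logging.getLogger("neps"),
)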