Skip to content

Neps state

The main state object that holds all the shared state objects.

This object is used to interact with the shared state objects in a safe atomic manner, such that each worker can create an identical NePSState and interact with it without having to worry about locking or out-dated information.

For an actual instantiation of this object, see create_or_load_filebased_neps_state().

NePSState dataclass #

NePSState(
    path: Path,
    _trial_lock: FileLocker,
    _trial_repo: TrialRepo,
    _optimizer_lock: FileLocker,
    _optimizer_info_path: Path,
    _optimizer_info: OptimizerInfo,
    _optimizer_state_path: Path,
    _optimizer_state: OptimizationState,
    _err_lock: FileLocker,
    _shared_errors_path: Path,
    _shared_errors: ErrDump,
)

The main state object that holds all the shared state objects.

all_trial_ids #

all_trial_ids() -> list[str]

Get all the trial ids.

Source code in neps/state/neps_state.py
def all_trial_ids(self) -> list[str]:
    """Return the ids of all trials as listed by the trial repository.

    This simply delegates to the trial repository; no lock is acquired here.
    """
    trial_ids = self._trial_repo.list_trial_ids()
    return trial_ids

create_or_load classmethod #

create_or_load(
    path: Path,
    *,
    load_only: bool = False,
    optimizer_info: OptimizerInfo | None = None,
    optimizer_state: OptimizationState | None = None
) -> NePSState

Create a new NePSState in a directory or load the existing one if it already exists, depending on the argument.

Warning

We check that the optimizer info in the NePSState on disk matches the one that is passed. However, we do not lock this check, so if two processes try to create a NePSState at the same time with different optimizer infos, one of them may fail to create the NePSState. This is a limitation of the current design.

In principle, we could allow multiple optimizers to be run and share the same set of trials.

PARAMETER DESCRIPTION
path

The directory to create the state in.

TYPE: Path

load_only

If True, only load the state and do not create a new one.

TYPE: bool DEFAULT: False

optimizer_info

The optimizer info to use.

TYPE: OptimizerInfo | None DEFAULT: None

optimizer_state

The optimizer state to use.

TYPE: OptimizationState | None DEFAULT: None

RETURNS DESCRIPTION
NePSState

The NePSState.

RAISES DESCRIPTION
NePSError

If the optimizer info on disk does not match the one provided.

Source code in neps/state/neps_state.py
@classmethod
def create_or_load(
    cls,
    path: Path,
    *,
    load_only: bool = False,
    optimizer_info: OptimizerInfo | None = None,
    optimizer_state: OptimizationState | None = None,
) -> NePSState:
    """Create a new NePSState in a directory or load the existing one
    if it already exists, depending on the argument.

    !!! warning

        We check that the optimizer info in the NePSState on disk matches
        the one that is passed. However, we do not lock this check, so if
        two processes try to create a NePSState at the same time with
        different optimizer infos, one of them may fail to create the
        NePSState. This is a limitation of the current design.

        In principle, we could allow multiple optimizers to be run and share
        the same set of trials.

    Args:
        path: The directory to create the state in.
        load_only: If True, only load the state and do not create a new one.
        optimizer_info: The optimizer info to use. Required unless
            `load_only` is True.
        optimizer_state: The optimizer state to use. Required unless
            `load_only` is True. If the directory already exists, the state
            stored on disk is used instead of this value.

    Returns:
        The NePSState.

    Raises:
        FileNotFoundError: If `load_only` is True and `path` does not exist.
        ValueError: If `load_only` is False and `optimizer_info` or
            `optimizer_state` is missing.
        NePSError: If the optimizer info on disk does not match the one provided.
    """
    path = path.absolute().resolve()
    is_new = not path.exists()
    if load_only:
        if is_new:
            raise FileNotFoundError(f"No NePSState found at '{path}'.")
    elif optimizer_info is None or optimizer_state is None:
        # Explicit validation instead of `assert`: asserts are stripped under
        # `python -O`, which would let `None` be serialized to disk below.
        # This is checked before any directory is created.
        raise ValueError(
            "Both `optimizer_info` and `optimizer_state` must be provided"
            " unless `load_only=True`."
        )

    path.mkdir(parents=True, exist_ok=True)
    config_dir = path / "configs"
    config_dir.mkdir(parents=True, exist_ok=True)

    optimizer_info_path = path / "optimizer_info.yaml"
    optimizer_state_path = path / "optimizer_state.pkl"
    shared_errors_path = path / "shared_errors.jsonl"

    # We have to do one bit of sanity checking to ensure that the optimizer
    # info on disk matches the one we have received, otherwise we are unsure which
    # optimizer is being used.
    # NOTE: We assume that we do not have to worry about a race condition
    # here where we have two different NePSState objects with two different optimizer
    # infos trying to be created at the same time. This avoids the need to lock to
    # check the optimizer info. If this assumption changes, then we would have
    # to first lock before we do this check
    if not is_new:
        existing_info = _deserialize_optimizer_info(optimizer_info_path)
        if not load_only and existing_info != optimizer_info:
            raise NePSError(
                "The optimizer info on disk does not match the one provided."
                f"\nOn disk: {existing_info}\nProvided: {optimizer_info}"
                f"\n\nLoaded the one on disk from {path}."
            )
        # The state on disk always wins over the `optimizer_state` argument.
        # NOTE(review): unpickling executes arbitrary code; this assumes the
        # state directory is trusted.
        with optimizer_state_path.open("rb") as f:
            optimizer_state = pickle.load(f)  # noqa: S301

        optimizer_info = existing_info
        error_dump = ReaderWriterErrDump.read(shared_errors_path)
    else:
        # A missing directory with `load_only=True` raised above, so reaching
        # here implies both arguments were already validated. The asserts only
        # narrow the types for static checkers.
        assert optimizer_info is not None
        assert optimizer_state is not None

        serialize(optimizer_info, path=optimizer_info_path)
        with optimizer_state_path.open("wb") as f:
            pickle.dump(optimizer_state, f, protocol=pickle.HIGHEST_PROTOCOL)

        error_dump = ErrDump([])

    # Use `cls` rather than the hard-coded class so subclasses get an
    # instance of their own type from this alternate constructor.
    return cls(
        path=path,
        _trial_repo=TrialRepo(config_dir),
        # Locks,
        _trial_lock=FileLocker(
            lock_path=path / ".configs.lock",
            poll=TRIAL_FILELOCK_POLL,
            timeout=TRIAL_FILELOCK_TIMEOUT,
        ),
        _optimizer_lock=FileLocker(
            lock_path=path / ".optimizer.lock",
            poll=STATE_FILELOCK_POLL,
            timeout=STATE_FILELOCK_TIMEOUT,
        ),
        _err_lock=FileLocker(
            lock_path=path / ".errors.lock",
            poll=GLOBAL_ERR_FILELOCK_POLL,
            timeout=GLOBAL_ERR_FILELOCK_TIMEOUT,
        ),
        # State
        _optimizer_info_path=optimizer_info_path,
        _optimizer_info=optimizer_info,
        _optimizer_state_path=optimizer_state_path,
        _optimizer_state=optimizer_state,  # type: ignore
        _shared_errors_path=shared_errors_path,
        _shared_errors=error_dump,
    )

lock_and_get_current_evaluating_trials #

lock_and_get_current_evaluating_trials() -> list[Trial]

Get the current evaluating trials.

Source code in neps/state/neps_state.py
def lock_and_get_current_evaluating_trials(self) -> list[Trial]:
    """Return every trial whose metadata state is `EVALUATING`.

    The trial lock is held while the latest trials are read from the
    repository and filtered.
    """
    with self._trial_lock.lock():
        evaluating: list[Trial] = []
        for candidate in self._trial_repo.latest().values():
            if candidate.metadata.state == Trial.State.EVALUATING:
                evaluating.append(candidate)
        return evaluating

lock_and_get_errors #

lock_and_get_errors() -> ErrDump

Get all the errors that have occurred during the optimization.

Source code in neps/state/neps_state.py
def lock_and_get_errors(self) -> ErrDump:
    """Read the shared error dump from disk while holding the error lock."""
    with self._err_lock.lock():
        errors = ReaderWriterErrDump.read(self._shared_errors_path)
    return errors

lock_and_get_next_pending_trial #

lock_and_get_next_pending_trial() -> Trial | None
lock_and_get_next_pending_trial(n: int) -> list[Trial]
lock_and_get_next_pending_trial(
    n: int | None = None,
) -> Trial | list[Trial] | None

Get the next pending trial.

Source code in neps/state/neps_state.py
def lock_and_get_next_pending_trial(
    self,
    n: int | None = None,
) -> Trial | list[Trial] | None:
    """Get the pending trial(s) that were sampled earliest.

    Args:
        n: If `None`, return the single earliest-sampled pending trial, or
            `None` when nothing is pending. Otherwise return a list of up to
            `n` pending trials, earliest first.

    Returns:
        A single trial, `None`, or a list of trials, depending on `n`.
    """
    with self._trial_lock.lock():
        queue = []
        for candidate in self._trial_repo.latest().values():
            if candidate.metadata.state == Trial.State.PENDING:
                queue.append(candidate)
        # Earliest `time_sampled` first.
        queue.sort(key=lambda pending: pending.metadata.time_sampled)

        if n is not None:
            return queue[:n]
        if not queue:
            return None
        return queue[0]

lock_and_get_optimizer_info #

lock_and_get_optimizer_info() -> OptimizerInfo

Get the optimizer information.

Source code in neps/state/neps_state.py
def lock_and_get_optimizer_info(self) -> OptimizerInfo:
    """Deserialize the optimizer info from disk while holding the optimizer lock."""
    with self._optimizer_lock.lock():
        info = _deserialize_optimizer_info(self._optimizer_info_path)
    return info

lock_and_get_optimizer_state #

lock_and_get_optimizer_state() -> OptimizationState

Get the optimizer state.

Source code in neps/state/neps_state.py
def lock_and_get_optimizer_state(self) -> OptimizationState:
    """Unpickle the optimizer state from disk while holding the optimizer lock.

    The loaded object is asserted to be an `OptimizationState`.
    """
    # Lock first, then open the file; both are released in reverse order.
    with self._optimizer_lock.lock(), self._optimizer_state_path.open("rb") as fh:
        state = pickle.load(fh)  # noqa: S301
        assert isinstance(state, OptimizationState)
        return state

lock_and_get_trial_by_id #

lock_and_get_trial_by_id(trial_id: str) -> Trial

Get a trial by its id.

Source code in neps/state/neps_state.py
def lock_and_get_trial_by_id(self, trial_id: str) -> Trial:
    """Load the trial with the given id from disk while holding the trial lock."""
    with self._trial_lock.lock():
        found = self._trial_repo.load_trial_from_disk(trial_id)
    return found

lock_and_read_trials #

lock_and_read_trials() -> dict[str, Trial]

Acquire the trial lock and read the trials.

Source code in neps/state/neps_state.py
def lock_and_read_trials(self) -> dict[str, Trial]:
    """Acquire the trial lock and return the repository's latest trials."""
    with self._trial_lock.lock():
        snapshot = self._trial_repo.latest()
    return snapshot

lock_and_report_trial_evaluation #

lock_and_report_trial_evaluation(
    trial: Trial, report: Report, *, worker_id: str
) -> None

Acquire the trial lock and the error lock, then report the trial evaluation.

Source code in neps/state/neps_state.py
def lock_and_report_trial_evaluation(
    self,
    trial: Trial,
    report: Report,
    *,
    worker_id: str,
) -> None:
    """Report a trial's evaluation while holding the trial and error locks.

    Args:
        trial: The trial that was evaluated.
        report: The report produced for the trial.
        worker_id: The id of the worker reporting the evaluation.
    """
    # The trial lock is taken first, then the error lock.
    with self._trial_lock.lock():
        with self._err_lock.lock():
            self._report_trial_evaluation(trial, report, worker_id=worker_id)

lock_and_sample_trial #

lock_and_sample_trial(
    optimizer: AskFunction,
    *,
    worker_id: str,
    n: None = None
) -> Trial
lock_and_sample_trial(
    optimizer: AskFunction, *, worker_id: str, n: int
) -> list[Trial]
lock_and_sample_trial(
    optimizer: AskFunction,
    *,
    worker_id: str,
    n: int | None = None
) -> Trial | list[Trial]

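Acquire the optimizer lock and sample a trial. The trial lock is additionally held while reading the existing trials and while storing the newly sampled ones.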

Source code in neps/state/neps_state.py
def lock_and_sample_trial(
    self, optimizer: AskFunction, *, worker_id: str, n: int | None = None
) -> Trial | list[Trial]:
    """Sample new trial(s) from the optimizer and store them.

    The optimizer lock is held for the whole operation. The trial lock is only
    held while reading the existing trials and, separately, while storing the
    newly sampled ones; it is released during sampling itself.

    Args:
        optimizer: The ask function used to sample.
        worker_id: The id of the worker requesting the sample.
        n: Forwarded to the sampling routine together with the other arguments.

    Returns:
        Whatever the sampling routine produced: a trial or a list of trials.
    """
    with self._optimizer_lock.lock():
        with self._trial_lock.lock():
            known_trials = self._trial_repo.latest()

        sampled = self._sample_trial(
            optimizer,
            trials=known_trials,
            worker_id=worker_id,
            n=n,
        )

        with self._trial_lock.lock():
            self._trial_repo.store_new_trial(sampled)

    return sampled

put_updated_trial #

put_updated_trial(
    trial: Trial,
    *,
    hints: (
        list[TrialWriteHint] | TrialWriteHint | None
    ) = None
) -> None

Update the trial.

PARAMETER DESCRIPTION
trial

The trial to update.

TYPE: Trial

hints

The hints to use when updating the trial. Defines what files need to be updated. If you don't know, leave None; this is only a micro-optimization.

TYPE: list[TrialWriteHint] | TrialWriteHint | None DEFAULT: None

Source code in neps/state/neps_state.py
def put_updated_trial(
    self,
    trial: Trial,
    *,
    hints: list[TrialWriteHint] | TrialWriteHint | None = None,
) -> None:
    """Write an updated trial through the trial repository under the trial lock.

    Args:
        trial: The trial to update.
        hints: Forwarded unchanged to the repository's `update_trial`. Defines
            what files need to be updated. If you don't know, leave `None`;
            this is only a micro-optimization.
    """
    repo = self._trial_repo
    with self._trial_lock.lock():
        repo.update_trial(trial, hints=hints)

unsafe_retry_get_trial_by_id #

unsafe_retry_get_trial_by_id(trial_id: str) -> Trial

Get a trial by id but use unsafe retries.

Source code in neps/state/neps_state.py
def unsafe_retry_get_trial_by_id(self, trial_id: str) -> Trial:
    """Get a trial by id without locking, retrying on read failures.

    The trial is loaded from disk without acquiring the trial lock. Any error
    other than `TrialNotFoundError` is logged and retried after a short sleep,
    up to `N_UNSAFE_RETRIES` attempts.

    Args:
        trial_id: The id of the trial to load.

    Returns:
        The loaded trial.

    Raises:
        TrialNotFoundError: Propagated immediately, without retrying.
        NePSError: If every attempt failed. The last underlying error is
            attached as `__cause__`.
    """
    last_error: Exception | None = None
    for _ in range(N_UNSAFE_RETRIES):
        try:
            return self._trial_repo.load_trial_from_disk(trial_id)
        except TrialNotFoundError:
            # A missing trial is not a transient read problem; do not retry.
            raise
        except Exception as e:  # noqa: BLE001
            last_error = e
            logger.warning(
                "Failed to get trial '%s' due to an error: %s", trial_id, e
            )
            time.sleep(0.1)

    # Chain the last failure so the root cause is visible in the traceback.
    raise NePSError(
        f"Failed to get trial '{trial_id}' after {N_UNSAFE_RETRIES} retries."
    ) from last_error

TrialRepo dataclass #

TrialRepo(directory: Path)

A repository for trials that are stored on disk.

Warning

This class does not implement locking and it is up to the caller to ensure there are no race conflicts.

latest #

latest() -> dict[str, Trial]

Get the latest trials from the cache.

Source code in neps/state/neps_state.py
def latest(self) -> dict[str, Trial]:
    """Get the latest trials, preferring the on-disk cache.

    If the cache file exists it is read (and possibly consolidated). If it
    does not exist but `config_*` entries are present in the directory, every
    trial is loaded from disk, a fresh cache is written, and those trials are
    returned. If neither exists, an empty dict is returned.

    Returns:
        A mapping from trial id to trial.
    """
    if self.cache_path.exists():
        return self._read_pkl_and_maybe_consolidate()

    # No cache and nothing on disk: there are simply no trials yet.
    if not any(
        entry.name.startswith("config_") for entry in self.directory.iterdir()
    ):
        return {}

    # If we end up with no cache but there are trials on disk, we need to read
    # them in and rebuild the cache.
    trials = {
        trial_id: self.load_trial_from_disk(trial_id)
        for trial_id in self.list_trial_ids()
    }
    pickle_bytes = pickle.dumps(trials, protocol=pickle.HIGHEST_PROTOCOL)
    with atomic_write(self.cache_path, "wb") as f:
        f.write(pickle_bytes)

    # Return what was just loaded; previously this path fell through to
    # `return {}` and hid the on-disk trials from the caller.
    return trials

list_trial_ids #

list_trial_ids() -> list[str]

List all the trial ids on disk.

Source code in neps/state/neps_state.py
def list_trial_ids(self) -> list[str]:
    """List the ids of all trials that have a `config_*` directory on disk.

    The id is the directory name with its first `CONFIG_PREFIX_LEN`
    characters removed.
    """
    trial_ids: list[str] = []
    for entry in self.directory.iterdir():
        if not entry.name.startswith("config_"):
            continue
        # Only directories count as trials; stray files are ignored.
        if not entry.is_dir():
            continue
        trial_ids.append(entry.name[CONFIG_PREFIX_LEN:])
    return trial_ids

load_trial_from_disk #

load_trial_from_disk(trial_id: str) -> Trial

Load a trial from disk.

RAISES DESCRIPTION
TrialNotFoundError

If the trial is not found on disk.

Source code in neps/state/neps_state.py
def load_trial_from_disk(self, trial_id: str) -> Trial:
    """Read the trial stored under `config_<trial_id>` in the repository directory.

    Raises:
        TrialNotFoundError: If the trial's directory does not exist on disk.
    """
    trial_dir = self.directory / f"config_{trial_id}"
    if trial_dir.exists():
        return ReaderWriterTrial.read(trial_dir)

    raise TrialNotFoundError(
        f"Trial {trial_id} not found at expected path of {trial_dir}."
    )

store_new_trial #

store_new_trial(trial: Trial | list[Trial]) -> None

Write a new trial to disk.

RAISES DESCRIPTION
TrialAlreadyExistsError

If the trial already exists on disk.

Source code in neps/state/neps_state.py
def store_new_trial(self, trial: Trial | list[Trial]) -> None:
    """Write one or more new trials to the cache and to disk.

    All trials are validated before anything is written, so a conflicting
    trial no longer leaves behind empty `config_*` directories for the other
    trials in the batch.

    Args:
        trial: A single trial or a list of trials to store. The object is
            pickled as given (single trial or list) and appended to the cache.

    Raises:
        TrialAlreadyExistsError: If a trial already exists on disk, or if the
            same trial id appears more than once in the batch.
    """
    new_trials = [trial] if isinstance(trial, Trial) else list(trial)

    # Validate the whole batch first, without touching the disk.
    pending = []
    claimed_paths = set()
    for new_trial in new_trials:
        config_path = self.directory / f"config_{new_trial.id}"
        # A repeated id within the batch is treated like an existing trial,
        # matching the previous behaviour where the first occurrence had
        # already created the directory.
        if config_path in claimed_paths or config_path.exists():
            raise TrialAlreadyExistsError(new_trial.id, config_path)
        claimed_paths.add(config_path)
        pending.append((new_trial, config_path))

    # Append the original object to the cache before writing the trial files.
    bytes_ = pickle.dumps(trial, protocol=pickle.HIGHEST_PROTOCOL)
    with atomic_write(self.cache_path, "ab") as f:
        f.write(bytes_)

    for new_trial, config_path in pending:
        config_path.mkdir(parents=True, exist_ok=True)
        ReaderWriterTrial.write(
            new_trial,
            config_path,
            hints=["config", "metadata"],
        )

update_trial #

update_trial(
    trial: Trial,
    *,
    hints: (
        Iterable[TrialWriteHint] | TrialWriteHint | None
    ) = ("report", "metadata")
) -> None

Update a trial on disk.

PARAMETER DESCRIPTION
trial

The trial to update.

TYPE: Trial

hints

The hints to use when updating the trial. Defines what files need to be updated. If you don't know, leave None, this is a micro-optimization.

TYPE: Iterable[TrialWriteHint] | TrialWriteHint | None DEFAULT: ('report', 'metadata')

Source code in neps/state/neps_state.py
def update_trial(
    self,
    trial: Trial,
    *,
    hints: Iterable[TrialWriteHint] | TrialWriteHint | None = ("report", "metadata"),
) -> None:
    """Append the updated trial to the cache and rewrite its files on disk.

    Args:
        trial: The trial to update.
        hints: Passed through to the trial writer. Defines what files need to
            be updated; defaults to the report and metadata files.
    """
    # Pickle before opening the cache so a pickling failure never touches it.
    payload = pickle.dumps(trial, protocol=pickle.HIGHEST_PROTOCOL)
    with atomic_write(self.cache_path, "ab") as cache_file:
        cache_file.write(payload)

    trial_dir = self.directory / f"config_{trial.id}"
    ReaderWriterTrial.write(trial, trial_dir, hints=hints)