Runtime

Runtime components for NePS: the default worker used by the neps.run() loop, plus helpers for accessing the worker's state and the currently running trial from within an evaluation.

DefaultWorker dataclass #

DefaultWorker(
    state: NePSState,
    settings: WorkerSettings,
    evaluation_fn: Callable[..., EvaluatePipelineReturn],
    optimizer: AskFunction,
    worker_id: str,
    worker_cumulative_eval_count: int = 0,
    worker_cumulative_eval_cost: float = 0.0,
    worker_cumulative_evaluation_time_seconds: float = 0.0,
    _GRACE: ClassVar = FS_SYNC_GRACE_BASE,
)

A default worker for the NePS system.

This is the worker that is used by default in the neps.run() loop.
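
Workers like this are normally created for you: calling neps.run() creates and runs one behind the scenes, and launching it from several processes gives you several workers sharing the same state. The following is a rough, hypothetical sketch of that entry point; the search-space class and parameter names are assumptions drawn from typical NePS usage and may differ between versions:

import neps

def evaluate_pipeline(learning_rate: float) -> float:
    # Hypothetical objective standing in for a real training run.
    return (learning_rate - 0.01) ** 2

neps.run(
    evaluate_pipeline=evaluate_pipeline,
    pipeline_space={"learning_rate": neps.Float(lower=1e-5, upper=1e-1, log=True)},
    root_directory="results/example",
    max_evaluations_total=10,
)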

evaluation_fn instance-attribute #

evaluation_fn: Callable[..., EvaluatePipelineReturn]

The evaluation function to use for the worker.
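
As a sketch of what such a function can look like, assuming the common convention that EvaluatePipelineReturn is either a plain float or a dict carrying keys such as objective_to_minimize and cost (check the EvaluatePipelineReturn type for the exact contract):

def evaluate_pipeline(learning_rate: float, num_layers: int) -> dict:
    # Hypothetical objective standing in for a real training run.
    objective = (learning_rate - 0.01) ** 2 + 0.1 * num_layers
    return {
        "objective_to_minimize": objective,  # value the optimizer minimizes
        "cost": float(num_layers),  # optional, accumulated into worker_cumulative_eval_cost
    }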

optimizer instance-attribute #

optimizer: AskFunction

The optimizer that is in use by the worker.

settings instance-attribute #

settings: WorkerSettings

The settings for the worker.

state instance-attribute #

state: NePSState

The state of the NePS system.

worker_cumulative_eval_cost class-attribute instance-attribute #

worker_cumulative_eval_cost: float = 0.0

The cost of the evaluations done by this worker.

worker_cumulative_eval_count class-attribute instance-attribute #

worker_cumulative_eval_count: int = 0

The number of evaluations done by this worker.

worker_cumulative_evaluation_time_seconds class-attribute instance-attribute #

worker_cumulative_evaluation_time_seconds: float = 0.0

The time spent evaluating configurations by this worker.

worker_id instance-attribute #

worker_id: str

The id of the worker.

new classmethod #

new(
    *,
    state: NePSState,
    optimizer: AskFunction,
    settings: WorkerSettings,
    evaluation_fn: Callable[..., EvaluatePipelineReturn],
    worker_id: str | None = None
) -> DefaultWorker

Create a new worker.

Source code in neps/runtime.py
@classmethod
def new(
    cls,
    *,
    state: NePSState,
    optimizer: AskFunction,
    settings: WorkerSettings,
    evaluation_fn: Callable[..., EvaluatePipelineReturn],
    worker_id: str | None = None,
) -> DefaultWorker:
    """Create a new worker."""
    return DefaultWorker(
        state=state,
        optimizer=optimizer,
        settings=settings,
        evaluation_fn=evaluation_fn,
        worker_id=worker_id if worker_id is not None else _default_worker_name(),
    )
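
A minimal sketch of how new() and run() fit together, assuming state, optimizer and settings are pre-built NePSState, AskFunction and WorkerSettings instances (constructing those is out of scope here, and the import path is inferred from the source location shown above):

from neps.runtime import DefaultWorker  # import path assumed from neps/runtime.py

def launch_worker(state, optimizer, settings, evaluate_pipeline) -> None:
    # state, optimizer and settings are assumed to be pre-built
    # NePSState, AskFunction and WorkerSettings objects.
    worker = DefaultWorker.new(
        state=state,
        optimizer=optimizer,
        settings=settings,
        evaluation_fn=evaluate_pipeline,
        # worker_id is optional; a default name is generated when omitted.
    )
    worker.run()  # blocks until a WorkerSettings stopping criterion is met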

run #

run() -> None

Run the worker.

Will keep running until one of the criteria defined by the WorkerSettings is met.

Source code in neps/runtime.py
def run(self) -> None:  # noqa: C901, PLR0912, PLR0915
    """Run the worker.

    Will keep running until one of the criteria defined by the `WorkerSettings`
    is met.
    """
    _set_workers_neps_state(self.state)

    logger.info("Launching NePS")

    _time_monotonic_start = time.monotonic()
    _error_from_evaluation: Exception | None = None

    _repeated_fail_get_next_trial_count = 0
    n_repeated_failed_check_should_stop = 0
    while True:
        try:
            # First check local worker settings
            should_stop = self._check_worker_local_settings(
                time_monotonic_start=_time_monotonic_start,
                error_from_this_worker=_error_from_evaluation,
            )
            if should_stop is not False:
                logger.info(should_stop)
                break

            # Next, check for errors that have occurred globally
            should_stop = self._check_shared_error_stopping_criterion()
            if should_stop is not False:
                logger.info(should_stop)
                break

        except WorkerRaiseError as e:
            # If we raise a specific error, we should stop the worker
            raise e
        except Exception as e:
            # An unknown exception; check our retry count
            n_repeated_failed_check_should_stop += 1
            if (
                n_repeated_failed_check_should_stop
                >= MAX_RETRIES_WORKER_CHECK_SHOULD_STOP
            ):
                raise WorkerRaiseError(
                    f"Worker {self.worker_id} failed to check if it should stop"
                    f" {MAX_RETRIES_WORKER_CHECK_SHOULD_STOP} times in a row. Bailing"
                ) from e

            logger.error(
                "Unexpected error from worker '%s' while checking if it should stop.",
                self.worker_id,
                exc_info=True,
            )
            time.sleep(1)  # Help stagger retries
            continue

        # From here, we begin sampling or getting the next pending trial.
        # The global stopping criterion requires checking all trials, so it is
        # evaluated inside _get_next_trial, in lock-step with sampling.
        try:
            trial_to_eval = self._get_next_trial()
            if trial_to_eval == "break":
                break
            _repeated_fail_get_next_trial_count = 0
        except Exception as e:
            _repeated_fail_get_next_trial_count += 1
            if isinstance(e, portalocker.exceptions.LockException):
                logger.debug(
                    "Worker '%s': Timeout while trying to get the next trial to"
                    " evaluate. If you are using a model based optimizer, such as"
                    " Bayesian Optimization, this can occur as the number of"
                    " configurations get large. There's not much to do here"
                    " and we will retry to obtain the lock.",
                    self.worker_id,
                    exc_info=True,
                )
            else:
                logger.debug(
                    "Worker '%s': Error while trying to get the next trial to"
                    " evaluate.",
                    self.worker_id,
                    exc_info=True,
                )
                time.sleep(1)  # Help stagger retries
            # NOTE: This is to prevent any infinite loops if we can't get a trial
            if _repeated_fail_get_next_trial_count >= MAX_RETRIES_GET_NEXT_TRIAL:
                raise WorkerFailedToGetPendingTrialsError(
                    f"Worker {self.worker_id} failed to get pending trials"
                    f" {MAX_RETRIES_GET_NEXT_TRIAL} times in"
                    " a row. Bailing!"
                ) from e

            continue

        # We (this worker) have managed to set it to evaluating; now we can evaluate it
        with _set_global_trial(trial_to_eval):
            evaluated_trial, report = evaluate_trial(
                trial=trial_to_eval,
                evaluation_fn=self.evaluation_fn,
                default_report_values=self.settings.default_report_values,
            )
            evaluation_duration = evaluated_trial.metadata.evaluation_duration
            assert evaluation_duration is not None
            self.worker_cumulative_evaluation_time_seconds += evaluation_duration

        self.worker_cumulative_eval_count += 1

        logger.info(
            "Worker '%s' evaluated trial: %s as %s.",
            self.worker_id,
            evaluated_trial.id,
            evaluated_trial.metadata.state,
        )

        if report.cost is not None:
            self.worker_cumulative_eval_cost += report.cost

        if report.err is not None:
            logger.error(
                f"Error during evaluation of '{evaluated_trial.id}'"
                f" : {evaluated_trial.config}."
            )
            logger.exception(report.err)
            _error_from_evaluation = report.err

        # We do not retry this, as if some other worker has
        # managed to manipulate this trial in the meantime,
        # then something has gone wrong
        with self.state._trial_lock.lock(worker_id=self.worker_id):
            self.state._report_trial_evaluation(
                trial=evaluated_trial,
                report=report,
                worker_id=self.worker_id,
            )
            # This is mostly for `tblogger`
            for _key, callback in _TRIAL_END_CALLBACKS.items():
                callback(trial_to_eval)

        logger.debug("Config %s: %s", evaluated_trial.id, evaluated_trial.config)
        logger.debug("Loss %s: %s", evaluated_trial.id, report.objective_to_minimize)
        logger.debug("Cost %s: %s", evaluated_trial.id, report.objective_to_minimize)
        logger.debug(
            "Learning Curve %s: %s", evaluated_trial.id, report.learning_curve
        )

get_in_progress_trial #

get_in_progress_trial() -> Trial

Get the currently running trial in this process.

Source code in neps/runtime.py
def get_in_progress_trial() -> Trial:
    """Get the currently running trial in this process."""
    if _CURRENTLY_RUNNING_TRIAL_IN_PROCESS is None:
        raise RuntimeError(
            "The worker's NePS state has not been set! This should only be called"
            " from within a `evaluate_pipeline` context. If you are not running a"
            " pipeline and you did not call this function (`get_workers_neps_state`)"
            " yourself, this is a bug and should be reported to NePS."
        )
    return _CURRENTLY_RUNNING_TRIAL_IN_PROCESS
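
For example, inside an evaluation you can inspect the trial currently being evaluated. A sketch, assuming the import path from the source location above; trial.id and trial.config are the same attributes the worker itself logs:

from neps.runtime import get_in_progress_trial  # import path assumed from neps/runtime.py

def evaluate_pipeline(learning_rate: float) -> float:
    trial = get_in_progress_trial()  # raises RuntimeError outside an evaluation
    print(f"Evaluating trial {trial.id} with config {trial.config}")
    return (learning_rate - 0.01) ** 2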

get_workers_neps_state #

get_workers_neps_state() -> NePSState

Get the worker's NePS state.

Source code in neps/runtime.py
def get_workers_neps_state() -> NePSState:
    """Get the worker's NePS state."""
    if _WORKER_NEPS_STATE is None:
        raise RuntimeError(
            "The worker's NePS state has not been set! This should only be called"
            " from within a `evaluate_pipeline` context. If you are not running a"
            " pipeline and you did not call this function (`get_workers_neps_state`)"
            " yourself, this is a bug and should be reported to NePS."
        )
    return _WORKER_NEPS_STATE
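
Likewise, a minimal sketch of retrieving the shared state from within an evaluation; what you can usefully do with the returned NePSState depends on its own API, which is not covered here:

from neps.runtime import get_workers_neps_state  # import path assumed from neps/runtime.py

def evaluate_pipeline(x: float) -> float:
    state = get_workers_neps_state()  # raises RuntimeError outside an evaluation
    print(f"Worker is attached to NePS state of type {type(state).__name__}")
    return x**2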

register_notify_trial_end #

register_notify_trial_end(
    key: str, callback: Callable[[Trial], None]
) -> None

Register a callback to be called when a trial ends.

Source code in neps/runtime.py
def register_notify_trial_end(key: str, callback: Callable[[Trial], None]) -> None:
    """Register a callback to be called when a trial ends."""
    _TRIAL_END_CALLBACKS[key] = callback
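
A short sketch of registering such a callback; the key identifies the callback, so registering again under the same key replaces it, and the callback receives the Trial that was just evaluated, as passed by the worker's run() loop above:

from neps.runtime import register_notify_trial_end  # import path assumed from neps/runtime.py

def on_trial_end(trial) -> None:
    # Invoked by the worker after each trial's report has been stored.
    print(f"Trial {trial.id} finished")

register_notify_trial_end("my-trial-logger", on_trial_end)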