Skip to content

Status

Functions to get the status of a run and save the status to CSV files.

This module provides utilities for monitoring NePS optimization runs.

Summary dataclass #

Summary(
    by_state: dict[State, list[Trial]],
    best: tuple[Trial, float] | None,
    is_multiobjective: bool,
)

Summary of the current state of a neps run.

num_errors property #

num_errors: int

Number of trials that have errored.

num_evaluated property #

num_evaluated: int

Number of trials that have been evaluated.

num_pending property #

num_pending: int

Number of trials that are pending.

completed #

completed() -> list[Trial]

Return all trials which are in a completed state.

Source code in neps/status/status.py
def completed(self) -> list[Trial]:
    """Collect every trial that has finished running, regardless of outcome.

    Returns:
        The trials stored under the SUCCESS, FAILED and CRASHED states,
        concatenated in that order.
    """
    finished_states = (State.SUCCESS, State.FAILED, State.CRASHED)
    return [trial for state in finished_states for trial in self.by_state[state]]

df #

df() -> DataFrame

Convert the summary into a dataframe.

Source code in neps/status/status.py
def df(self) -> pd.DataFrame:
    """Build a table with one row per trial.

    Trials from every state are ordered by `metadata.time_sampled`. Config
    columns are prefixed with `config.`, the flattened user `extra` mapping is
    prefixed with `extra.`, and the remaining report and metadata fields keep
    their own names.

    Returns:
        An empty dataframe when there are no trials; otherwise the combined
        table indexed by the `id` column, with all-empty columns dropped.
    """
    ordered = sorted(
        itertools.chain.from_iterable(self.by_state.values()),
        key=lambda trial: trial.metadata.time_sampled,
    )
    if not ordered:
        return pd.DataFrame()

    # Configs: one column per hyperparameter, namespaced under `config.`
    configs = pd.DataFrame.from_records([trial.config for trial in ordered])
    configs = configs.rename(columns=lambda name: f"config.{name}").convert_dtypes()

    # Reports: trials that have not reported yet contribute an empty record
    report_rows = [
        {} if trial.report is None else asdict(trial.report) for trial in ordered
    ]
    reports = pd.DataFrame.from_records(report_rows).convert_dtypes()

    # The user-provided `extra` mapping is pulled out of the report table and
    # flattened into its own `extra.`-prefixed columns
    extras = pd.DataFrame()
    if "extra" in reports.columns:
        flattened = pd.json_normalize(reports.pop("extra"))  # type: ignore
        extras = flattened.rename(columns=lambda name: f"extra.{name}")

    # Metadata: every trial has it, so no placeholder is needed
    metadata = pd.DataFrame.from_records(
        [asdict(trial.metadata) for trial in ordered]
    ).convert_dtypes()

    table = pd.concat([configs, extras, reports, metadata], axis="columns")
    if table.empty:
        return table
    return table.set_index("id").dropna(how="all", axis="columns")

formatted #

formatted(
    pipeline_space: (
        PipelineSpace | SearchSpace | None
    ) = None,
) -> str

Return a formatted string of the summary.

PARAMETER DESCRIPTION
pipeline_space

Optional PipelineSpace for the run. If provided, it is used to format the best config in a more readable way. This is typically auto-loaded from disk by the status() function.

TYPE: PipelineSpace | SearchSpace | None DEFAULT: None

RETURNS DESCRIPTION
str

A formatted string of the summary.

Source code in neps/status/status.py
def formatted(  # noqa: PLR0912
    self, pipeline_space: PipelineSpace | SearchSpace | None = None
) -> str:
    """Render the summary as a human-readable multi-line string.

    The output lists the number of evaluated configs, the per-state trial
    counts (states without trials are omitted) and, if one is known, the best
    trial together with its objective, config, path, cost and extras.

    Args:
        pipeline_space: Optional space of the run. Only a truthy
            `PipelineSpace` changes the layout: the best config is then
            resolved and printed one variable per line. For anything else
            (None, a falsy space, a `SearchSpace`) the raw config dict is
            pretty-printed instead.

    Returns:
        A formatted string of the summary.
    """
    state_lines = [
        f"    {state.name.lower()}: {len(trials)}"
        for state, trials in self.by_state.items()
        if trials
    ]
    state_summary = "\n".join(state_lines)

    if self.best is not None:
        best_trial, best_objective_to_minimize = self.best

        # Fragments are collected and joined once at the end
        parts = [
            f"# Best Found (config {best_trial.metadata.id}):",
            "\n",
            f"\n    objective_to_minimize: {best_objective_to_minimize}\n    config: ",
        ]

        if pipeline_space and isinstance(pipeline_space, PipelineSpace):
            # Only PipelineSpace supports the per-variable layout
            resolved_best = NepsCompatConverter().from_neps_config(best_trial.config)
            variables = list(pipeline_space.get_attrs().keys()) + list(
                pipeline_space.fidelity_attrs.keys()
            )
            resolved_pipeline = neps_space.resolve(
                pipeline_space,
                OnlyPredefinedValuesSampler(resolved_best.predefined_samplings),
                environment_values=resolved_best.environment_values,
            )[0]

            # Format every variable first, then lay the results out
            rendered_values = [
                format_value(getattr(resolved_pipeline, variable))
                for variable in variables
            ]
            for variable_name, rendered in zip(variables, rendered_values):
                text = str(rendered)
                if "\n" not in text:
                    # Single-line value: keep it inline after the name
                    parts.append(f"\n        {variable_name}: {text}")
                    continue
                # Multi-line value: start on a fresh line, indented one level deeper
                nested = text.replace("\n", "\n          ")
                parts.append(f"\n        {variable_name}:\n          {nested}")
        else:
            # No usable PipelineSpace: pretty-print the raw config dict and
            # indent continuation lines so they align under `config:`
            dict_text = pformat(
                best_trial.config, indent=2, width=80, sort_dicts=False
            )
            aligned = dict_text.replace("\n", "\n        ")
            parts.append(f"\n        {aligned}")

        parts.append(f"\n    path: {best_trial.metadata.location}")

        assert best_trial.report is not None
        report = best_trial.report
        if report.cost is not None:
            parts.append(f"\n    cost: {report.cost}")
        if len(report.extra) > 0:
            parts.append(f"\n    extra: {report.extra}")

        best_summary = "".join(parts)
    elif self.is_multiobjective:
        best_summary = "Multiobjective summary not supported yet for best yet."
    else:
        best_summary = "No best found yet."

    return f"# Configs: {self.num_evaluated}\n\n{state_summary}\n\n{best_summary}"

from_directory classmethod #

from_directory(root_directory: str | Path) -> Summary

Create a summary from a neps run directory.

Source code in neps/status/status.py
@classmethod
def from_directory(cls, root_directory: str | Path) -> Summary:
    """Build a summary from the trials of a neps run.

    The state is taken from `get_workers_neps_state()` when that call
    succeeds; if it raises `RuntimeError`, the state is loaded from
    `root_directory` in load-only mode instead.

    Args:
        root_directory: Directory of the neps run.

    Returns:
        The summary computed by `from_trials` over the trials that were read.
    """
    run_directory = Path(root_directory)

    # NOTE(review): an earlier comment here said the shared state is not locked
    # because it is only read, but `lock_and_read_trials` is what gets called;
    # confirm which of the two is intended.
    # NOTE(review): presumably the worker state refers to the same run as
    # `root_directory`; this is not checked here.
    try:
        from neps.runtime import get_workers_neps_state

        state = get_workers_neps_state()
    except RuntimeError:
        state = NePSState.create_or_load(run_directory, load_only=True)

    return cls.from_trials(state.lock_and_read_trials())

from_trials classmethod #

from_trials(trials: dict[str, Trial]) -> Summary

Summarize a mapping of trials into (by_state, is_multiobjective, best).

This extracts the core loop from Summary.from_directory so callers that already have a trials mapping can reuse the logic without re-reading state.

Source code in neps/status/status.py
@classmethod
def from_trials(cls, trials: dict[str, Trial]) -> Summary:
    """Summarize a mapping of trials into (by_state, is_multiobjective, best).

    This extracts the core loop from `Summary.from_directory` so callers that
    already have a `trials` mapping can reuse the logic without re-reading state.
    """
    is_multiobjective: bool = False
    best: tuple[Trial, float] | None = None
    by_state: dict[State, list[Trial]] = {s: [] for s in State}

    for trial in trials.values():
        state = trial.metadata.state
        by_state[state].append(trial)

        if trial.report is not None:
            objective_to_minimize = trial.report.objective_to_minimize
            match objective_to_minimize:
                case None:
                    pass
                case float() | int() | np.number() if not is_multiobjective:
                    if best is None or objective_to_minimize < best[1]:
                        best = (trial, objective_to_minimize)
                case Sequence():
                    is_multiobjective = True
                    best = None
                case _:
                    raise RuntimeError("Unexpected type for objective_to_minimize")

    return cls(by_state=by_state, best=best, is_multiobjective=is_multiobjective)

post_run_csv #

post_run_csv(
    root_directory: str | Path,
) -> tuple[Path, Path]

Create CSV files summarizing the run data.

PARAMETER DESCRIPTION
root_directory

The root directory of the NePS run.

TYPE: str | Path

RETURNS DESCRIPTION
tuple[Path, Path]

The paths to the configuration data CSV and the run data CSV.

Source code in neps/status/status.py
def post_run_csv(root_directory: str | Path) -> tuple[Path, Path]:
    """Write the results of a run to two CSV files.

    The full results dataframe and the short summary series are obtained from
    `status` (without printing) and written while holding the lock returned by
    `_initiate_summary_csv`.

    Args:
        root_directory: The root directory of the NePS run.

    Returns:
        The path of the CSV holding the full results dataframe, followed by
        the path of the CSV holding the short summary.
    """
    results_df, summary_series = status(root_directory, print_summary=False)
    results_path, summary_path, locker = _initiate_summary_csv(root_directory)

    # Both files are written under the same lock
    with locker.lock():
        results_df.to_csv(results_path)
        summary_series.to_frame().to_csv(summary_path)

    return results_path, summary_path

status #

status(
    root_directory: str | Path,
    *,
    print_summary: bool = False
) -> tuple[DataFrame, Series]

Return status information of a neps run, optionally printing a summary of it.

PARAMETER DESCRIPTION
root_directory

The root directory given to neps.run.

TYPE: str | Path

print_summary

If true, print a summary of the current run state.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
tuple[DataFrame, Series]

Dataframe of full results and short summary series.

Source code in neps/status/status.py
def status(
    root_directory: str | Path,
    *,
    print_summary: bool = False,
) -> tuple[pd.DataFrame, pd.Series]:
    """Return status information of a neps run, optionally printing a summary.

    Args:
        root_directory: The root directory given to neps.run.
        print_summary: If true, print a summary of the current run state.

    Returns:
        Dataframe of full results and short summary series. The series holds
        one `num_<state>` count per state present in the results. For
        single-objective runs it additionally holds
        `best_objective_to_minimize`, `best_config_id` and the best config's
        values keyed by their names without the `config.` prefix. When there
        are no trials, an empty dataframe and an empty series are returned.
    """
    root_directory = Path(root_directory)

    # Try to load pipeline_space from disk for pretty printing
    pipeline_space = None
    if print_summary:
        from neps.api import load_pipeline_space

        with contextlib.suppress(FileNotFoundError, ValueError):
            pipeline_space = load_pipeline_space(root_directory)
            # Note: pipeline_space can still be None if it wasn't saved, which is fine

    summary = Summary.from_directory(root_directory)

    if print_summary:
        print(summary.formatted(pipeline_space=pipeline_space))

    df = summary.df()

    if len(df) == 0:
        return df, pd.Series()

    # Count the trials per state and label each count `num_<state>`
    short = (
        df.groupby("state")
        .size()
        .rename(lambda name: f"num_{name.replace('State.', '').lower()}")
    )
    short.name = "value"
    short.index.name = "summary"
    short.index = short.index.astype(str)
    assert isinstance(short, pd.Series)

    # Not implemented for hypervolume -_-
    if summary.is_multiobjective:
        return df, short

    # The column is absent when no trial has reported an objective yet
    if "objective_to_minimize" not in df.columns:
        short["best_objective_to_minimize"] = None
        short["best_config_id"] = None
        return df, short

    idx_min = df["objective_to_minimize"].idxmin()
    row = df.loc[idx_min]
    assert isinstance(row, pd.Series)
    short["best_objective_to_minimize"] = row["objective_to_minimize"]
    short["best_config_id"] = row.name

    # Append the best config's values under their bare names. Only the leading
    # `config.` is stripped: a plain replace would also eat later occurrences
    # (e.g. `config.subconfig.lr` would become `sublr`).
    row = row.loc[row.index.str.startswith("config.")]
    row.index = row.index.str.removeprefix("config.")
    short = pd.concat([short, row])  # type: ignore
    assert isinstance(short, pd.Series)
    return df, short