Skip to content

Data loading

neps.utils.data_loading #

Utility functions for loading data from disk.

BestLossesDict #

Bases: TypedDict

Summary of the best losses over multiple seeds.

get_id_from_path #

get_id_from_path(path: str | Path | None) -> int | None

Extracts the id from the given path.

The id is the last part of the path, which is a multiple digit number.


I think this refers to task ids and not config ids!!!

Source code in neps/utils/
def get_id_from_path(path: str | Path | None) -> int | None:
    """Extracts the id from the given path.

    The id is the last part of the path, which is a multiple digit number.

        I think this refers to task ids and not config ids!!!
    if path is None:
        return None
    numbers = re.findall(r"\d+", str(path))
    if len(numbers) == 0:
        return None

    return int(numbers[-1])

is_valid_dev_path #

is_valid_dev_path(path: str | Path | None) -> bool

Checks if the given path is a valid path to development stages.

It follows the pattern task_00000/dev_00000, where 00000 is replaced by the task and development stage ids.

Source code in neps/utils/
def is_valid_dev_path(path: str | Path | None) -> bool:
    """Checks if the given path is a valid path to development stages.

    It follows the pattern task_00000/dev_00000, where 00000 is replaced by the
    task and development stage ids.
    if path is None:
        return False

    # TODO: Test for \ and | in the path, not only any non-alphanumerical character.
    #  Currently, false positives are possible.
    #  This regex expression does not work: ".*task_\d+[\/\\]dev_\d+"
    pattern = re.compile(r".*task_\d+\Wdev_\d+")
    return pattern.fullmatch(str(path)) is not None and Path(path).is_dir()

is_valid_seed_path #

is_valid_seed_path(path: str | Path | None) -> bool

Checks if the given path is a valid path to a seed.

It follows the pattern seed_00000, where 00000 is replaced by the seed.

Source code in neps/utils/
def is_valid_seed_path(path: str | Path | None) -> bool:
    """Checks if the given path is a valid path to a seed.

    It follows the pattern seed_00000, where 00000 is replaced by the seed.
    if path is None:
        return False
    path = Path(path)

    if not path.is_dir():
        return False


is_valid_task_path #

is_valid_task_path(path: str | Path | None) -> bool

Checks if the given path is a valid task path.

It follows the pattern task_00000, where 00000 is replaced by the task id.

Source code in neps/utils/
def is_valid_task_path(path: str | Path | None) -> bool:
    """Checks if the given path is a valid task path.

    It follows the pattern task_00000, where 00000 is replaced by the task id.
    if path is None:
        return False

    return (
        _VALID_TASK_PATH_PATTERN.fullmatch(str(path)) is not None and Path(path).is_dir()

read_tasks_and_dev_stages_from_disk #

    paths: list[str | Path],
) -> dict[int, dict[int, dict[str, ConfigResult]]]

Reads the given tasks and dev stages from the disk.


List of paths to the previous runs.

TYPE: list[str | Path]

dict[int, dict[int, dict[str, ConfigResult]]]

dict[task_id, dict[dev_stage, dict[config_id, ConfigResult]].

Source code in neps/utils/
def read_tasks_and_dev_stages_from_disk(
    paths: list[str | Path],
) -> dict[int, dict[int, dict[str, ConfigResult]]]:
    """Reads the given tasks and dev stages from the disk.

        paths: List of paths to the previous runs.

        dict[task_id, dict[dev_stage, dict[config_id, ConfigResult]].
    path_iter = chain.from_iterable(Path(path).iterdir() for path in paths)

    results: dict[int, dict[int, dict[str, ConfigResult]]] = {}

    for task_dir_path in path_iter:
        if not is_valid_task_path(task_dir_path):

        task_id = get_id_from_path(task_dir_path)
        if task_id is None:

        results[task_id] = {}

        for dev_dir_path in task_dir_path.iterdir():
            if not is_valid_dev_path(dev_dir_path):

            dev_id = get_id_from_path(dev_dir_path)
            if dev_id is None:

            state = SharedState(Path(dev_dir_path))
            with state.lock(poll=1, timeout=None):
                refs = state.trial_refs()

            result = { ref.to_result() for ref in refs[Trial.State.COMPLETE]}
            results[task_id][dev_id] = result

    return results

read_user_prior_results_from_disk #

    path: str | Path,
) -> dict[str, dict[str, ConfigResult]]

Reads the user prior results from the disk.


Path to the user prior results.

TYPE: str | Path

dict[str, dict[str, ConfigResult]]

dict[prior_dir_name, dict[hyperparameter, value].

Source code in neps/utils/
def read_user_prior_results_from_disk(
    path: str | Path,
) -> dict[str, dict[str, ConfigResult]]:
    """Reads the user prior results from the disk.

        path: Path to the user prior results.

        dict[prior_dir_name, dict[hyperparameter, value].
    path = Path(path)
    if not path.is_dir():
        raise ValueError(f"Path '{path}' is not a directory.")

    results = {}
    for prior_dir in path.iterdir():
        if not prior_dir.is_dir():

        state = SharedState(prior_dir)
        with state.lock(poll=0.1, timeout=None):
            refs = state.trial_refs()

        results[] = {
   ref.to_result() for ref in refs[Trial.State.COMPLETE]

    return results

summarize_results #

    working_dir: str | Path,
    final_task_id: int | None = None,
    final_dev_id: int | None = None,
    sub_dir: str = "",
    write_to_file: bool = True
) -> BestLossesDict

Summarizes the results of the given working directory.

This includes runs over multiple seeds. The results are saved in the working directory.


path to the working directory that contains directories for all seeds

TYPE: str | Path


id of the tasks whose results should be summarized. If None, all tasks are summarized.

TYPE: int | None DEFAULT: None


if of the development stage whose results should be summarized. If None, all development stages are summarized.

TYPE: int | None DEFAULT: None


subdirectory to look into for specific seeds. * If subdir is provided: working_dir/something/<subdir> * Otherwise: working_dir/something



if True, the results are written to a file in the working directory, using the latest taks and dev stage ids. summary_task_<task_id>_dev_<dev_id>.yaml

TYPE: bool DEFAULT: True

Source code in neps/utils/
def summarize_results(
    working_dir: str | Path,
    final_task_id: int | None = None,
    final_dev_id: int | None = None,
    sub_dir: str = "",
    write_to_file: bool = True,
) -> BestLossesDict:
    """Summarizes the results of the given working directory.

    This includes runs over multiple seeds.
    The results are saved in the working directory.

        working_dir: path to the working directory that contains directories for all seeds
        final_task_id: id of the tasks whose results should be summarized.
            If None, all tasks are summarized.
        final_dev_id: if of the development stage whose results should be summarized.
            If None, all development stages are summarized.
        sub_dir: subdirectory to look into for specific seeds.
            * If subdir is provided: `working_dir/something/<subdir>`
            * Otherwise: `working_dir/something`
        write_to_file: if True, the results are written to a file in the working
            directory, using the latest taks and dev stage ids.
    working_dir = Path(working_dir)

    best_losses = []
    for seed_dir in working_dir.iterdir():
        if not is_valid_seed_path(seed_dir):

        if sub_dir:
            seed_dir = seed_dir / sub_dir  # noqa: PLW2901

        if final_task_id is not None and final_dev_id is not None:
            results = read_tasks_and_dev_stages_from_disk([seed_dir])

            # TODO(unknown): only use IDs if provided
            final_results = results[final_task_id][final_dev_id]
            state = SharedState(Path(seed_dir))
            with state.lock(poll=1, timeout=None):
                refs = state.trial_refs()

            final_results = {
       ref.to_result() for ref in refs[Trial.State.COMPLETE]

        # This part is copied from neps.status()
        best_loss = float("inf")
        num_error = 0
        for _, evaluation in final_results.items():
            if evaluation.result == "error":
                num_error += 1
            loss = _get_loss(evaluation.result, ignore_errors=True)
            if isinstance(loss, float) and loss < best_loss:
                best_loss = _get_loss(evaluation.result)


    if len(best_losses) == 0:
        raise ValueError(f"No results found in directort {working_dir}.")

    best_losses_dict = BestLossesDict(
        best_loss_std_err=float(np.std(best_losses) / np.sqrt(np.size(best_losses))),
        best_loss_quantile_25=float(np.quantile(best_losses, 0.25)),
        best_loss_quantile_75=float(np.quantile(best_losses, 0.75)),

    if write_to_file:
        task_id_str = str(final_task_id).zfill(5)
        dev_id_str = str(final_dev_id).zfill(5)
        file_path = working_dir / ("summary_task_" + task_id_str + "_dev_" + dev_id_str)

        with file_path.with_suffix(".yaml").open("w") as f:
            yaml.dump(best_losses_dict, f, default_flow_style=False)

        with file_path.with_suffix(".json").open("w") as f:
            json.dump(best_losses_dict, f)

    return best_losses_dict

summarize_results_all_tasks_all_devs #

    path: str | Path,
    sub_dir: str = "",
    file_name: str = "summary",
    user_prior_dir: str | Path | None = None,
) -> Any

Summarizes the results of all tasks and all development stages.

This includes runs overrmultiple seeds. The results are saved in the working directory.

Source code in neps/utils/
def summarize_results_all_tasks_all_devs(
    path: str | Path,
    sub_dir: str = "",
    file_name: str = "summary",
    user_prior_dir: str | Path | None = None,
) -> Any:
    """Summarizes the results of all tasks and all development stages.

    This includes runs overrmultiple seeds. The results are saved in
    the working directory.
    # go into the first seed directory and read the tasks and dev stages
    path = Path(path)

    # TODO(eddiebergman): Please see issue #80
    for seed_dir in path.iterdir():
        if not is_valid_seed_path(seed_dir):

        seed_dir_path = seed_dir / sub_dir if sub_dir else seed_dir
        results = read_tasks_and_dev_stages_from_disk([seed_dir_path])
        raise ValueError(f"No results found in directory {path}.")

    summary = {}
    for task_id, task in results.items():
        for dev_id, _ in task.items():
            summary[(task_id, dev_id)] = summarize_results(

    summary_user_prior = {}
    # TODO(eddiebergman): Please see issue #80, figure out what user_prior_dir is
    if user_prior_dir is not None:
        user_prior_dir = Path(user_prior_dir)

        if sub_dir:
            previously_inferred_path = os.path.join(sub_dir, str(user_prior_dir))  # noqa: PTH118
            raise NotImplementedError(
                "Sorry, don't know what should have been done here but we now explicitly"
                "raise instead of silently summarizing what would be a non-existant path"
                f"before. Previously inferred path was: {previously_inferred_path}"

        user_prior_results = read_user_prior_results_from_disk(user_prior_dir)
        for prior_name, _ in user_prior_results.items():
            summary_user_prior[prior_name] = summarize_results(
                sub_dir=str(user_prior_dir / prior_name),

    with (path / file_name).with_suffix(".jsonl").open("w") as f:
        # write jsonl file with one line per task and dev stage
        for (task_id, dev_id), metrics in summary.items():
                    {"IDs": {"task_id": task_id, "dev_id": dev_id}, "metrics": metrics}
        for prior_name, metrics in summary_user_prior.items():
            f.write(json.dumps({"IDs": {"prior_name": prior_name}, "metrics": metrics}))