Skip to content

Data loading

neps.utils.data_loading #

Utility functions for loading data from disk.

BestLossesDict #

Bases: TypedDict

Summary of the best losses over multiple seeds.

get_id_from_path #

get_id_from_path(path: str | Path | None) -> int | None

Extracts the id from the given path.

The id is the last part of the path, which is a multiple digit number.

Note

I think this refers to task ids and not config ids!!!

Source code in neps/utils/data_loading.py
def get_id_from_path(path: str | Path | None) -> int | None:
    """Extracts the id from the given path.

    The id is the last part of the path, which is a multiple digit number.

    Note:
        I think this refers to task ids and not config ids!!!
    """
    if path is None:
        return None
    numbers = re.findall(r"\d+", str(path))
    if len(numbers) == 0:
        return None

    return int(numbers[-1])

is_valid_dev_path #

is_valid_dev_path(path: str | Path | None) -> bool

Checks if the given path is a valid path to development stages.

It follows the pattern task_00000/dev_00000, where 00000 is replaced by the task and development stage ids.

Source code in neps/utils/data_loading.py
def is_valid_dev_path(path: str | Path | None) -> bool:
    """Checks if the given path is a valid path to development stages.

    It follows the pattern task_00000/dev_00000, where 00000 is replaced by the
    task and development stage ids.
    """
    if path is None:
        return False

    # TODO: Test for \ and | in the path, not only any non-alphanumerical character.
    #  Currently, false positives are possible.
    #  This regex expression does not work: ".*task_\d+[\/\\]dev_\d+"
    pattern = re.compile(r".*task_\d+\Wdev_\d+")
    return pattern.fullmatch(str(path)) is not None and Path(path).is_dir()

is_valid_seed_path #

is_valid_seed_path(path: str | Path | None) -> bool

Checks if the given path is a valid path to a seed.

It follows the pattern seed_00000, where 00000 is replaced by the seed.

Source code in neps/utils/data_loading.py
def is_valid_seed_path(path: str | Path | None) -> bool:
    """Checks if the given path is a valid path to a seed.

    It follows the pattern seed_00000, where 00000 is replaced by the seed.
    """
    if path is None:
        return False
    path = Path(path)

    if not path.is_dir():
        return False

    return path.name.startswith("seed")

is_valid_task_path #

is_valid_task_path(path: str | Path | None) -> bool

Checks if the given path is a valid task path.

It follows the pattern task_00000, where 00000 is replaced by the task id.

Source code in neps/utils/data_loading.py
def is_valid_task_path(path: str | Path | None) -> bool:
    """Checks if the given path is a valid task path.

    It follows the pattern task_00000, where 00000 is replaced by the task id.
    """
    if path is None:
        return False

    return (
        _VALID_TASK_PATH_PATTERN.fullmatch(str(path)) is not None and Path(path).is_dir()
    )

read_tasks_and_dev_stages_from_disk #

read_tasks_and_dev_stages_from_disk(
    paths: list[str | Path],
) -> dict[int, dict[int, dict[str, ConfigResult]]]

Reads the given tasks and dev stages from the disk.

PARAMETER DESCRIPTION
paths

List of paths to the previous runs.

TYPE: list[str | Path]

RETURNS DESCRIPTION
dict[int, dict[int, dict[str, ConfigResult]]]

dict[task_id, dict[dev_stage, dict[config_id, ConfigResult]].

Source code in neps/utils/data_loading.py
def read_tasks_and_dev_stages_from_disk(
    paths: list[str | Path],
) -> dict[int, dict[int, dict[str, ConfigResult]]]:
    """Reads the given tasks and dev stages from the disk.

    Args:
        paths: List of paths to the previous runs.

    Returns:
        dict[task_id, dict[dev_stage, dict[config_id, ConfigResult]].
    """
    path_iter = chain.from_iterable(Path(path).iterdir() for path in paths)

    results: dict[int, dict[int, dict[str, ConfigResult]]] = {}

    for task_dir_path in path_iter:
        if not is_valid_task_path(task_dir_path):
            continue

        task_id = get_id_from_path(task_dir_path)
        if task_id is None:
            continue

        results[task_id] = {}

        for dev_dir_path in task_dir_path.iterdir():
            if not is_valid_dev_path(dev_dir_path):
                continue

            dev_id = get_id_from_path(dev_dir_path)
            if dev_id is None:
                continue

            state = SharedState(Path(dev_dir_path))
            with state.lock(poll=1, timeout=None):
                refs = state.trial_refs()

            result = {ref.id: ref.to_result() for ref in refs[Trial.State.COMPLETE]}
            results[task_id][dev_id] = result

    return results

read_user_prior_results_from_disk #

read_user_prior_results_from_disk(
    path: str | Path,
) -> dict[str, dict[str, ConfigResult]]

Reads the user prior results from the disk.

PARAMETER DESCRIPTION
path

Path to the user prior results.

TYPE: str | Path

RETURNS DESCRIPTION
dict[str, dict[str, ConfigResult]]

dict[prior_dir_name, dict[hyperparameter, value].

Source code in neps/utils/data_loading.py
def read_user_prior_results_from_disk(
    path: str | Path,
) -> dict[str, dict[str, ConfigResult]]:
    """Reads the user prior results from the disk.

    Args:
        path: Path to the user prior results.

    Returns:
        dict[prior_dir_name, dict[hyperparameter, value].
    """
    path = Path(path)
    if not path.is_dir():
        raise ValueError(f"Path '{path}' is not a directory.")

    results = {}
    for prior_dir in path.iterdir():
        if not prior_dir.is_dir():
            continue

        state = SharedState(prior_dir)
        with state.lock(poll=0.1, timeout=None):
            refs = state.trial_refs()

        results[prior_dir.name] = {
            ref.id: ref.to_result() for ref in refs[Trial.State.COMPLETE]
        }

    return results

summarize_results #

summarize_results(
    working_dir: str | Path,
    final_task_id: int | None = None,
    final_dev_id: int | None = None,
    sub_dir: str = "",
    *,
    write_to_file: bool = True
) -> BestLossesDict

Summarizes the results of the given working directory.

This includes runs over multiple seeds. The results are saved in the working directory.

PARAMETER DESCRIPTION
working_dir

path to the working directory that contains directories for all seeds

TYPE: str | Path

final_task_id

id of the tasks whose results should be summarized. If None, all tasks are summarized.

TYPE: int | None DEFAULT: None

final_dev_id

if of the development stage whose results should be summarized. If None, all development stages are summarized.

TYPE: int | None DEFAULT: None

sub_dir

subdirectory to look into for specific seeds. * If subdir is provided: working_dir/something/<subdir> * Otherwise: working_dir/something

TYPE: str DEFAULT: ''

write_to_file

if True, the results are written to a file in the working directory, using the latest taks and dev stage ids. summary_task_<task_id>_dev_<dev_id>.yaml

TYPE: bool DEFAULT: True

Source code in neps/utils/data_loading.py
def summarize_results(
    working_dir: str | Path,
    final_task_id: int | None = None,
    final_dev_id: int | None = None,
    sub_dir: str = "",
    *,
    write_to_file: bool = True,
) -> BestLossesDict:
    """Summarizes the results of the given working directory.

    This includes runs over multiple seeds.
    The results are saved in the working directory.

    Args:
        working_dir: path to the working directory that contains directories for all seeds
        final_task_id: id of the tasks whose results should be summarized.
            If None, all tasks are summarized.
        final_dev_id: if of the development stage whose results should be summarized.
            If None, all development stages are summarized.
        sub_dir: subdirectory to look into for specific seeds.
            * If subdir is provided: `working_dir/something/<subdir>`
            * Otherwise: `working_dir/something`
        write_to_file: if True, the results are written to a file in the working
            directory, using the latest taks and dev stage ids.
            `summary_task_<task_id>_dev_<dev_id>.yaml`
    """
    working_dir = Path(working_dir)

    best_losses = []
    for seed_dir in working_dir.iterdir():
        if not is_valid_seed_path(seed_dir):
            continue

        if sub_dir:
            seed_dir = seed_dir / sub_dir  # noqa: PLW2901

        if final_task_id is not None and final_dev_id is not None:
            results = read_tasks_and_dev_stages_from_disk([seed_dir])

            # TODO(unknown): only use IDs if provided
            final_results = results[final_task_id][final_dev_id]
        else:
            state = SharedState(Path(seed_dir))
            with state.lock(poll=1, timeout=None):
                refs = state.trial_refs()

            final_results = {
                ref.id: ref.to_result() for ref in refs[Trial.State.COMPLETE]
            }

        # This part is copied from neps.status()
        best_loss = float("inf")
        num_error = 0
        for _, evaluation in final_results.items():
            if evaluation.result == "error":
                num_error += 1
            loss = _get_loss(evaluation.result, ignore_errors=True)
            if isinstance(loss, float) and loss < best_loss:
                best_loss = _get_loss(evaluation.result)

        best_losses.append(best_loss)

    if len(best_losses) == 0:
        raise ValueError(f"No results found in directort {working_dir}.")

    best_losses_dict = BestLossesDict(
        best_loss_mean=float(np.mean(best_losses)),
        best_loss_std=float(np.std(best_losses)),
        best_loss_std_err=float(np.std(best_losses) / np.sqrt(np.size(best_losses))),
        best_loss_min=float(np.min(best_losses)),
        best_loss_max=float(np.max(best_losses)),
        best_loss_median=float(np.median(best_losses)),
        best_loss_quantile_25=float(np.quantile(best_losses, 0.25)),
        best_loss_quantile_75=float(np.quantile(best_losses, 0.75)),
    )

    if write_to_file:
        task_id_str = str(final_task_id).zfill(5)
        dev_id_str = str(final_dev_id).zfill(5)
        file_path = working_dir / ("summary_task_" + task_id_str + "_dev_" + dev_id_str)

        with file_path.with_suffix(".yaml").open("w") as f:
            yaml.dump(best_losses_dict, f, default_flow_style=False)

        with file_path.with_suffix(".json").open("w") as f:
            json.dump(best_losses_dict, f)

    return best_losses_dict

summarize_results_all_tasks_all_devs #

summarize_results_all_tasks_all_devs(
    path: str | Path,
    sub_dir: str = "",
    file_name: str = "summary",
    user_prior_dir: str | Path | None = None,
) -> Any

Summarizes the results of all tasks and all development stages.

This includes runs overrmultiple seeds. The results are saved in the working directory.

Source code in neps/utils/data_loading.py
def summarize_results_all_tasks_all_devs(
    path: str | Path,
    sub_dir: str = "",
    file_name: str = "summary",
    user_prior_dir: str | Path | None = None,
) -> Any:
    """Summarizes the results of all tasks and all development stages.

    This includes runs overrmultiple seeds. The results are saved in
    the working directory.
    """
    # go into the first seed directory and read the tasks and dev stages
    path = Path(path)
    os.scandir(path)

    # TODO(eddiebergman): Please see issue #80
    for seed_dir in path.iterdir():
        if not is_valid_seed_path(seed_dir):
            continue

        seed_dir_path = seed_dir / sub_dir if sub_dir else seed_dir
        results = read_tasks_and_dev_stages_from_disk([seed_dir_path])
        break
    else:
        raise ValueError(f"No results found in directory {path}.")

    summary = {}
    for task_id, task in results.items():
        for dev_id, _ in task.items():
            summary[(task_id, dev_id)] = summarize_results(
                path,
                final_task_id=task_id,
                final_dev_id=dev_id,
                sub_dir=sub_dir,
                write_to_file=False,
            )

    summary_user_prior = {}
    # TODO(eddiebergman): Please see issue #80, figure out what user_prior_dir is
    if user_prior_dir is not None:
        user_prior_dir = Path(user_prior_dir)

        if sub_dir:
            previously_inferred_path = os.path.join(sub_dir, str(user_prior_dir))  # noqa: PTH118
            raise NotImplementedError(
                "Sorry, don't know what should have been done here but we now explicitly"
                "raise instead of silently summarizing what would be a non-existant path"
                f"before. Previously inferred path was: {previously_inferred_path}"
            )

        user_prior_results = read_user_prior_results_from_disk(user_prior_dir)
        for prior_name, _ in user_prior_results.items():
            summary_user_prior[prior_name] = summarize_results(
                working_dir=path,
                sub_dir=str(user_prior_dir / prior_name),
                write_to_file=False,
            )

    with (path / file_name).with_suffix(".jsonl").open("w") as f:
        # write jsonl file with one line per task and dev stage
        for (task_id, dev_id), metrics in summary.items():
            f.write(
                json.dumps(
                    {"IDs": {"task_id": task_id, "dev_id": dev_id}, "metrics": metrics}
                )
            )
            f.write("\n")
        for prior_name, metrics in summary_user_prior.items():
            f.write(json.dumps({"IDs": {"prior_name": prior_name}, "metrics": metrics}))
            f.write("\n")