Skip to content

Cli

This module provides a command-line interface (CLI) for NePS.

compute_duration #

compute_duration(start_time: float) -> str

Compute duration from start_time to current time.

Source code in neps/utils/cli.py
def compute_duration(start_time: float) -> str:
    """Return the time elapsed since ``start_time`` as a formatted duration.

    Args:
        start_time: UNIX timestamp (seconds) marking the start of the interval.

    Returns:
        The elapsed time between ``start_time`` and now, rendered by
        ``format_duration``.
    """
    elapsed_seconds = datetime.now().timestamp() - start_time
    return format_duration(elapsed_seconds)

compute_incumbents #

compute_incumbents(
    sorted_trials: List[Trial],
) -> List[Trial]

Compute the list of incumbent trials based on the best objective_to_minimize.

Source code in neps/utils/cli.py
def compute_incumbents(sorted_trials: List[Trial]) -> List[Trial]:
    """Collect every trial that strictly improved on the best objective so far.

    Trials are scanned in the given order. A trial without a report, or whose
    report has no ``objective_to_minimize``, is ignored.

    Args:
        sorted_trials: Trials in the order in which they should be scanned.

    Returns:
        The improving trials, most recent improvement first.
    """
    running_best = float("inf")
    improving_trials = []
    for candidate in sorted_trials:
        report = candidate.report
        if report is None:
            continue
        value = report.objective_to_minimize
        # Only a strict improvement creates a new incumbent.
        if value is None or not value < running_best:
            continue
        running_best = value
        improving_trials.append(candidate)
    improving_trials.reverse()
    return improving_trials

convert_timestamp #

convert_timestamp(timestamp: float | None) -> str

Convert a UNIX timestamp to a human-readable datetime string.

Source code in neps/utils/cli.py
def convert_timestamp(timestamp: float | None) -> str:
    """Render a UNIX timestamp as a ``YYYY-MM-DD HH:MM:SS`` string.

    Args:
        timestamp: Seconds since the epoch, or ``None``.

    Returns:
        The formatted date-time, or the literal string ``"None"`` when no
        timestamp is given.
    """
    if timestamp is not None:
        moment = datetime.fromtimestamp(timestamp)
        return moment.strftime("%Y-%m-%d %H:%M:%S")
    return "None"

display_results #

display_results(
    directory_path: Path, incumbents: List[Trial]
) -> None

Display the results of the NePS run.

Source code in neps/utils/cli.py
def display_results(directory_path: Path, incumbents: List[Trial]) -> None:
    """Print a table of the incumbent trials of a NePS run.

    Args:
        directory_path: Run directory, shown in the heading.
        incumbents: Trials to list. A trial without a report or without an
            objective value is mentioned on a line of its own instead of a
            table row.
    """
    table_header = f"{'ID':<6} {'Loss':<12} {'Config':<60}"
    print(f"Results for NePS run: {directory_path}")
    print("--------------------")
    print("All Incumbent Trials:")
    print(table_header)
    print("-" * len(table_header))

    if not incumbents:
        print("No Incumbent Trials found.")
        return

    for entry in incumbents:
        report = entry.report
        if report is None or report.objective_to_minimize is None:
            print(f"Trial {entry.id} has no valid objective_to_minimize.")
            continue
        config = ", ".join(f"{k}: {v}" for k, v in entry.config.items())
        loss = report.objective_to_minimize
        print(f"{entry.id:<6} {loss:<12.6f} {config:<60}")

dump_all_configs #

dump_all_configs(
    csv_config_data_path: Path,
    summary_csv_dir: Path,
    dump_format: str,
) -> None

Dump all configurations to the specified format.

Source code in neps/utils/cli.py
def dump_all_configs(
    csv_config_data_path: Path, summary_csv_dir: Path, dump_format: str
) -> None:
    """Export the summary CSV of all trials in the requested format.

    Args:
        csv_config_data_path: Existing summary CSV holding every trial.
        summary_csv_dir: Directory that receives the converted file.
        dump_format: One of "csv", "json" or "parquet" (case-insensitive).

    For "csv" nothing is written; only the expected location is reported.
    Unsupported formats and conversion failures are reported with a printed
    message rather than an exception.
    """
    dump_format = dump_format.lower()
    supported_formats = ["csv", "json", "parquet"]
    if dump_format not in supported_formats:
        print(
            f"Unsupported dump format: '{dump_format}'. "
            f"Supported formats are: {supported_formats}."
        )
        return

    base_name = csv_config_data_path.stem

    # The CSV is the source itself, so there is nothing to convert.
    if dump_format == "csv":
        print(
            f"All trials successfully dumped to '{summary_csv_dir}/{base_name}.{dump_format}'."
        )
        return

    output_file_path = summary_csv_dir / f"{base_name}.{dump_format}"
    try:
        frame = pd.read_csv(csv_config_data_path)
        if dump_format == "parquet":
            frame.to_parquet(output_file_path, index=False)
        elif dump_format == "json":
            frame.to_json(output_file_path, orient="records", indent=4)
        print(f"All trials successfully dumped to '{output_file_path}'.")
    except Exception as exc:
        print(f"Error dumping all trials to '{dump_format}': {exc}")

dump_incumbents #

dump_incumbents(
    csv_config_data_path: Path,
    summary_csv_dir: Path,
    dump_format: str,
    incumbents_ids: List[str],
) -> None

Dump incumbent trials to the specified format.

Source code in neps/utils/cli.py
def dump_incumbents(
    csv_config_data_path: Path,
    summary_csv_dir: Path,
    dump_format: str,
    incumbents_ids: List[str],
) -> None:
    """Dump incumbent trials to the specified format.

    Rows of the summary CSV whose ``config_id`` is one of ``incumbents_ids``
    are written to ``incumbents.<dump_format>`` inside ``summary_csv_dir``.

    Args:
        csv_config_data_path: Existing summary CSV holding every trial.
        summary_csv_dir: Directory that receives the output file.
        dump_format: One of "csv", "json" or "parquet" (case-insensitive).
        incumbents_ids: Trial IDs of the incumbents to export.

    Unsupported formats, an empty ID list, no matching rows and read/write
    failures are all reported with a printed message rather than an exception.
    """
    dump_format = dump_format.lower()
    supported_formats = ["csv", "json", "parquet"]
    if dump_format not in supported_formats:
        print(
            f"Unsupported dump format: '{dump_format}'. Supported formats are: {supported_formats}."
        )
        return

    base_name = "incumbents"  # Name for incumbents file

    if not incumbents_ids:
        print("No incumbent trials found to dump.")
        return

    try:
        # Read the existing CSV into DataFrame
        df = pd.read_csv(csv_config_data_path)

        # pandas infers an integer dtype when every ID is purely numeric
        # ("1", "2", ...), and an integer column never matches string IDs.
        # Compare on the string form so both "1" and "1_0" style IDs match;
        # the exported rows keep their originally inferred dtypes.
        wanted_ids = [str(trial_id) for trial_id in incumbents_ids]
        df_incumbents = df[df["config_id"].astype(str).isin(wanted_ids)]

        if df_incumbents.empty:
            print("No incumbent trials found in the summary CSV.")
            return

        # Define output file path with desired extension
        output_file_path = summary_csv_dir / f"{base_name}.{dump_format}"

        # Save to the desired format
        if dump_format == "csv":
            df_incumbents.to_csv(output_file_path, index=False)
        elif dump_format == "json":
            df_incumbents.to_json(output_file_path, orient="records", indent=4)
        elif dump_format == "parquet":
            df_incumbents.to_parquet(output_file_path, index=False)

        print(f"Incumbent trials successfully dumped to '{output_file_path}'.")
    except Exception as e:
        # Best-effort CLI helper: report the problem instead of crashing.
        print(f"Error dumping incumbents to '{dump_format}': {e}")

format_duration #

format_duration(seconds: float | None) -> str

Convert duration in seconds to a h:min:sec format.

Source code in neps/utils/cli.py
def format_duration(seconds: float | None) -> str:
    """Format a duration given in seconds as ``h:mm:ss``.

    Args:
        seconds: Length of the interval in seconds, or ``None``.

    Returns:
        The ``timedelta`` text form with any fractional-second part removed,
        or the literal string ``"None"`` when no duration is given.
    """
    if seconds is None:
        return "None"
    # Everything after the first "." is the sub-second part; drop it.
    whole_part, _, _fraction = str(timedelta(seconds=seconds)).partition(".")
    return whole_part

handle_report_config #

handle_report_config(args: Namespace) -> None

Handles the report-config command which updates reports for trials in the NePS state.

Source code in neps/utils/cli.py
def handle_report_config(args: argparse.Namespace) -> None:
    """Handle ``report-config``: store a report for a trial and print a summary.

    The NePS root directory is read from the ``root_directory`` entry of the
    run_args YAML file (``args.run_args``, falling back to
    ``run_config.yaml``). The trial named by ``args.trial_id`` is marked
    complete with the values given on the command line and the resulting
    report is written to the NePS state.

    Every failure along the way ends the function early; failures detected
    here are announced with a printed message.
    """
    run_args_path = Path(args.run_args or "run_config.yaml")
    if not run_args_path.exists():
        print(f"Error: run_args file {run_args_path} does not exist.")
        return

    with run_args_path.open("r") as config_file:
        run_args = yaml.safe_load(config_file)

    configured_root = run_args.get("root_directory")
    if not configured_root:
        print("Error: 'root_directory' is not specified in the run_args file.")
        return

    root_directory = Path(configured_root)
    if not root_directory.exists():
        print(f"Error: The directory {root_directory} does not exist.")
        return

    neps_state = load_neps_state(root_directory)
    if neps_state is None:
        return

    # Fetch the trial that is being reported on.
    try:
        trial = neps_state.unsafe_retry_get_trial_by_id(args.trial_id)
        if not trial:
            print(f"No trial found with ID {args.trial_id}")
            return
    except Exception as exc:
        print(f"Error fetching trial with ID {args.trial_id}: {exc}")
        return None

    # Mark the trial complete; this yields the report to persist.
    reported_error = Exception(args.err) if args.err else None
    report = trial.set_complete(
        report_as=args.reported_as,
        time_end=args.time_end,
        objective_to_minimize=args.objective_to_minimize,
        cost=args.cost,
        learning_curve=args.learning_curve,
        err=reported_error,
        tb=args.tb,
        evaluation_duration=args.duration,
        extra={},
    )

    # Persist the report in the NePS state.
    try:
        neps_state._report_trial_evaluation(
            trial=trial, report=report, worker_id=args.worker_id
        )
    except Exception as exc:
        print(f"Error updating the report for trial {args.trial_id}: {exc}")
        return None

    print(f"Report for trial ID {trial.metadata.id} has been successfully updated.")

    loss_text = (
        "N/A" if report.objective_to_minimize is None else report.objective_to_minimize
    )
    cost_text = "N/A" if report.cost is None else report.cost

    print("\n--- Report Summary ---")
    print(f"Trial ID: {trial.metadata.id}")
    print(f"Reported As: {report.reported_as}")
    print(f"Time Ended: {convert_timestamp(trial.metadata.time_end)}")
    print(f"Loss: {loss_text}")
    print(f"Cost: {cost_text}")
    print(f"Evaluation Duration: {format_duration(report.evaluation_duration)}")

    if report.learning_curve:
        curve_text = " ".join(str(point) for point in report.learning_curve)
        print(f"Learning Curve: {curve_text}")
    else:
        print("Learning Curve: N/A")

    if not report.err:
        print("Error: None")
    else:
        print(f"Error Type: {type(report.err).__name__}")
        print(f"Error Message: {str(report.err)}")
        print("Traceback:")
        print(report.tb or "N/A")

    print("----------------------\n")

info_config #

info_config(args: Namespace) -> None

Handles the info-config command by providing information based on directory and id.

Source code in neps/utils/cli.py
def info_config(args: argparse.Namespace) -> None:
    """Handle ``info-config``: print details of a single trial.

    The run directory comes from ``get_root_directory(args)`` and the trial
    is looked up by ``args.id``. The function returns early when the
    directory or the NePS state is unavailable, and prints a message when no
    trial with that ID exists.
    """
    directory_path = get_root_directory(args)
    if directory_path is None:
        return
    config_id = args.id

    neps_state = load_neps_state(directory_path)
    if neps_state is None:
        return

    try:
        trial = neps_state.unsafe_retry_get_trial_by_id(config_id)
    except TrialNotFoundError:
        print(f"No trial found with ID {config_id}.")
        return

    meta = trial.metadata

    print("Trial Information:")
    print(f"  Trial ID: {meta.id}")
    print(f"  State: {meta.state}")
    print("  Configurations:")
    for name, setting in trial.config.items():
        print(f"    {name}: {setting}")

    print("\nMetadata:")
    print(f"  Location: {meta.location}")
    print(f"  Previous Trial ID: {meta.previous_trial_id}")
    print(f"  Sampling Worker ID: {meta.sampling_worker_id}")
    print(f"  Time Sampled: {convert_timestamp(meta.time_sampled)}")
    print(f"  Evaluating Worker ID: {meta.evaluating_worker_id}")
    print(f"  Evaluation Duration: {format_duration(meta.evaluation_duration)}")
    print(f"  Time Started: {convert_timestamp(meta.time_started)}")
    print(f"  Time End: {convert_timestamp(meta.time_end)}")

    report = trial.report
    if report is None:
        print("No report available.")
        return

    print("\nReport:")
    print(f"  Objective_to_minimize: {report.objective_to_minimize}")
    print(f"  Cost: {report.cost}")
    print(f"  Reported As: {report.reported_as}")
    failure = report.err
    if failure is not None:
        print(f"  Error Type: {type(failure).__name__}")
        print(f"  Error Message: {str(failure)}")
        print("  Traceback:")
        print(f"    {report.tb}")

init_config #

init_config(args: Namespace) -> None

Creates a 'run_args' configuration YAML file template if it does not already exist.

Source code in neps/utils/cli.py
def init_config(args: argparse.Namespace) -> None:
    """Create a 'run_args' YAML template, or initialise a NePS state from one.

    The target file is ``args.config_path`` when given, otherwise
    ``run_config.yaml`` in the current directory (resolved to an absolute
    path in both cases).

    * With ``args.database`` set, the existing config file is loaded and a
      NePS state is created or loaded at its ``root_directory``. A hint is
      printed when the config file is missing; when ``root_directory`` is
      not set the function returns without printing anything.
    * Otherwise, if the config file does not exist yet, it is created and
      filled with the "basic" (default) or "complete" template selected by
      ``args.template``.
    * Otherwise a message that the path already exists is printed.

    Args:
        args: Parsed CLI arguments; ``config_path``, ``database`` and
            ``template`` are read.
    """
    config_path = (
        Path(args.config_path).resolve()
        if args.config_path
        else Path("run_config.yaml").resolve()
    )

    if args.database:
        # --database: build the NePS state described by an existing config.
        if config_path.exists():
            with config_path.open("r") as file:
                run_args = yaml.safe_load(file)

            max_cost_total = run_args.get("max_cost_total")
            # Create the optimizer
            # NOTE: this call sits outside the try block below, so an error
            # raised while loading the optimizer is not caught here.
            _, optimizer_info = load_optimizer(
                optimizer=run_args.get("optimizer"),  # type: ignore
                space=run_args.get("pipeline_space"),  # type: ignore
            )
            try:
                directory = run_args.get("root_directory")
                if directory is None:
                    # No root_directory configured: give up without a message.
                    return
                else:
                    directory = Path(directory)
                # Recorded before create_or_load so the message below can tell
                # a fresh state from a pre-existing directory.
                is_new = not directory.exists()
                _ = NePSState.create_or_load(
                    path=directory,
                    optimizer_info=optimizer_info,
                    optimizer_state=OptimizationState(
                        seed_snapshot=SeedSnapshot.new_capture(),
                        budget=(
                            BudgetInfo(max_cost_total=max_cost_total, used_cost_budget=0)
                            if max_cost_total is not None
                            else None
                        ),
                        shared_state=None,  # TODO: Unused for the time being...
                    ),
                )
                if is_new:
                    print("NePS state was successfully created.")
                else:
                    print("NePS state was already created.")
            except Exception as e:
                print(f"Error creating neps state: {e}")
        else:
            print(
                f"{config_path} does not exist. Make sure that your configuration "
                f"file already exists if you don't have specified your own path. "
                f"Run 'neps init' to create run_config.yaml"
            )

    elif not config_path.exists():
        # NOTE: the file is opened (and therefore created) before the template
        # name is checked, so a name other than "basic" or "complete" leaves
        # an empty file behind.
        with config_path.open("w") as file:
            template = args.template if args.template else "basic"
            if template == "basic":
                file.write(
                    """# Add your NEPS configuration settings here

evaluate_pipeline:
  path: "path/to/your/evaluate_pipeline.py"
  name: name_of_your_pipeline_function

pipeline_space:
  float_parameter_name:
    type: "float"
    lower:
    upper:
    log: false
  int_parameter_name:
    type: "int"
    lower:
    upper:
  categorical_parameter_name:
    choices: ["choice1", "choice2", "choice3"]
  constant_parameter_name: 17

root_directory: "set/path/for/root_directory"
max_evaluations_total:
overwrite_working_directory:
"""
                )
            elif template == "complete":
                file.write(
                    """# Full Configuration Template for NePS

evaluate_pipeline:
  path: path/to/your/evaluate_pipeline.py  # Path to the function file
  name: example_pipeline              # Function name within the file

pipeline_space:
  learning_rate:
    lower: 1e-5
    upper: 1e-1
    log: true
  epochs:
    lower: 5
    upper: 20
    is_fidelity: true
  optimizer:
    choices: [adam, sgd, adamw]
  batch_size: 64

root_directory: path/to/results       # Directory for result storage
max_evaluations_total: 20             # Budget
max_cost_total:

# Debug and Monitoring
overwrite_working_directory: false
post_run_summary: true

# Parallelization Setup
max_evaluations_per_run:
continue_until_max_evaluation_completed: true

# Error Handling
objective_value_on_error:
cost_value_on_error:
ignore_errors:

# Customization Options
optimizer: hyperband       # Internal key to select a NePS optimizer.
"""
                )
    else:
        # Never overwrite an existing configuration file.
        print(f"Path {config_path} does already exist.")

load_neps_errors #

load_neps_errors(args: Namespace) -> None

Handles the 'errors' command by loading errors from the neps_state.

Source code in neps/utils/cli.py
def load_neps_errors(args: argparse.Namespace) -> None:
    """Handle the ``errors`` command: print every error stored in the NePS state.

    The run directory comes from ``get_root_directory(args)``. The function
    returns early when the directory or the NePS state is unavailable, and
    prints a short notice when no errors are recorded.
    """
    directory_path = get_root_directory(args)
    if directory_path is None:
        return

    neps_state = load_neps_state(directory_path)
    if neps_state is None:
        return

    recorded_errors = neps_state.lock_and_get_errors().errs
    if not recorded_errors:
        print("No errors found.")
        return

    print(f"Loaded Errors from directory: {directory_path}\n")

    divider = "\n" + "-" * 50 + "\n"
    for entry in recorded_errors:
        print(f"Error in Trial ID: {entry.trial_id}")
        print(f"  Worker ID: {entry.worker_id}")
        print(f"  Error Type: {entry.err_type}")
        print(f"  Error Message: {entry.err}")
        print("  Traceback:")
        print(f"{entry.tb}")
        print(divider)

load_neps_state #

load_neps_state(
    directory_path: Path,
) -> Optional[NePSState]

Load the NePS state with error handling.

Source code in neps/utils/cli.py
def load_neps_state(directory_path: Path) -> Optional[NePSState]:
    """Load an existing NePS state, reporting failures instead of raising.

    Args:
        directory_path: Directory handed to ``NePSState.create_or_load`` with
            ``load_only=True``.

    Returns:
        The loaded state, or ``None`` after printing the error when loading
        raised an exception.
    """
    loaded_state = None
    try:
        loaded_state = NePSState.create_or_load(directory_path, load_only=True)
    except Exception as exc:
        print(f"Unexpected error loading NePS state: {exc}")
    return loaded_state

main #

main() -> None

CLI entry point.

This function sets up the command-line interface (CLI) for NePS using argparse. It defines the available subcommands and their respective arguments.

Source code in neps/utils/cli.py
def main() -> None:
    """CLI entry point (currently disabled).

    Intended to set up the argparse-based command-line interface for NePS
    with its subcommands and their arguments; at present it unconditionally
    raises.

    Raises:
        NotImplementedError: Always.
    """
    reason = "Sorry, this is not currently implemented."
    raise NotImplementedError(reason)

parse_kv_pairs #

parse_kv_pairs(kv_list: list[str]) -> dict

Parse a list of key=value strings into a dictionary with appropriate types.

Source code in neps/utils/cli.py
def parse_kv_pairs(kv_list: list[str]) -> dict:
    """Turn ``key=value`` strings into a dictionary with typed values.

    Each value is converted to ``bool`` ("true"/"false", any case), ``float``
    (when it contains "." or "e"/"E" and parses), ``int`` (when it parses) or
    is otherwise kept as the original string. Only the first "=" separates
    key from value.

    Args:
        kv_list: Items of the form ``key=value``.

    Returns:
        Mapping from each key to its converted value.

    Raises:
        ValueError: If an item contains no "=".
    """

    def _coerce(raw: str) -> bool | int | float | str:
        """Pick the most specific type that ``raw`` parses as."""
        lowered = raw.lower()
        if lowered in ("true", "false"):
            return lowered == "true"

        # A "." or an exponent marker routes the value to float parsing only.
        parser = float if ("." in raw or "e" in lowered) else int
        try:
            return parser(raw)
        except ValueError:
            return raw  # Not numeric after all: keep the text.

    parsed = {}
    for entry in kv_list:
        name, separator, raw_value = entry.partition("=")
        if not separator:
            raise ValueError("Each kwarg must be in key=value format.")
        parsed[name] = _coerce(raw_value)
    return parsed

parse_time_end #

parse_time_end(time_str: str) -> float

Parses a UNIX timestamp or a human-readable time string and returns a UNIX timestamp.

Source code in neps/utils/cli.py
def parse_time_end(time_str: str) -> float:
    """Convert a time given on the command line into a UNIX timestamp.

    Args:
        time_str: Either a number (taken as a UNIX timestamp) or a date-time
            in the form ``YYYY-MM-DD HH:MM:SS``.

    Returns:
        The corresponding UNIX timestamp.

    Raises:
        argparse.ArgumentTypeError: If ``time_str`` matches neither form.
    """
    # A plain number is already a timestamp.
    try:
        return float(time_str)
    except ValueError:
        pass

    datetime_format = "%Y-%m-%d %H:%M:%S"
    try:
        parsed = datetime.strptime(time_str, datetime_format)
        return parsed.timestamp()
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"Invalid time format: '{time_str}'. "
            f"Use UNIX timestamp or 'YYYY-MM-DD HH:MM:SS'."
        )

plot_incumbents #

plot_incumbents(
    all_trials: List[Trial],
    incumbents: List[Trial],
    directory_path: Path,
) -> str

Plot the evolution of incumbent trials over the total number of trials.

Source code in neps/utils/cli.py
def plot_incumbents(
    all_trials: List[Trial], incumbents: List[Trial], directory_path: Path
) -> str:
    """Plot the evolution of incumbent trials over the total number of trials.

    Each incumbent is drawn at the 1-based position of its trial within
    ``all_trials`` (x) against its ``objective_to_minimize`` (y). The figure
    is saved as ``incumbents_evolution.png`` inside ``directory_path``.

    Args:
        all_trials: Every trial of the run, in plotting order.
        incumbents: The incumbent trials to draw.
        directory_path: Directory that receives the PNG file.

    Returns:
        Path of the saved image, as a string.

    Raises:
        KeyError: If a plotted incumbent's ID is not present in ``all_trials``.
    """
    id_to_index = {trial.id: idx + 1 for idx, trial in enumerate(all_trials)}

    # Collect x and y together: an incumbent without a usable objective must
    # be dropped from BOTH series, otherwise their lengths would differ.
    points = [
        (id_to_index[incumbent.id], incumbent.report.objective_to_minimize)
        for incumbent in incumbents
        if incumbent.report is not None
        and incumbent.report.objective_to_minimize is not None
    ]
    x_values = [x for x, _ in points]
    y_values = [y for _, y in points]

    plt.figure(figsize=(12, 6))
    sns.lineplot(
        x=x_values,
        y=y_values,
        marker="o",
        linestyle="-",
        markersize=8,
        color="dodgerblue",
    )
    plt.xlabel("Number of Trials")
    plt.ylabel("Loss")
    plt.title("Evolution of Incumbents Over Trials")

    # Dynamically set x-axis ticks based on the number of trials
    num_trials = len(all_trials)
    if num_trials < 20:
        tick_spacing = 1  # Every trial is labeled if fewer than 20 trials
    else:
        tick_spacing = max(
            5, round(num_trials / 10 / 5) * 5
        )  # Round to nearest multiple of 5

    ticks = np.arange(0, num_trials + 1, tick_spacing)
    ticks[0] = 1  # Trials are numbered from 1, so the first tick is 1, not 0.
    plt.xticks(ticks)

    # NOTE(review): the style is set after the figure has been created;
    # presumably it should precede plt.figure() to affect this plot - confirm.
    sns.set_style("whitegrid")
    plt.grid(True, linestyle="--", linewidth=0.5)
    plt.tight_layout()

    # Save the figure
    plot_file_name = "incumbents_evolution.png"
    plot_path = os.path.join(directory_path, plot_file_name)
    plt.savefig(plot_path)
    plt.close()

    return plot_path

print_help #

print_help(args: Optional[Namespace] = None) -> None

Prints help information for the NEPS CLI.

Source code in neps/utils/cli.py
def print_help(args: Optional[argparse.Namespace] = None) -> None:
    """Print the usage overview of the NePS command-line interface.

    Args:
        args: Accepted so the function can be called like the other command
            handlers; it is not used.
    """
    help_text = """
Usage: neps [COMMAND] [OPTIONS]

Available Commands:
-------------------

neps init [OPTIONS]
    Generates a 'run_args' YAML template file.
    Options:
    --config-path <path/to/config.yaml> (Optional: Specify the path for the config
    file. Default is run_config.yaml)
    --template [basic|complete] (Optional: Choose between a basic or complete template.)
    --database (Optional: Creates a NEPS state. Requires an existing config.yaml.)

neps run [OPTIONS]
    Runs a neural pipeline search.
    Options:
    --run-args <path_to_run_args> (Path to the YAML configuration file.)
    --run-pipeline <path_to_module:function_name> (Path and function for the pipeline.)
    --pipeline-space <path_to_yaml> (Path to the YAML defining the search space.)
    --root-directory <path> (Optional: Directory for saving progress and
    synchronization. Default is 'root_directory' from run_config.yaml if not provided.)
    --overwrite-working-directory (Deletes the working directory at the start of the run.)
    --post-run-summary/--no-post-run-summary (Toggle summary after running.)
    --max-evaluations-total <int> (Total number of evaluations to run.)
    --max-evaluations-per-run <int> (Max evaluations per run call.)
    --continue-until-max-evaluation-completed (Continue until max evaluations are completed.)
    --max-cost-total <float> (Max cost before halting new evaluations.)
    --ignore-errors (Ignore errors during optimization.)
    --objective_to_minimize-value-on-error <float> (Assumed objective_to_minimize value on error.)
    --cost-value-on-error <float> (Assumed cost value on error.)
    --optimizer <key> (optimizer algorithm key for optimization.)
    --optimizer-kwargs <key=value>... (Additional kwargs for the optimizer.)

neps info-config <id> [OPTIONS]
    Provides detailed information about a specific configuration by its ID.
    Options:
    --root-directory <path> (Optional: Path to your root_directory. Default is
    'root_directory' from run_config.yaml if not provided.)

neps errors [OPTIONS]
    Lists all errors from the specified NePS run.
    Options:
    --root-directory <path> (Optional: Path to your root_directory. Default is
    'root_directory' from run_config.yaml if not provided.)

neps sample-config [OPTIONS]
    Sample a configuration from the existing NePS state.
    Options:
    --root-directory <path> (Optional: Path to your root_directory. Default is
    'root_directory' from run_config.yaml if not provided.)

neps status [OPTIONS]
    Check the status of the NePS run.
    Options:
    --root-directory <path> (Optional: Path to your root_directory. Default is
    'root_directory' from run_config.yaml if not provided.)
    --pending (Show only pending trials.)
    --evaluating (Show only evaluating trials.)
    --succeeded (Show only succeeded trials.)

neps results [OPTIONS]
    Display results of the NePS run.
    Options:
    --root-directory <path> (Optional: Path to your root_directory. Default is
    'root_directory' from run_config.yaml if not provided.)
    --plot (Plot the results if set.)

neps help
    Displays this help message.
    """
    print(help_text)

results #

results(args: Namespace) -> None

Handles the 'results' command by displaying incumbents, optionally plotting, and dumping results to files based on the specified options.

Source code in neps/utils/cli.py
def results(args: argparse.Namespace) -> None:
    """Handle the ``results`` command.

    Generates the summary CSV, determines the incumbent trials and then
    either dumps data to a file or displays the incumbents (optionally with
    a plot). When a dump option is given nothing is displayed; if both dump
    options are set only ``dump_all_configs`` is carried out.
    """
    directory_path = get_root_directory(args)
    if directory_path is None:
        return

    # The summary CSV is the source for every dump format.
    try:
        csv_config_data_path, _ = post_run_csv(directory_path)
    except Exception as exc:
        print(f"Error generating summary CSV: {exc}")
        return
    summary_csv_dir = csv_config_data_path.parent

    neps_state = load_neps_state(directory_path)
    if neps_state is None:
        return

    # Order trials numerically by the "_"-separated parts of their ID.
    trials = neps_state.lock_and_read_trials()
    sorted_trials = sorted(
        trials.values(),
        key=lambda trial: [int(part) for part in trial.id.split("_")],
    )

    incumbents = compute_incumbents(sorted_trials)
    incumbents_ids = [trial.id for trial in incumbents]

    # Dump requests short-circuit the normal display.
    if args.dump_all_configs:
        dump_all_configs(csv_config_data_path, summary_csv_dir, args.dump_all_configs)
        return
    if args.dump_incumbents:
        dump_incumbents(
            csv_config_data_path,
            summary_csv_dir,
            args.dump_incumbents,
            incumbents_ids,
        )
        return

    display_results(directory_path, incumbents)

    if args.plot:
        plot_path = plot_incumbents(sorted_trials, incumbents, summary_csv_dir)
        print(f"Plot saved to '{plot_path}'.")

run_optimization #

run_optimization(args: Namespace) -> None

Collects arguments from the parser and runs the NePS optimization.

Args:
    args (argparse.Namespace): Parsed command-line arguments.

Source code in neps/utils/cli.py
def run_optimization(args: argparse.Namespace) -> None:
    """Run a NePS optimization configured by a run_args YAML file.

    Args:
        args: Parsed command-line arguments. ``run_args`` names the YAML file
            (``run_config.yaml`` when it is ``None``); ``evaluate_pipeline``
            may be a ``module_path:function_name`` string, which is then
            loaded.

    The YAML content is passed to ``neps.run`` as keyword arguments.
    """
    config_file = Path("run_config.yaml") if args.run_args is None else args.run_args

    pipeline_spec = args.evaluate_pipeline
    if isinstance(pipeline_spec, str):
        module_path, function_name = pipeline_spec.split(":")
        evaluate_pipeline = dynamic_load_object(module_path, function_name)
    else:
        evaluate_pipeline = pipeline_spec
    # NOTE(review): evaluate_pipeline is resolved above but never forwarded to
    # neps.run below; presumably it was meant to override the YAML entry -
    # confirm.

    logging.basicConfig(level=logging.INFO)
    with open(config_file, "r") as stream:
        run_config = yaml.safe_load(stream)

    neps.run(**run_config)

sample_config #

sample_config(args: Namespace) -> None

Handles the sample-config command which samples configurations from the NePS state.

Source code in neps/utils/cli.py
def sample_config(args: argparse.Namespace) -> None:
    """Handle ``sample-config``: sample new configurations from the NePS state.

    The NePS root directory is read from the ``root_directory`` entry of the
    run_args YAML file (``args.run_args``, falling back to
    ``run_config.yaml``). ``args.number_of_configs`` trials (one when unset)
    are sampled for ``args.worker_id`` and each is printed. A failed sampling
    attempt is reported and skipped.
    """
    run_args_path = Path(args.run_args or "run_config.yaml")
    if not run_args_path.exists():
        print(f"Error: run_args file {run_args_path} does not exist.")
        return

    with run_args_path.open("r") as config_file:
        run_args = yaml.safe_load(config_file)

    configured_root = run_args.get("root_directory")
    if not configured_root:
        print("Error: 'root_directory' is not specified in the run_args file.")
        return

    root_directory = Path(configured_root)
    if not root_directory.exists():
        print(f"Error: The directory {root_directory} does not exist.")
        return

    neps_state = load_neps_state(root_directory)
    if neps_state is None:
        return

    worker_id = args.worker_id
    num_configs = args.number_of_configs or 1

    optimizer, _ = load_optimizer(
        optimizer=run_args.get("optimizer"),  # type: ignore
        space=run_args.get("pipeline_space"),  # type: ignore
    )

    for _attempt in range(num_configs):
        try:
            trial = neps_state.lock_and_sample_trial(optimizer, worker_id=worker_id)
        except Exception as exc:
            # One failed attempt must not stop the remaining ones.
            print(f"Error during configuration sampling: {exc}")
        else:
            print(f"Sampled configuration with Trial ID: {trial.id}")
            print(f"Location: {trial.metadata.location}")
            print("Configuration:")
            for name, setting in trial.config.items():
                print(f"  {name}: {setting}")
            print("\n")

status #

status(args: Namespace) -> None

Handles the status command, providing a summary of the NEPS run.

Source code in neps/utils/cli.py
def status(args: argparse.Namespace) -> None:
    """Handle the ``status`` command (currently disabled).

    Intended to print a summary of the NePS run; at present it
    unconditionally raises and ``args`` is not used.

    Raises:
        NotImplementedError: Always.
    """
    reason = "Sorry, broken for the moment"
    raise NotImplementedError(reason)

validate_directory #

validate_directory(path: Path) -> bool

Validates whether the given path exists and is a directory.

PARAMETER DESCRIPTION
path

The path to validate.

TYPE: Path

RETURNS DESCRIPTION
bool

True if valid, False otherwise.

TYPE: bool

Source code in neps/utils/cli.py
def validate_directory(path: Path) -> bool:
    """
    Check that ``path`` exists and is a directory.

    A message naming the problem is printed when the check fails.

    Args:
        path (Path): The path to validate.

    Returns:
        bool: True if valid, False otherwise.
    """
    problem = None
    if not path.exists():
        problem = f"Error: The directory '{path}' does not exist."
    elif not path.is_dir():
        problem = f"Error: The path '{path}' exists but is not a directory."

    if problem is None:
        return True
    print(problem)
    return False