Skip to content

HPO#

Expand to copy examples/hpo.py (top right)
from typing import Any

import openml
from sklearn.preprocessing import LabelEncoder

from amltk.sklearn import split_data


def get_dataset(
    dataset_id: str | int,
    *,
    seed: int,
    splits: dict[str, float],
) -> dict[str, Any]:
    dataset = openml.datasets.get_dataset(
        dataset_id,
        download_data=True,
        download_features_meta_data=False,
        download_qualities=False,
    )

    target_name = dataset.default_target_attribute
    X, y, _, _ = dataset.get_data(dataset_format="dataframe", target=target_name)
    _y = LabelEncoder().fit_transform(y)

    return split_data(X, _y, splits=splits, seed=seed)  # type: ignore


from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder

from amltk.pipeline import Component, Node, Sequential, Split

pipeline = (
    Sequential(name="Pipeline")
    >> Split(
        {
            "categorical": [
                SimpleImputer(strategy="constant", fill_value="missing"),
                OneHotEncoder(drop="first"),
            ],
            "numerical": Component(
                SimpleImputer,
                space={"strategy": ["mean", "median"]},
            ),
        },
        name="feature_preprocessing",
    )
    >> Component(
        RandomForestClassifier,
        space={
            "n_estimators": (10, 100),
            "max_features": (0.0, 1.0),
            "criterion": ["gini", "entropy", "log_loss"],
        },
    )
)

print(pipeline)
print(pipeline.search_space("configspace"))

from sklearn.metrics import accuracy_score

from amltk.optimization import Trial
from amltk.store import PathBucket


def target_function(
    trial: Trial,
    _pipeline: Node,
    data_bucket: PathBucket,
) -> Trial.Report:
    trial.store({"config.json": trial.config})
    # Load in data
    with trial.profile("data-loading"):
        X_train, X_val, X_test, y_train, y_val, y_test = (
            data_bucket["X_train.csv"].load(),
            data_bucket["X_val.csv"].load(),
            data_bucket["X_test.csv"].load(),
            data_bucket["y_train.npy"].load(),
            data_bucket["y_val.npy"].load(),
            data_bucket["y_test.npy"].load(),
        )

    # Configure the pipeline with the trial config before building it.
    sklearn_pipeline = _pipeline.configure(trial.config).build("sklearn")

    # Fit the pipeline, indicating when you want to start the trial timing
    try:
        with trial.profile("fit"):
            sklearn_pipeline.fit(X_train, y_train)
    except Exception as e:
        return trial.fail(e)

    # Make our predictions with the model
    with trial.profile("predictions"):
        train_predictions = sklearn_pipeline.predict(X_train)
        val_predictions = sklearn_pipeline.predict(X_val)
        test_predictions = sklearn_pipeline.predict(X_test)

    with trial.profile("probabilities"):
        val_probabilites = sklearn_pipeline.predict_proba(X_val)

    # Save the scores to the summary of the trial
    with trial.profile("scoring"):
        train_acc = float(accuracy_score(train_predictions, y_train))
        val_acc = float(accuracy_score(val_predictions, y_val))
        test_acc = float(accuracy_score(test_predictions, y_test))

    trial.summary["train/acc"] = train_acc
    trial.summary["val/acc"] = val_acc
    trial.summary["test/acc"] = test_acc

    # Save all of this to the file system
    trial.store(
        {
            "model.pkl": sklearn_pipeline,
            "val_probabilities.npy": val_probabilites,
            "val_predictions.npy": val_predictions,
            "test_predictions.npy": test_predictions,
        },
    )

    # Finally report the success
    return trial.success(accuracy=val_acc)


from amltk.store import PathBucket

seed = 42
data = get_dataset(31, seed=seed, splits={"train": 0.6, "val": 0.2, "test": 0.2})

X_train, y_train = data["train"]
X_val, y_val = data["val"]
X_test, y_test = data["test"]

bucket = PathBucket("example-hpo", clean=True, create=True)
data_bucket = bucket / "data"
data_bucket.store(
    {
        "X_train.csv": X_train,
        "X_val.csv": X_val,
        "X_test.csv": X_test,
        "y_train.npy": y_train,
        "y_val.npy": y_val,
        "y_test.npy": y_test,
    },
)

print(bucket)
print(dict(bucket))
print(dict(data_bucket))
from amltk.optimization import Metric
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(2)

from amltk.optimization.optimizers.smac import SMACOptimizer

optimizer = SMACOptimizer.create(
    space=pipeline,  # <!> (1)!
    metrics=Metric("accuracy", minimize=False, bounds=(0.0, 1.0)),
    bucket=bucket,
    seed=seed,
)

# 1. You can also explicitly pass in the space of hyperparameters to optimize.
#   ```python
#   space = pipeline.search_space("configspace")
#   # or
#   space = pipeline.search_space(SMACOptimizer.preffered_parser())
#   ```
task = scheduler.task(target_function)

print(task)


@scheduler.on_start
def launch_initial_tasks() -> None:
    """When we start, launch `n_workers` tasks."""
    trial = optimizer.ask()
    task.submit(trial, _pipeline=pipeline, data_bucket=data_bucket)




@task.on_result
def tell_optimizer(_, report: Trial.Report) -> None:
    """When we get a report, tell the optimizer."""
    optimizer.tell(report)


from amltk.optimization import History

trial_history = History()


@task.on_result
def add_to_history(_, report: Trial.Report) -> None:
    """When we get a report, print it."""
    trial_history.add(report)




@task.on_result
def launch_another_task(*_: Any) -> None:
    """When we get a report, evaluate another trial."""
    if scheduler.running():
        trial = optimizer.ask()
        task.submit(trial, _pipeline=pipeline, data_bucket=data_bucket)




@task.on_exception
def stop_scheduler_on_exception(*_: Any) -> None:
    scheduler.stop()


@task.on_cancelled
def stop_scheduler_on_cancelled(_: Any) -> None:
    scheduler.stop()


if __name__ == "__main__":
    scheduler.run(timeout=5)

    print("Trial history:")
    history_df = trial_history.df()
    print(history_df)

Description#

Dependencies

Requires the following integrations and dependencies:

  • pip install openml amltk[smac, sklearn]

This example shows the basic of setting up a simple HPO loop around a RandomForestClassifier. We will use the OpenML to get a dataset and also use some static preprocessing as part of our pipeline definition.

You can fine the pipeline guide here and the optimization guide here to learn more.

You can skip the imports sections and go straight to the pipeline definition.

Dataset#

Below is just a small function to help us get the dataset from OpenML and encode the labels.

from typing import Any

import openml
from sklearn.preprocessing import LabelEncoder

from amltk.sklearn import split_data


def get_dataset(
    dataset_id: str | int,
    *,
    seed: int,
    splits: dict[str, float],
) -> dict[str, Any]:
    dataset = openml.datasets.get_dataset(
        dataset_id,
        download_data=True,
        download_features_meta_data=False,
        download_qualities=False,
    )

    target_name = dataset.default_target_attribute
    X, y, _, _ = dataset.get_data(dataset_format="dataframe", target=target_name)
    _y = LabelEncoder().fit_transform(y)

    return split_data(X, _y, splits=splits, seed=seed)  # type: ignore

Pipeline Definition#

Here we define a pipeline which splits categoricals and numericals down two different paths, and then combines them back together before passing them to the RandomForestClassifier.

For more on definitions of pipelines, see the Pipeline guide.

from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder

from amltk.pipeline import Component, Node, Sequential, Split

pipeline = (
    Sequential(name="Pipeline")
    >> Split(
        {
            "categorical": [
                SimpleImputer(strategy="constant", fill_value="missing"),
                OneHotEncoder(drop="first"),
            ],
            "numerical": Component(
                SimpleImputer,
                space={"strategy": ["mean", "median"]},
            ),
        },
        name="feature_preprocessing",
    )
    >> Component(
        RandomForestClassifier,
        space={
            "n_estimators": (10, 100),
            "max_features": (0.0, 1.0),
            "criterion": ["gini", "entropy", "log_loss"],
        },
    )
)

print(pipeline)
print(pipeline.search_space("configspace"))
Sequential(name='Pipeline', item=None, nodes=(Split(name='feature_preprocessing', item=None, nodes=(Sequential(name='categorical', item=None, nodes=(Fixed(name='SimpleImputer', item=SimpleImputer(fill_value='missing', strategy='constant'), nodes=(), config=None, space=None, fidelities=None, config_transform=None, meta=None), Fixed(name='OneHotEncoder', item=OneHotEncoder(drop='first'), nodes=(), config=None, space=None, fidelities=None, config_transform=None, meta=None)), config=None, space=None, fidelities=None, config_transform=None, meta=None), Sequential(name='numerical', item=None, nodes=(Component(name='SimpleImputer', item=<class 'sklearn.impute._base.SimpleImputer'>, nodes=(), config=None, space={'strategy': ['mean', 'median']}, fidelities=None, config_transform=None, meta=None),), config=None, space=None, fidelities=None, config_transform=None, meta=None)), config=None, space=None, fidelities=None, config_transform=None, meta=None), Component(name='RandomForestClassifier', item=<class 'sklearn.ensemble._forest.RandomForestClassifier'>, nodes=(), config=None, space={'n_estimators': (10, 100), 'max_features': (0.0, 1.0), 'criterion': ['gini', 'entropy', 'log_loss']}, fidelities=None, config_transform=None, meta=None)), config=None, space=None, fidelities=None, config_transform=None, meta=None)
Configuration space object:
  Hyperparameters:
    Pipeline:RandomForestClassifier:criterion, Type: Categorical, Choices: {gini, entropy, log_loss}, Default: gini
    Pipeline:RandomForestClassifier:max_features, Type: UniformFloat, Range: [0.0, 1.0], Default: 0.5
    Pipeline:RandomForestClassifier:n_estimators, Type: UniformInteger, Range: [10, 100], Default: 55
    Pipeline:feature_preprocessing:numerical:SimpleImputer:strategy, Type: Categorical, Choices: {mean, median}, Default: mean

Target Function#

The function we will optimize must take in a Trial and return a Trial.Report. We also pass in a PathBucket which is a dict-like view of the file system, where we have our dataset stored.

We also pass in our pipeline, which we will use to build our sklearn pipeline with a specific trial.config suggested by the Optimizer.

from sklearn.metrics import accuracy_score

from amltk.optimization import Trial
from amltk.store import PathBucket


def target_function(
    trial: Trial,
    _pipeline: Node,
    data_bucket: PathBucket,
) -> Trial.Report:
    trial.store({"config.json": trial.config})
    # Load in data
    with trial.profile("data-loading"):
        X_train, X_val, X_test, y_train, y_val, y_test = (
            data_bucket["X_train.csv"].load(),
            data_bucket["X_val.csv"].load(),
            data_bucket["X_test.csv"].load(),
            data_bucket["y_train.npy"].load(),
            data_bucket["y_val.npy"].load(),
            data_bucket["y_test.npy"].load(),
        )

    # Configure the pipeline with the trial config before building it.
    sklearn_pipeline = _pipeline.configure(trial.config).build("sklearn")

    # Fit the pipeline, indicating when you want to start the trial timing
    try:
        with trial.profile("fit"):
            sklearn_pipeline.fit(X_train, y_train)
    except Exception as e:
        return trial.fail(e)

    # Make our predictions with the model
    with trial.profile("predictions"):
        train_predictions = sklearn_pipeline.predict(X_train)
        val_predictions = sklearn_pipeline.predict(X_val)
        test_predictions = sklearn_pipeline.predict(X_test)

    with trial.profile("probabilities"):
        val_probabilites = sklearn_pipeline.predict_proba(X_val)

    # Save the scores to the summary of the trial
    with trial.profile("scoring"):
        train_acc = float(accuracy_score(train_predictions, y_train))
        val_acc = float(accuracy_score(val_predictions, y_val))
        test_acc = float(accuracy_score(test_predictions, y_test))

    trial.summary["train/acc"] = train_acc
    trial.summary["val/acc"] = val_acc
    trial.summary["test/acc"] = test_acc

    # Save all of this to the file system
    trial.store(
        {
            "model.pkl": sklearn_pipeline,
            "val_probabilities.npy": val_probabilites,
            "val_predictions.npy": val_predictions,
            "test_predictions.npy": test_predictions,
        },
    )

    # Finally report the success
    return trial.success(accuracy=val_acc)

Running the Whole Thing#

Now we can run the whole thing. We will use the Scheduler to run the optimization, and the SMACOptimizer to optimize the pipeline.

Getting and storing data#

We use a PathBucket to store the data. This is a dict-like view of the file system.

from amltk.store import PathBucket

seed = 42
data = get_dataset(31, seed=seed, splits={"train": 0.6, "val": 0.2, "test": 0.2})

X_train, y_train = data["train"]
X_val, y_val = data["val"]
X_test, y_test = data["test"]

bucket = PathBucket("example-hpo", clean=True, create=True)
data_bucket = bucket / "data"
data_bucket.store(
    {
        "X_train.csv": X_train,
        "X_val.csv": X_val,
        "X_test.csv": X_test,
        "y_train.npy": y_train,
        "y_val.npy": y_val,
        "y_test.npy": y_test,
    },
)

print(bucket)
print(dict(bucket))
print(dict(data_bucket))
PathBucket(PosixPath('example-hpo'))
{'data': Drop(key=PosixPath('example-hpo/data'))}
{'y_test.npy': Drop(key=PosixPath('example-hpo/data/y_test.npy')), 'X_val.csv': Drop(key=PosixPath('example-hpo/data/X_val.csv')), 'y_val.npy': Drop(key=PosixPath('example-hpo/data/y_val.npy')), 'X_train.csv': Drop(key=PosixPath('example-hpo/data/X_train.csv')), 'y_train.npy': Drop(key=PosixPath('example-hpo/data/y_train.npy')), 'X_test.csv': Drop(key=PosixPath('example-hpo/data/X_test.csv'))}

Setting up the Scheduler, Task and Optimizer#

We use the Scheduler.with_processes method to create a Scheduler that will run the optimization.

Please check out the full guides to learn more!

We then create an SMACOptimizer which will optimize the pipeline. We pass in pipeline, and SMAC the optimizer will parser out the space of hyperparameters to optimize.

from amltk.optimization import Metric
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(2)

from amltk.optimization.optimizers.smac import SMACOptimizer

optimizer = SMACOptimizer.create(
    space=pipeline,  #  (1)!
    metrics=Metric("accuracy", minimize=False, bounds=(0.0, 1.0)),
    bucket=bucket,
    seed=seed,
)
  1. You can also explicitly pass in the space of hyperparameters to optimize.
    space = pipeline.search_space("configspace")
    # or
    space = pipeline.search_space(SMACOptimizer.preffered_parser())
    

Next we create a Task, passing in the function we want to run and the scheduler we will run it in.

task = scheduler.task(target_function)

print(task)
Task(unique_ref=Task-target_function-oe1r2TsT, plugins=[])

We use the callback decorators of the Scheduler and the Task to add callbacks that get called during events that happen during the running of the scheduler. Using this, we can control the flow of how things run. Check out the task guide for more.

This one here asks the optimizer for a new trial when the scheduler starts and launches the task we created earlier with this trial.

@scheduler.on_start
def launch_initial_tasks() -> None:
    """When we start, launch `n_workers` tasks."""
    trial = optimizer.ask()
    task.submit(trial, _pipeline=pipeline, data_bucket=data_bucket)

When a Task returns and we get a report, i.e. with task.success() or task.fail(), the task will fire off the callbacks registered with @result. We can use these to add callbacks that get called when these events happen.

Here we use it to update the optimizer with the report we got.

@task.on_result
def tell_optimizer(_, report: Trial.Report) -> None:
    """When we get a report, tell the optimizer."""
    optimizer.tell(report)

We can use the History class to store the reports we get from the Task. We can then use this to analyze the results of the optimization afterwords.

from amltk.optimization import History

trial_history = History()


@task.on_result
def add_to_history(_, report: Trial.Report) -> None:
    """When we get a report, print it."""
    trial_history.add(report)

We launch a new task when the scheduler is empty, i.e. when all the tasks have finished. This will keep going until we hit the timeout we set on the scheduler.

If you want to run the optimization in parallel, you can use the @task.on_result callback to launch a new task when you get a report. This will launch a new task as soon as one finishes.

@task.on_result
def launch_another_task(*_: Any) -> None:
    """When we get a report, evaluate another trial."""
    if scheduler.running():
        trial = optimizer.ask()
        task.submit(trial, _pipeline=pipeline, data_bucket=data_bucket)

If something goes wrong, we likely want to stop the scheduler.

@task.on_exception
def stop_scheduler_on_exception(*_: Any) -> None:
    scheduler.stop()


@task.on_cancelled
def stop_scheduler_on_cancelled(_: Any) -> None:
    scheduler.stop()

Setting the system to run#

Lastly we use Scheduler.run to run the scheduler. We pass in a timeout of 20 seconds.

if __name__ == "__main__":
    scheduler.run(timeout=5)

    print("Trial history:")
    history_df = trial_history.df()
    print(history_df)