Performing HPO with Post-Hoc Ensembling.#
Expand to copy examples/hpo_with_ensembling.py
(top right)
from __future__ import annotations
import shutil
import traceback
from asyncio import Future
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import numpy as np
import openml
from sklearn.compose import make_column_selector
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import VarianceThreshold
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import (
LabelEncoder,
MinMaxScaler,
OneHotEncoder,
RobustScaler,
StandardScaler,
)
from sklearn.svm import SVC
from amltk.data.conversions import probabilities_to_classes
from amltk.ensembling.weighted_ensemble_caruana import weighted_ensemble_caruana
from amltk.optimization import History, Metric, Trial
from amltk.optimization.optimizers.smac import SMACOptimizer
from amltk.pipeline import Choice, Component, Sequential, Split
from amltk.scheduling import Scheduler
from amltk.sklearn.data import split_data
from amltk.store import PathBucket
def get_dataset(seed: int) -> tuple[np.ndarray, ...]:
dataset = openml.datasets.get_dataset(
31,
download_qualities=False,
download_features_meta_data=False,
download_data=True,
)
X, y, _, _ = dataset.get_data(
dataset_format="dataframe",
target=dataset.default_target_attribute,
)
_y = LabelEncoder().fit_transform(y)
splits = split_data( # <!> (1)!
X, # <!>
_y, # <!>
splits={"train": 0.6, "val": 0.2, "test": 0.2}, # <!>
seed=seed, # <!>
) # <!>
x_train, y_train = splits["train"]
x_val, y_val = splits["val"]
x_test, y_test = splits["test"]
return x_train, x_val, x_test, y_train, y_val, y_test # type: ignore
# 1. We use the [`split_data()`][amltk.sklearn.data.split_data] function from the
# to split the data into a custom amount of splits, in this case
# `#!python "train", "val", "test"`. You could also use the
# dedicated [`train_val_test_split()`][amltk.sklearn.data.train_val_test_split]
# function instead.
pipeline = (
Sequential(name="Pipeline")
>> Split(
{
"categories": [
SimpleImputer(strategy="constant", fill_value="missing"),
Component(
OneHotEncoder,
space={
"min_frequency": (0.01, 0.1),
"handle_unknown": ["ignore", "infrequent_if_exist"],
},
config={"drop": "first"},
),
],
"numbers": [
Component(SimpleImputer, space={"strategy": ["mean", "median"]}),
Component(VarianceThreshold, space={"threshold": (0.0, 0.2)}),
Choice(StandardScaler, MinMaxScaler, RobustScaler, name="scaler"),
],
},
name="feature_preprocessing",
config={
"categories": make_column_selector(dtype_include=object),
"numbers": make_column_selector(dtype_include=np.number),
},
)
>> Choice( # <!> (1)!
Component(SVC, space={"C": (0.1, 10.0)}, config={"probability": True}),
Component(
RandomForestClassifier,
space={"n_estimators": (10, 100), "criterion": ["gini", "log_loss"]},
),
Component(
MLPClassifier,
space={
"activation": ["identity", "logistic", "relu"],
"alpha": (0.0001, 0.1),
"learning_rate": ["constant", "invscaling", "adaptive"],
},
),
)
)
print(pipeline)
print(pipeline.search_space("configspace"))
# 1. Here we define a choice of algorithms to use where each entry is a possible
# algorithm to use. Each algorithm is defined by a step, which is a
# configuration of a sklearn estimator. The space parameter is a dictionary
# of hyperparameters to optimize over, and the config parameter is a
# dictionary of fixed parameters to set on the estimator.
# 2. Here we gropu the numerical preprocessing steps to use. Each step is a
# scaler to use. Each scaler is defined by a step, which is a configuration
# of the preprocessor. The space parameter is a dictionary of
# hyperparameters to optimize over, and the config parameter is a dictionary
# of fixed parameters to set on the preprocessing step.
# 3. Here we group the categorical preprocessing steps to use.
# Each step is given a space, which is a dictionary of hyperparameters to
# optimize over, and a config, which is a dictionary of fixed parameters to
# set on the preprocessing step.
def target_function(
trial: Trial,
bucket: PathBucket,
pipeline: Sequential,
) -> Trial.Report:
X_train, X_val, X_test, y_train, y_val, y_test = ( # (1)!
bucket["X_train.csv"].load(),
bucket["X_val.csv"].load(),
bucket["X_test.csv"].load(),
bucket["y_train.npy"].load(),
bucket["y_val.npy"].load(),
bucket["y_test.npy"].load(),
)
pipeline = pipeline.configure(trial.config) # <!> (2)!
sklearn_pipeline = pipeline.build("sklearn") # <!>
try:
with trial.profile("fit"): # <!> (3)!
sklearn_pipeline.fit(X_train, y_train)
except Exception as e:
tb = traceback.format_exc()
trial.store(
{
"exception.txt": str(e),
"config.json": dict(trial.config),
"traceback.txt": str(tb),
},
)
return trial.fail() # <!> (4)!
# Make our predictions with the model
train_predictions = sklearn_pipeline.predict(X_train)
val_predictions = sklearn_pipeline.predict(X_val)
test_predictions = sklearn_pipeline.predict(X_test)
val_probabilites = sklearn_pipeline.predict_proba(X_val)
val_accuracy = float(accuracy_score(val_predictions, y_val))
# Save the scores to the summary of the trial
trial.summary["train_accuracy"] = float(accuracy_score(train_predictions, y_train))
trial.summary["validation_accuracy"] = val_accuracy
trial.summary["test_accuracy"] = float(accuracy_score(test_predictions, y_test))
# Save all of this to the file system
trial.store( # (5)!
{
"config.json": dict(trial.config),
"scores.json": trial.summary,
"model.pkl": sklearn_pipeline,
"val_predictions.npy": val_predictions,
"val_probabilities.npy": val_probabilites,
"test_predictions.npy": test_predictions,
},
)
return trial.success(accuracy=val_accuracy) # <!> (6)!
# 1. We can easily load data from a [`PathBucket`][amltk.store.PathBucket]
# using the `load` method.
# 2. We configure the pipeline with a specific set of hyperparameters suggested
# by the optimizer through the [`Trial`][amltk.optimization.Trial] object.
# 3. We begin the trial by timing the execution of the target function and capturing
# any potential exceptions.
# 4. If the trial failed, we return a failed report with a cost of infinity.
# 5. We save the results of the trial using
# [`Trial.store`][amltk.optimization.Trial.store], creating a subdirectory
# for this trial.
# 6. We return a successful report with the cost of the trial, which is the
# inverse of the validation accuracy.
@dataclass
class Ensemble:
weights: dict[str, float]
trajectory: list[tuple[str, float]]
configs: dict[str, dict[str, Any]]
def create_ensemble(
history: History,
bucket: PathBucket,
/,
size: int = 5,
seed: int = 42,
) -> Ensemble:
if len(history) == 0:
return Ensemble({}, [], {})
validation_predictions = {
report.name: report.retrieve("val_probabilities.npy") for report in history
}
targets = bucket["y_val.npy"].load()
accuracy: Callable[[np.ndarray, np.ndarray], float] = accuracy_score # type: ignore
def _score(_targets: np.ndarray, ensembled_probabilities: np.ndarray) -> float:
predictions = probabilities_to_classes(ensembled_probabilities, classes=[0, 1])
return accuracy(_targets, predictions)
weights, trajectory, final_probabilities = weighted_ensemble_caruana( # <!>
model_predictions=validation_predictions, # <!>
targets=targets, # <!>
size=size, # <!>
metric=_score, # <!>
select=max, # <!>
seed=seed, # <!>
) # <!>
configs = {name: history[name].retrieve("config.json") for name in weights}
return Ensemble(weights=weights, trajectory=trajectory, configs=configs)
seed = 42
X_train, X_val, X_test, y_train, y_val, y_test = get_dataset(seed) # (1)!
path = Path("example-hpo-with-ensembling")
if path.exists():
shutil.rmtree(path)
bucket = PathBucket(path)
bucket.store( # (2)!
{
"X_train.csv": X_train,
"X_val.csv": X_val,
"X_test.csv": X_test,
"y_train.npy": y_train,
"y_val.npy": y_val,
"y_test.npy": y_test,
},
)
scheduler = Scheduler.with_processes() # (3)!
optimizer = SMACOptimizer.create(
space=pipeline,
metrics=Metric("accuracy", minimize=False, bounds=(0, 1)),
bucket=path,
seed=seed,
) # (4)!
task = scheduler.task(target_function) # (5)!
ensemble_task = scheduler.task(create_ensemble) # (6)!
trial_history = History()
ensembles: list[Ensemble] = []
@scheduler.on_start # (7)!
def launch_initial_tasks() -> None:
"""When we start, launch `n_workers` tasks."""
trial = optimizer.ask()
task.submit(trial, bucket=bucket, pipeline=pipeline)
@task.on_result
def tell_optimizer(future: Future, report: Trial.Report) -> None:
"""When we get a report, tell the optimizer."""
optimizer.tell(report)
@task.on_result
def add_to_history(future: Future, report: Trial.Report) -> None:
"""When we get a report, print it."""
trial_history.add(report)
@task.on_result
def launch_ensemble_task(future: Future, report: Trial.Report) -> None:
"""When a task successfully completes, launch an ensemble task."""
if report.status is Trial.Status.SUCCESS:
ensemble_task.submit(trial_history, bucket)
@task.on_result
def launch_another_task(*_: Any) -> None:
"""When we get a report, evaluate another trial."""
trial = optimizer.ask()
task.submit(trial, bucket=bucket, pipeline=pipeline)
@ensemble_task.on_result
def save_ensemble(future: Future, ensemble: Ensemble) -> None:
"""When an ensemble task returns, save it."""
ensembles.append(ensemble)
@ensemble_task.on_exception
def print_ensemble_exception(future: Future[Any], exception: BaseException) -> None:
"""When an exception occurs, log it and stop."""
print(exception)
scheduler.stop()
@task.on_exception
def print_task_exception(future: Future[Any], exception: BaseException) -> None:
"""When an exception occurs, log it and stop."""
print(exception)
scheduler.stop()
@scheduler.on_timeout
def run_last_ensemble_task() -> None:
"""When the scheduler is empty, run the last ensemble task."""
ensemble_task.submit(trial_history, bucket)
if __name__ == "__main__":
scheduler.run(timeout=5, wait=True) # (8)!
print("Trial history:")
history_df = trial_history.df()
print(history_df)
best_ensemble = max(ensembles, key=lambda e: e.trajectory[-1])
print("Best ensemble:")
print(best_ensemble)
# 1. We use `#!python get_dataset()` defined earlier to load the
# dataset.
# 2. We use [`store()`][amltk.store.Bucket.store] to store the data in the bucket, with
# each key being the name of the file and the value being the data.
# 3. We use [`Scheduler.with_processes()`][amltk.scheduling.Scheduler.with_processes]
# create a [`Scheduler`][amltk.scheduling.Scheduler] that runs everything
# in a different process. You can of course use a different backend if you want.
# 4. We use [`SMACOptimizer.create()`][amltk.optimization.optimizers.smac.SMACOptimizer.create] to create a
# [`SMACOptimizer`][amltk.optimization.optimizers.smac.SMACOptimizer] given the space from the pipeline
# to optimize over.
# 5. We create a [`Task`][amltk.scheduling.Task] that will run our objective, passing
# in the function to run and the scheduler for where to run it
# 6. We use [`task()`][amltk.scheduling.Task] to create a
# [`Task`][amltk.scheduling.Task] for the `create_ensemble` method above.
# This will also run in parallel with the hpo trials if using a non-sequential scheduling mode.
# 7. We use `@scheduler.on_start()` hook to register a
# callback that will be called when the scheduler starts. We can use the
# `repeat` argument to make sure it's called many times if we want.
# 8. We use [`Scheduler.run()`][amltk.scheduling.Scheduler.run] to run the scheduler.
# Here we set it to run briefly for 5 seconds and wait for remaining tasks to finish
# before continuing.
Description#
Dependencies
Requires the following integrations and dependencies:
pip install openml amltk[smac, sklearn]
This example performs Hyperparameter optimization on a fairly default data-preprocessing + model sklearn pipeline, using a dataset pulled from OpenML.
After the HPO is complete, we use the validation predictions from each trial to create an ensemble using the Weighted Ensemble algorithm from Caruana et al. (2004).
Reference: Ensemble selection from libraries of models
Rich Caruana, Alexandru Niculescu-Mizil, Geoff Crew and Alex Ksikes
ICML 2004
dl.acm.org/doi/10.1145/1015330.1015432
www.cs.cornell.edu/~caruana/ctp/ct.papers/caruana.icml04.icdm06long.pdf
This makes heavy use of the pipelines and the optimization faculties of amltk. You can fine the pipeline guide here and the optimization guide here to learn more.
You can skip the imports sections and go straight to the pipeline definition.
Imports#
from __future__ import annotations
import shutil
import traceback
from asyncio import Future
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import numpy as np
import openml
from sklearn.compose import make_column_selector
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import VarianceThreshold
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import (
LabelEncoder,
MinMaxScaler,
OneHotEncoder,
RobustScaler,
StandardScaler,
)
from sklearn.svm import SVC
from amltk.data.conversions import probabilities_to_classes
from amltk.ensembling.weighted_ensemble_caruana import weighted_ensemble_caruana
from amltk.optimization import History, Metric, Trial
from amltk.optimization.optimizers.smac import SMACOptimizer
from amltk.pipeline import Choice, Component, Sequential, Split
from amltk.scheduling import Scheduler
from amltk.sklearn.data import split_data
from amltk.store import PathBucket
Below is just a small function to help us get the dataset from OpenML and encode the labels.
def get_dataset(seed: int) -> tuple[np.ndarray, ...]:
dataset = openml.datasets.get_dataset(
31,
download_qualities=False,
download_features_meta_data=False,
download_data=True,
)
X, y, _, _ = dataset.get_data(
dataset_format="dataframe",
target=dataset.default_target_attribute,
)
_y = LabelEncoder().fit_transform(y)
splits = split_data( # (1)!
X,
_y,
splits={"train": 0.6, "val": 0.2, "test": 0.2},
seed=seed,
)
x_train, y_train = splits["train"]
x_val, y_val = splits["val"]
x_test, y_test = splits["test"]
return x_train, x_val, x_test, y_train, y_val, y_test # type: ignore
- We use the
split_data()
function from the to split the data into a custom amount of splits, in this case"train", "val", "test"
. You could also use the dedicatedtrain_val_test_split()
function instead.
Pipeline Definition#
Here we define a pipeline which splits categoricals and numericals down two different paths, and then combines them back together before passing them to a choice of classifier between a Random Forest, Support Vector Machine, and Multi-Layer Perceptron.
For more on definitions of pipelines, see the Pipeline guide.
pipeline = (
Sequential(name="Pipeline")
>> Split(
{
"categories": [
SimpleImputer(strategy="constant", fill_value="missing"),
Component(
OneHotEncoder,
space={
"min_frequency": (0.01, 0.1),
"handle_unknown": ["ignore", "infrequent_if_exist"],
},
config={"drop": "first"},
),
],
"numbers": [
Component(SimpleImputer, space={"strategy": ["mean", "median"]}),
Component(VarianceThreshold, space={"threshold": (0.0, 0.2)}),
Choice(StandardScaler, MinMaxScaler, RobustScaler, name="scaler"),
],
},
name="feature_preprocessing",
config={
"categories": make_column_selector(dtype_include=object),
"numbers": make_column_selector(dtype_include=np.number),
},
)
>> Choice( # (1)!
Component(SVC, space={"C": (0.1, 10.0)}, config={"probability": True}),
Component(
RandomForestClassifier,
space={"n_estimators": (10, 100), "criterion": ["gini", "log_loss"]},
),
Component(
MLPClassifier,
space={
"activation": ["identity", "logistic", "relu"],
"alpha": (0.0001, 0.1),
"learning_rate": ["constant", "invscaling", "adaptive"],
},
),
)
)
print(pipeline)
print(pipeline.search_space("configspace"))
- Here we define a choice of algorithms to use where each entry is a possible algorithm to use. Each algorithm is defined by a step, which is a configuration of a sklearn estimator. The space parameter is a dictionary of hyperparameters to optimize over, and the config parameter is a dictionary of fixed parameters to set on the estimator.
- Here we gropu the numerical preprocessing steps to use. Each step is a scaler to use. Each scaler is defined by a step, which is a configuration of the preprocessor. The space parameter is a dictionary of hyperparameters to optimize over, and the config parameter is a dictionary of fixed parameters to set on the preprocessing step.
- Here we group the categorical preprocessing steps to use. Each step is given a space, which is a dictionary of hyperparameters to optimize over, and a config, which is a dictionary of fixed parameters to set on the preprocessing step.
Sequential(name='Pipeline', item=None, nodes=(Split(name='feature_preprocessing', item=None, nodes=(Sequential(name='categories', item=None, nodes=(Fixed(name='SimpleImputer', item=SimpleImputer(fill_value='missing', strategy='constant'), nodes=(), config=None, space=None, fidelities=None, config_transform=None, meta=None), Component(name='OneHotEncoder', item=<class 'sklearn.preprocessing._encoders.OneHotEncoder'>, nodes=(), config={'drop': 'first'}, space={'min_frequency': (0.01, 0.1), 'handle_unknown': ['ignore', 'infrequent_if_exist']}, fidelities=None, config_transform=None, meta=None)), config=None, space=None, fidelities=None, config_transform=None, meta=None), Sequential(name='numbers', item=None, nodes=(Component(name='SimpleImputer', item=<class 'sklearn.impute._base.SimpleImputer'>, nodes=(), config=None, space={'strategy': ['mean', 'median']}, fidelities=None, config_transform=None, meta=None), Component(name='VarianceThreshold', item=<class 'sklearn.feature_selection._variance_threshold.VarianceThreshold'>, nodes=(), config=None, space={'threshold': (0.0, 0.2)}, fidelities=None, config_transform=None, meta=None), Choice(name='scaler', item=None, nodes=(Component(name='MinMaxScaler', item=<class 'sklearn.preprocessing._data.MinMaxScaler'>, nodes=(), config=None, space=None, fidelities=None, config_transform=None, meta=None), Component(name='RobustScaler', item=<class 'sklearn.preprocessing._data.RobustScaler'>, nodes=(), config=None, space=None, fidelities=None, config_transform=None, meta=None), Component(name='StandardScaler', item=<class 'sklearn.preprocessing._data.StandardScaler'>, nodes=(), config=None, space=None, fidelities=None, config_transform=None, meta=None)), config=None, space=None, fidelities=None, config_transform=None, meta=None)), config=None, space=None, fidelities=None, config_transform=None, meta=None)), config={'categories': <sklearn.compose._column_transformer.make_column_selector object at 0x7efd98e26bc0>, 'numbers': <sklearn.compose._column_transformer.make_column_selector object at 0x7efd98e26f20>}, space=None, fidelities=None, config_transform=None, meta=None), Choice(name='Choice-Q6hAl1zM', item=None, nodes=(Component(name='MLPClassifier', item=<class 'sklearn.neural_network._multilayer_perceptron.MLPClassifier'>, nodes=(), config=None, space={'activation': ['identity', 'logistic', 'relu'], 'alpha': (0.0001, 0.1), 'learning_rate': ['constant', 'invscaling', 'adaptive']}, fidelities=None, config_transform=None, meta=None), Component(name='RandomForestClassifier', item=<class 'sklearn.ensemble._forest.RandomForestClassifier'>, nodes=(), config=None, space={'n_estimators': (10, 100), 'criterion': ['gini', 'log_loss']}, fidelities=None, config_transform=None, meta=None), Component(name='SVC', item=<class 'sklearn.svm._classes.SVC'>, nodes=(), config={'probability': True}, space={'C': (0.1, 10.0)}, fidelities=None, config_transform=None, meta=None)), config=None, space=None, fidelities=None, config_transform=None, meta=None)), config=None, space=None, fidelities=None, config_transform=None, meta=None)
Configuration space object:
Hyperparameters:
Pipeline:Choice-Q6hAl1zM:MLPClassifier:activation, Type: Categorical, Choices: {identity, logistic, relu}, Default: identity
Pipeline:Choice-Q6hAl1zM:MLPClassifier:alpha, Type: UniformFloat, Range: [0.0001, 0.1], Default: 0.05005
Pipeline:Choice-Q6hAl1zM:MLPClassifier:learning_rate, Type: Categorical, Choices: {constant, invscaling, adaptive}, Default: constant
Pipeline:Choice-Q6hAl1zM:RandomForestClassifier:criterion, Type: Categorical, Choices: {gini, log_loss}, Default: gini
Pipeline:Choice-Q6hAl1zM:RandomForestClassifier:n_estimators, Type: UniformInteger, Range: [10, 100], Default: 55
Pipeline:Choice-Q6hAl1zM:SVC:C, Type: UniformFloat, Range: [0.1, 10.0], Default: 5.05
Pipeline:Choice-Q6hAl1zM:__choice__, Type: Categorical, Choices: {MLPClassifier, RandomForestClassifier, SVC}, Default: MLPClassifier
Pipeline:feature_preprocessing:categories:OneHotEncoder:handle_unknown, Type: Categorical, Choices: {ignore, infrequent_if_exist}, Default: ignore
Pipeline:feature_preprocessing:categories:OneHotEncoder:min_frequency, Type: UniformFloat, Range: [0.01, 0.1], Default: 0.055
Pipeline:feature_preprocessing:numbers:SimpleImputer:strategy, Type: Categorical, Choices: {mean, median}, Default: mean
Pipeline:feature_preprocessing:numbers:VarianceThreshold:threshold, Type: UniformFloat, Range: [0.0, 0.2], Default: 0.1
Pipeline:feature_preprocessing:numbers:scaler:__choice__, Type: Categorical, Choices: {MinMaxScaler, RobustScaler, StandardScaler}, Default: MinMaxScaler
Conditions:
Pipeline:Choice-Q6hAl1zM:MLPClassifier:activation | Pipeline:Choice-Q6hAl1zM:__choice__ == 'MLPClassifier'
Pipeline:Choice-Q6hAl1zM:MLPClassifier:alpha | Pipeline:Choice-Q6hAl1zM:__choice__ == 'MLPClassifier'
Pipeline:Choice-Q6hAl1zM:MLPClassifier:learning_rate | Pipeline:Choice-Q6hAl1zM:__choice__ == 'MLPClassifier'
Pipeline:Choice-Q6hAl1zM:RandomForestClassifier:criterion | Pipeline:Choice-Q6hAl1zM:__choice__ == 'RandomForestClassifier'
Pipeline:Choice-Q6hAl1zM:RandomForestClassifier:n_estimators | Pipeline:Choice-Q6hAl1zM:__choice__ == 'RandomForestClassifier'
Pipeline:Choice-Q6hAl1zM:SVC:C | Pipeline:Choice-Q6hAl1zM:__choice__ == 'SVC'
Target Function#
Next we establish the actual target function we wish to evaluate, that is, the function we wish to optimize. In this case, we are optimizing the accuracy of the model on the validation set.
The target function takes a Trial
object, which
has the configuration of the pipeline to evaluate and provides utility
to time, and return the results of the evaluation, whether it be a success
or failure.
We make use of a PathBucket
to store and load the data, and the Pipeline
we defined above to
configure the pipeline with the hyperparameters we are optimizing over.
For more details, please check out the Optimization guide for more details.
def target_function(
trial: Trial,
bucket: PathBucket,
pipeline: Sequential,
) -> Trial.Report:
X_train, X_val, X_test, y_train, y_val, y_test = ( # (1)!
bucket["X_train.csv"].load(),
bucket["X_val.csv"].load(),
bucket["X_test.csv"].load(),
bucket["y_train.npy"].load(),
bucket["y_val.npy"].load(),
bucket["y_test.npy"].load(),
)
pipeline = pipeline.configure(trial.config) # (2)!
sklearn_pipeline = pipeline.build("sklearn")
try:
with trial.profile("fit"): # (3)!
sklearn_pipeline.fit(X_train, y_train)
except Exception as e:
tb = traceback.format_exc()
trial.store(
{
"exception.txt": str(e),
"config.json": dict(trial.config),
"traceback.txt": str(tb),
},
)
return trial.fail() # (4)!
# Make our predictions with the model
train_predictions = sklearn_pipeline.predict(X_train)
val_predictions = sklearn_pipeline.predict(X_val)
test_predictions = sklearn_pipeline.predict(X_test)
val_probabilites = sklearn_pipeline.predict_proba(X_val)
val_accuracy = float(accuracy_score(val_predictions, y_val))
# Save the scores to the summary of the trial
trial.summary["train_accuracy"] = float(accuracy_score(train_predictions, y_train))
trial.summary["validation_accuracy"] = val_accuracy
trial.summary["test_accuracy"] = float(accuracy_score(test_predictions, y_test))
# Save all of this to the file system
trial.store( # (5)!
{
"config.json": dict(trial.config),
"scores.json": trial.summary,
"model.pkl": sklearn_pipeline,
"val_predictions.npy": val_predictions,
"val_probabilities.npy": val_probabilites,
"test_predictions.npy": test_predictions,
},
)
return trial.success(accuracy=val_accuracy) # (6)!
- We can easily load data from a
PathBucket
using theload
method. - We configure the pipeline with a specific set of hyperparameters suggested
by the optimizer through the
Trial
object. - We begin the trial by timing the execution of the target function and capturing any potential exceptions.
- If the trial failed, we return a failed report with a cost of infinity.
- We save the results of the trial using
Trial.store
, creating a subdirectory for this trial. - We return a successful report with the cost of the trial, which is the inverse of the validation accuracy.
Next we define a simple @dataclass
to store the
our definition of an Esemble, which is simply a collection of the models trial
names to their weight in the ensemble. We also store the trajectory of the
ensemble, which is a list of tuples of the trial name and the weight of the
trial at that point in the trajectory. Finally, we store the configuration
of each trial in the ensemble.
We could of course add extra functionality to the Ensemble, give it references
to the PathBucket
and the pipeline objects,
and even add methods to train the ensemble, but for the sake of simplicity we will
leave it as is.
@dataclass
class Ensemble:
weights: dict[str, float]
trajectory: list[tuple[str, float]]
configs: dict[str, dict[str, Any]]
def create_ensemble(
history: History,
bucket: PathBucket,
/,
size: int = 5,
seed: int = 42,
) -> Ensemble:
if len(history) == 0:
return Ensemble({}, [], {})
validation_predictions = {
report.name: report.retrieve("val_probabilities.npy") for report in history
}
targets = bucket["y_val.npy"].load()
accuracy: Callable[[np.ndarray, np.ndarray], float] = accuracy_score # type: ignore
def _score(_targets: np.ndarray, ensembled_probabilities: np.ndarray) -> float:
predictions = probabilities_to_classes(ensembled_probabilities, classes=[0, 1])
return accuracy(_targets, predictions)
weights, trajectory, final_probabilities = weighted_ensemble_caruana(
model_predictions=validation_predictions,
targets=targets,
size=size,
metric=_score,
select=max,
seed=seed,
)
configs = {name: history[name].retrieve("config.json") for name in weights}
return Ensemble(weights=weights, trajectory=trajectory, configs=configs)
Main#
Finally we come to the main script that runs everything.
seed = 42
X_train, X_val, X_test, y_train, y_val, y_test = get_dataset(seed) # (1)!
path = Path("example-hpo-with-ensembling")
if path.exists():
shutil.rmtree(path)
bucket = PathBucket(path)
bucket.store( # (2)!
{
"X_train.csv": X_train,
"X_val.csv": X_val,
"X_test.csv": X_test,
"y_train.npy": y_train,
"y_val.npy": y_val,
"y_test.npy": y_test,
},
)
scheduler = Scheduler.with_processes() # (3)!
optimizer = SMACOptimizer.create(
space=pipeline,
metrics=Metric("accuracy", minimize=False, bounds=(0, 1)),
bucket=path,
seed=seed,
) # (4)!
task = scheduler.task(target_function) # (5)!
ensemble_task = scheduler.task(create_ensemble) # (6)!
trial_history = History()
ensembles: list[Ensemble] = []
@scheduler.on_start # (7)!
def launch_initial_tasks() -> None:
"""When we start, launch `n_workers` tasks."""
trial = optimizer.ask()
task.submit(trial, bucket=bucket, pipeline=pipeline)
@task.on_result
def tell_optimizer(future: Future, report: Trial.Report) -> None:
"""When we get a report, tell the optimizer."""
optimizer.tell(report)
@task.on_result
def add_to_history(future: Future, report: Trial.Report) -> None:
"""When we get a report, print it."""
trial_history.add(report)
@task.on_result
def launch_ensemble_task(future: Future, report: Trial.Report) -> None:
"""When a task successfully completes, launch an ensemble task."""
if report.status is Trial.Status.SUCCESS:
ensemble_task.submit(trial_history, bucket)
@task.on_result
def launch_another_task(*_: Any) -> None:
"""When we get a report, evaluate another trial."""
trial = optimizer.ask()
task.submit(trial, bucket=bucket, pipeline=pipeline)
@ensemble_task.on_result
def save_ensemble(future: Future, ensemble: Ensemble) -> None:
"""When an ensemble task returns, save it."""
ensembles.append(ensemble)
@ensemble_task.on_exception
def print_ensemble_exception(future: Future[Any], exception: BaseException) -> None:
"""When an exception occurs, log it and stop."""
print(exception)
scheduler.stop()
@task.on_exception
def print_task_exception(future: Future[Any], exception: BaseException) -> None:
"""When an exception occurs, log it and stop."""
print(exception)
scheduler.stop()
@scheduler.on_timeout
def run_last_ensemble_task() -> None:
"""When the scheduler is empty, run the last ensemble task."""
ensemble_task.submit(trial_history, bucket)
if __name__ == "__main__":
scheduler.run(timeout=5, wait=True) # (8)!
print("Trial history:")
history_df = trial_history.df()
print(history_df)
best_ensemble = max(ensembles, key=lambda e: e.trajectory[-1])
print("Best ensemble:")
print(best_ensemble)
- We use
get_dataset()
defined earlier to load the dataset. - We use
store()
to store the data in the bucket, with each key being the name of the file and the value being the data. - We use
Scheduler.with_processes()
create aScheduler
that runs everything in a different process. You can of course use a different backend if you want. - We use
SMACOptimizer.create()
to create aSMACOptimizer
given the space from the pipeline to optimize over. - We create a
Task
that will run our objective, passing in the function to run and the scheduler for where to run it - We use
task()
to create aTask
for thecreate_ensemble
method above. This will also run in parallel with the hpo trials if using a non-sequential scheduling mode. - We use
@scheduler.on_start()
hook to register a callback that will be called when the scheduler starts. We can use therepeat
argument to make sure it's called many times if we want. - We use
Scheduler.run()
to run the scheduler. Here we set it to run briefly for 5 seconds and wait for remaining tasks to finish before continuing.