Using the Scheduler with SLURM (dask-jobqueue)#
Expand to copy examples/dask-jobqueue.py
(top right)
import traceback
from typing import Any
import openml
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from amltk.optimization import History, Metric, Trial
from amltk.optimization.optimizers.smac import SMACOptimizer
from amltk.pipeline import Component, Node, Sequential, Split
from amltk.scheduling import Scheduler
from amltk.sklearn import split_data
from amltk.store import PathBucket
N_WORKERS = 32
scheduler = Scheduler.with_slurm(
n_workers=N_WORKERS, # Number of workers to launch
queue="the-name-of-the-partition/queue", # Name of the queue to submit to
cores=1, # Number of cores per worker
memory="4 GB", # Memory per worker
walltime="00:20:00", # Walltime per worker
# submit_command="sbatch --extra-arguments", # Sometimes you need extra arguments to the launch command
)
def get_dataset(
dataset_id: str | int,
*,
seed: int,
splits: dict[str, float],
) -> dict[str, Any]:
dataset = openml.datasets.get_dataset(
dataset_id,
download_data=True,
download_features_meta_data=False,
download_qualities=False,
)
target_name = dataset.default_target_attribute
X, y, _, _ = dataset.get_data(dataset_format="dataframe", target=target_name)
_y = LabelEncoder().fit_transform(y)
return split_data(X, _y, splits=splits, seed=seed) # type: ignore
pipeline = (
Sequential(name="Pipeline")
>> Split(
{
"categorical": [
SimpleImputer(strategy="constant", fill_value="missing"),
OneHotEncoder(drop="first"),
],
"numerical": Component(
SimpleImputer,
space={"strategy": ["mean", "median"]},
),
},
name="feature_preprocessing",
)
>> Component(
RandomForestClassifier,
space={
"n_estimators": (10, 100),
"max_features": (0.0, 1.0),
"criterion": ["gini", "entropy", "log_loss"],
},
)
)
def target_function(trial: Trial, _pipeline: Node) -> Trial.Report:
trial.store({"config.json": trial.config})
with trial.profile("data-loading"):
X_train, X_val, X_test, y_train, y_val, y_test = (
trial.bucket["X_train.csv"].load(),
trial.bucket["X_val.csv"].load(),
trial.bucket["X_test.csv"].load(),
trial.bucket["y_train.npy"].load(),
trial.bucket["y_val.npy"].load(),
trial.bucket["y_test.npy"].load(),
)
sklearn_pipeline = _pipeline.configure(trial.config).build("sklearn")
try:
with trial.profile("fit"):
sklearn_pipeline.fit(X_train, y_train)
except Exception as e:
tb = traceback.format_exc()
trial.store({"exception.txt": f"{e}\n {tb}"})
return trial.fail(e, tb)
with trial.profile("predictions"):
train_predictions = sklearn_pipeline.predict(X_train)
val_predictions = sklearn_pipeline.predict(X_val)
test_predictions = sklearn_pipeline.predict(X_test)
with trial.profile("probabilities"):
val_probabilites = sklearn_pipeline.predict_proba(X_val)
with trial.profile("scoring"):
train_acc = float(accuracy_score(train_predictions, y_train))
val_acc = float(accuracy_score(val_predictions, y_val))
test_acc = float(accuracy_score(test_predictions, y_test))
trial.summary["train/acc"] = train_acc
trial.summary["val/acc"] = val_acc
trial.summary["test/acc"] = test_acc
trial.store(
{
"model.pkl": sklearn_pipeline,
"val_probabilities.npy": val_probabilites,
"val_predictions.npy": val_predictions,
"test_predictions.npy": test_predictions,
},
)
return trial.success(accuracy=val_acc)
seed = 42
data = get_dataset(31, seed=seed, splits={"train": 0.6, "val": 0.2, "test": 0.2})
X_train, y_train = data["train"]
X_val, y_val = data["val"]
X_test, y_test = data["test"]
bucket = PathBucket("example-hpo", clean=True, create=True)
bucket.store(
{
"X_train.csv": X_train,
"X_val.csv": X_val,
"X_test.csv": X_test,
"y_train.npy": y_train,
"y_val.npy": y_val,
"y_test.npy": y_test,
},
)
optimizer = SMACOptimizer.create(
space=pipeline, # <!> (1)!
metrics=Metric("accuracy", minimize=False, bounds=(0.0, 1.0)),
bucket=bucket,
seed=seed,
)
task = scheduler.task(target_function)
@scheduler.on_start(repeat=N_WORKERS)
def launch_initial_tasks() -> None:
"""When we start, launch `n_workers` tasks."""
trial = optimizer.ask()
task.submit(trial, _pipeline=pipeline)
trial_history = History()
@task.on_result
def process_result_and_launc(_, report: Trial.Report) -> None:
"""When we get a report, print it."""
trial_history.add(report)
optimizer.tell(report)
if scheduler.running():
trial = optimizer.ask()
task.submit(trial, _pipeline=pipeline)
@task.on_cancelled
def stop_scheduler_on_cancelled(_: Any) -> None:
raise RuntimeError("Scheduler cancelled a worker!")
if __name__ == "__main__":
scheduler.run(timeout=60)
history_df = trial_history.df()
print(history_df)
print(len(history_df))
Description#
The point of this example is to show
how to set up dask-jobqueue
with a realistic workload.
Dependencies
Requires the following integrations and dependencies:
pip install openml amltk[smac, sklearn, dask-jobqueue]
This example shows how to use dask-jobqueue
to run HPO on a
RandomForestClassifier
with SMAC. This workload is borrowed from
the HPO example.
SMAC can not handle fast updates and seems to be quite efficient for this workload with ~32 cores.
import traceback
from typing import Any
import openml
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from amltk.optimization import History, Metric, Trial
from amltk.optimization.optimizers.smac import SMACOptimizer
from amltk.pipeline import Component, Node, Sequential, Split
from amltk.scheduling import Scheduler
from amltk.sklearn import split_data
from amltk.store import PathBucket
N_WORKERS = 32
scheduler = Scheduler.with_slurm(
n_workers=N_WORKERS, # Number of workers to launch
queue="the-name-of-the-partition/queue", # Name of the queue to submit to
cores=1, # Number of cores per worker
memory="4 GB", # Memory per worker
walltime="00:20:00", # Walltime per worker
# submit_command="sbatch --extra-arguments", # Sometimes you need extra arguments to the launch command
)
def get_dataset(
dataset_id: str | int,
*,
seed: int,
splits: dict[str, float],
) -> dict[str, Any]:
dataset = openml.datasets.get_dataset(
dataset_id,
download_data=True,
download_features_meta_data=False,
download_qualities=False,
)
target_name = dataset.default_target_attribute
X, y, _, _ = dataset.get_data(dataset_format="dataframe", target=target_name)
_y = LabelEncoder().fit_transform(y)
return split_data(X, _y, splits=splits, seed=seed) # type: ignore
pipeline = (
Sequential(name="Pipeline")
>> Split(
{
"categorical": [
SimpleImputer(strategy="constant", fill_value="missing"),
OneHotEncoder(drop="first"),
],
"numerical": Component(
SimpleImputer,
space={"strategy": ["mean", "median"]},
),
},
name="feature_preprocessing",
)
>> Component(
RandomForestClassifier,
space={
"n_estimators": (10, 100),
"max_features": (0.0, 1.0),
"criterion": ["gini", "entropy", "log_loss"],
},
)
)
def target_function(trial: Trial, _pipeline: Node) -> Trial.Report:
trial.store({"config.json": trial.config})
with trial.profile("data-loading"):
X_train, X_val, X_test, y_train, y_val, y_test = (
trial.bucket["X_train.csv"].load(),
trial.bucket["X_val.csv"].load(),
trial.bucket["X_test.csv"].load(),
trial.bucket["y_train.npy"].load(),
trial.bucket["y_val.npy"].load(),
trial.bucket["y_test.npy"].load(),
)
sklearn_pipeline = _pipeline.configure(trial.config).build("sklearn")
try:
with trial.profile("fit"):
sklearn_pipeline.fit(X_train, y_train)
except Exception as e:
tb = traceback.format_exc()
trial.store({"exception.txt": f"{e}\n {tb}"})
return trial.fail(e, tb)
with trial.profile("predictions"):
train_predictions = sklearn_pipeline.predict(X_train)
val_predictions = sklearn_pipeline.predict(X_val)
test_predictions = sklearn_pipeline.predict(X_test)
with trial.profile("probabilities"):
val_probabilites = sklearn_pipeline.predict_proba(X_val)
with trial.profile("scoring"):
train_acc = float(accuracy_score(train_predictions, y_train))
val_acc = float(accuracy_score(val_predictions, y_val))
test_acc = float(accuracy_score(test_predictions, y_test))
trial.summary["train/acc"] = train_acc
trial.summary["val/acc"] = val_acc
trial.summary["test/acc"] = test_acc
trial.store(
{
"model.pkl": sklearn_pipeline,
"val_probabilities.npy": val_probabilites,
"val_predictions.npy": val_predictions,
"test_predictions.npy": test_predictions,
},
)
return trial.success(accuracy=val_acc)
seed = 42
data = get_dataset(31, seed=seed, splits={"train": 0.6, "val": 0.2, "test": 0.2})
X_train, y_train = data["train"]
X_val, y_val = data["val"]
X_test, y_test = data["test"]
bucket = PathBucket("example-hpo", clean=True, create=True)
bucket.store(
{
"X_train.csv": X_train,
"X_val.csv": X_val,
"X_test.csv": X_test,
"y_train.npy": y_train,
"y_val.npy": y_val,
"y_test.npy": y_test,
},
)
optimizer = SMACOptimizer.create(
space=pipeline, # (1)!
metrics=Metric("accuracy", minimize=False, bounds=(0.0, 1.0)),
bucket=bucket,
seed=seed,
)
task = scheduler.task(target_function)
@scheduler.on_start(repeat=N_WORKERS)
def launch_initial_tasks() -> None:
"""When we start, launch `n_workers` tasks."""
trial = optimizer.ask()
task.submit(trial, _pipeline=pipeline)
trial_history = History()
@task.on_result
def process_result_and_launc(_, report: Trial.Report) -> None:
"""When we get a report, print it."""
trial_history.add(report)
optimizer.tell(report)
if scheduler.running():
trial = optimizer.ask()
task.submit(trial, _pipeline=pipeline)
@task.on_cancelled
def stop_scheduler_on_cancelled(_: Any) -> None:
raise RuntimeError("Scheduler cancelled a worker!")
if __name__ == "__main__":
scheduler.run(timeout=60)
history_df = trial_history.df()
print(history_df)
print(len(history_df))