Optimization Guide#

One of the core tasks of any AutoML system is to optimize some objective, whether it be some pipeline, a black-box or even a toy function. In the context of AMLTK, this means defining some Metric(s) to optimize and creating an Optimizer to optimize them.

You can check out the integrated optimizers in our optimizer reference.

This guide relies lightly on topics covered in the Pipeline Guide for creating a pipeline and in the Scheduling Guide for creating a Scheduler and a Task. Neither is required, but if something is unclear or you'd like to know how something works, please refer to those guides or the reference!

Optimizing a 1-D function#

We'll start with a simple example of maximizing a one-dimensional function, a polynomial divided by x. The first thing to do is define the function we want to optimize.

import numpy as np
import matplotlib.pyplot as plt

def poly(x):
    return (x**2 + 4*x + 3) / x

fig, ax = plt.subplots()
x = np.linspace(-10, 10, 100)
ax.plot(x, poly(x))

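As a quick sanity check on what we are about to optimize, the sketch below evaluates poly at a few points using only the standard library. It is not part of AMLTK. It illustrates that the function is undefined at x = 0 and grows without bound as x approaches 0 from the right, which is why the optimization loop later in this guide guards against ZeroDivisionError.

```python
import math


def poly(x: float) -> float:
    return (x**2 + 4 * x + 3) / x


# poly(x) simplifies to x + 4 + 3/x, so its derivative 1 - 3/x**2
# vanishes at x = -sqrt(3) and x = +sqrt(3).
local_max_x = -math.sqrt(3)
local_max_value = poly(local_max_x)  # equals 4 - 2*sqrt(3)

# Values just to the right of zero keep growing.
near_zero = [poly(x) for x in (1.0, 0.1, 0.01, 0.001)]

# Plain Python floats raise at exactly x = 0.
try:
    poly(0.0)
    raised = False
except ZeroDivisionError:
    raised = True

print(f"local maximum on x < 0: poly({local_max_x:.3f}) = {local_max_value:.3f}")
print(f"values approaching 0 from the right: {near_zero}")
print(f"poly(0.0) raised ZeroDivisionError: {raised}")
```

Because the function has no finite maximum on the range used in this guide, a maximizing optimizer can be expected to favour small positive values of x.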

Our next step is to define the search range over which we want to optimize, in this case the range of values x can take. Here we use a simple Searchable, but the same mechanism can represent entire machine learning pipelines, with conditionality and much more complex ranges (see the Pipeline guide).

Vocab...

When dealing with such functions, one might call x just a parameter. However, in the context of Machine Learning, if this poly() function were more like train_model(), then we would refer to x as a hyperparameter, with its range as its search space.
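To make the vocabulary concrete, here is a minimal sketch of drawing configurations from a search space by uniform random sampling. The helper sample_config is hypothetical and written only for this illustration. It says nothing about how Searchable or any integrated optimizer samples internally.

```python
import random


def sample_config(
    space: dict[str, tuple[float, float]],
    rng: random.Random,
) -> dict[str, float]:
    """Draw one value per hyperparameter, uniformly from its (low, high) range."""
    return {name: rng.uniform(low, high) for name, (low, high) in space.items()}


# The search space: one hyperparameter "x" with the range [-10, 10].
space = {"x": (-10.0, 10.0)}
rng = random.Random(0)

configs = [sample_config(space, rng) for _ in range(5)]
for config in configs:
    print(config)
```

Each printed dictionary is one configuration: a concrete value for every hyperparameter in the search space.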

from amltk.pipeline import Searchable

def poly(x: float) -> float:
    return (x**2 + 4*x + 3) / x

s = Searchable(
    {"x": (-10.0, 10.0)},
    name="my-searchable"
)

╭─ Searchable(my-searchable) ─╮
 space {'x': (-10.0, 10.0)}  
╰─────────────────────────────╯

Creating an Optimizer#

We'll use SMAC here as an example, but you can find the other available optimizers in the optimizer reference.

Requirements

This requires smac which can be installed with:

pip install amltk[smac]

# Or directly
pip install smac

The first thing we'll need to do is create a Metric: a definition of some value we want to optimize.

from amltk.optimization import Metric

metric = Metric("score", minimize=False)
print(metric)
score (maximize)
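The minimize flag tells an optimizer which direction counts as an improvement. The sketch below illustrates that idea with a hypothetical ToyMetric dataclass. It is a stand-in written for this guide and does not reproduce the AMLTK Metric class or its methods.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyMetric:
    """Hypothetical stand-in for a metric definition (not the AMLTK class)."""

    name: str
    minimize: bool

    def is_improvement(self, new: float, current: float) -> bool:
        """Return True if `new` is strictly better than `current`."""
        return new < current if self.minimize else new > current


score = ToyMetric("score", minimize=False)  # higher is better
loss = ToyMetric("loss", minimize=True)  # lower is better

print(score.is_improvement(0.9, 0.8))  # True: 0.9 > 0.8 when maximizing
print(loss.is_improvement(0.9, 0.8))  # False: 0.9 is a worse loss than 0.8
```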

The next step is to create an optimizer. How an optimizer is constructed depends on the optimizer, so you'll have to refer to its reference documentation. However, for most integrated optimizers, we expose a helpful create() method.

from amltk.optimization.optimizers.smac import SMACOptimizer
from amltk.optimization import Metric
from amltk.pipeline import Searchable

def poly(x: float) -> float:
    return (x**2 + 4*x + 3) / x

metric = Metric("score", minimize=False)
space = Searchable(space={"x": (-10.0, 10.0)}, name="my-searchable")

optimizer = SMACOptimizer.create(space=space, metrics=metric, seed=42)

Running an Optimizer#

At this point, we can begin optimizing our function, using the ask() method to get Trials and the tell() method to report back with Trial.Reports.

from amltk.optimization.optimizers.smac import SMACOptimizer
from amltk.optimization import Metric, History, Trial
from amltk.pipeline import Searchable

def poly(x: float) -> float:
    return (x**2 + 4*x + 3) / x

metric = Metric("score", minimize=False)
space = Searchable(space={"x": (-10.0, 10.0)}, name="my-searchable")

optimizer = SMACOptimizer.create(space=space, metrics=metric, seed=42)

history = History()
for _ in range(10):
    # Get a trial from an Optimizer
    trial: Trial = optimizer.ask()
    print(f"Evaluating trial {trial.name} with config {trial.config}")

    # Access the trial's config
    x = trial.config["my-searchable:x"]

    try:
        score = poly(x)
    except ZeroDivisionError as e:
        # Generate a failed report (i.e. poly(x) raised divide by zero exception with x=0)
        report = trial.fail(e)
    else:
        # Generate a success report
        report = trial.success(score=score)

    # Store artifacts with the trial, using file extensions to infer how to store it
    trial.store({ "config.json": trial.config, "array.npy": [1, 2, 3] })

    # Tell the Optimizer about the report
    optimizer.tell(report)

    # Add the report to the history
    history.add(report)
Evaluating trial config_id=1_seed=1608637542_budget=None_instance=None with config {'my-searchable:x': 5.9014238975942135}
Evaluating trial config_id=2_seed=1608637542_budget=None_instance=None with config {'my-searchable:x': -2.0745517686009407}
Evaluating trial config_id=3_seed=1608637542_budget=None_instance=None with config {'my-searchable:x': -8.257772866636515}
Evaluating trial config_id=4_seed=1608637542_budget=None_instance=None with config {'my-searchable:x': 4.430919848382473}
Evaluating trial config_id=5_seed=1608637542_budget=None_instance=None with config {'my-searchable:x': 0.24310464039444923}
Evaluating trial config_id=6_seed=1608637542_budget=None_instance=None with config {'my-searchable:x': -6.413793563842773}
Evaluating trial config_id=7_seed=1608637542_budget=None_instance=None with config {'my-searchable:x': -2.58980056270957}
Evaluating trial config_id=8_seed=1608637542_budget=None_instance=None with config {'my-searchable:x': 8.760508447885513}
Evaluating trial config_id=9_seed=1608637542_budget=None_instance=None with config {'my-searchable:x': 8.428955599665642}
Evaluating trial config_id=10_seed=1608637542_budget=None_instance=None with config {'my-searchable:x': -4.599663596600294}

We can then use the History to inspect the optimization process as a dataframe.

df = history.df()
print(df)
                                                     status  ...  config:my-searchable:x
name                                                         ...                        
config_id=1_seed=1608637542_budget=None_instanc...  success  ...                5.901424
config_id=2_seed=1608637542_budget=None_instanc...  success  ...               -2.074552
config_id=3_seed=1608637542_budget=None_instanc...  success  ...               -8.257773
config_id=4_seed=1608637542_budget=None_instanc...  success  ...                 4.43092
config_id=5_seed=1608637542_budget=None_instanc...  success  ...                0.243105
config_id=6_seed=1608637542_budget=None_instanc...  success  ...               -6.413794
config_id=7_seed=1608637542_budget=None_instanc...  success  ...               -2.589801
config_id=8_seed=1608637542_budget=None_instanc...  success  ...                8.760508
config_id=9_seed=1608637542_budget=None_instanc...  success  ...                8.428956
config_id=10_seed=1608637542_budget=None_instan...  success  ...               -4.599664

[10 rows x 9 columns]

Okay, so a few things were introduced all at once here. Let's go over them bit by bit.

The Trial object#

The Trial object is the main object that you'll be interacting with when optimizing. It contains a load of useful properties and functionality to help you during optimization.

The .config contains namespaced parameters, in this case my-searchable:x, based on the pipeline/search space you specified.

It's also quite typical to store artifacts with the trial, a common feature of things like TensorBoard, MLFlow, etc. We provide a primitive way to store artifacts with the trial using .store() which takes a dictionary of file names to file contents. The file extension is used to infer how to store the file, for example, .json files will be stored as JSON, .npy files will be stored as numpy arrays. You are of course still free to use your other favourite logging tools in conjunction with AMLTK!
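The idea of choosing a storage format from the file extension can be sketched with the standard library alone. The store_artifacts helper below is hypothetical and handles only .json, .pkl and .txt. It is not how .store() is implemented, and it omits .npy so that the example has no third-party dependencies.

```python
import json
import pickle
import tempfile
from pathlib import Path
from typing import Any


def store_artifacts(directory: Path, artifacts: dict[str, Any]) -> list[Path]:
    """Write each artifact to `directory`, choosing a format from its suffix."""
    written = []
    for filename, content in artifacts.items():
        path = directory / filename
        suffix = path.suffix
        if suffix == ".json":
            path.write_text(json.dumps(content))
        elif suffix == ".pkl":
            path.write_bytes(pickle.dumps(content))
        elif suffix == ".txt":
            path.write_text(str(content))
        else:
            raise ValueError(f"No known way to store files ending in {suffix!r}")
        written.append(path)
    return written


with tempfile.TemporaryDirectory() as tmp:
    paths = store_artifacts(
        Path(tmp),
        {"config.json": {"x": 1.5}, "values.pkl": [1, 2, 3]},
    )
    # Read the files back to confirm the round trip.
    loaded_config = json.loads(paths[0].read_text())
    loaded_values = pickle.loads(paths[1].read_bytes())

print(loaded_config, loaded_values)
```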

Lastly, we use trial.success() or trial.fail(), which generate a Trial.Report for us that we can give back to the optimizer.
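The ask, evaluate, report, tell cycle can be sketched without any optimizer dependency. ToyRandomOptimizer and ToyReport below are hypothetical names invented for this illustration. They use plain random search and are not the AMLTK or SMAC classes, so treat this as a picture of the protocol rather than of the real implementation.

```python
import random
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyReport:
    """Hypothetical report: the config tried, plus a score or an exception."""

    config: dict[str, float]
    score: Optional[float] = None
    exception: Optional[Exception] = None


class ToyRandomOptimizer:
    """Hypothetical ask/tell optimizer that samples uniformly at random."""

    def __init__(self, low: float, high: float, seed: int) -> None:
        self.low, self.high = low, high
        self.rng = random.Random(seed)
        self.best: Optional[ToyReport] = None

    def ask(self) -> dict[str, float]:
        return {"x": self.rng.uniform(self.low, self.high)}

    def tell(self, report: ToyReport) -> None:
        # Failed evaluations carry no score, so they never become the best.
        if report.score is None:
            return
        if self.best is None or report.score > self.best.score:
            self.best = report


def poly(x: float) -> float:
    return (x**2 + 4 * x + 3) / x


optimizer = ToyRandomOptimizer(low=-10.0, high=10.0, seed=42)
reports = []
for _ in range(10):
    config = optimizer.ask()
    try:
        report = ToyReport(config, score=poly(config["x"]))
    except ZeroDivisionError as e:
        report = ToyReport(config, exception=e)
    optimizer.tell(report)
    reports.append(report)

print(optimizer.best)
```

A model-based optimizer differs mainly in what happens inside ask() and tell(): instead of sampling blindly, it uses the reports it has been told about to propose the next configuration.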

Feel free to explore the full API.

The History object#

You may have noticed that we also created a History object to store our reports in. This is a simple container that keeps the reports together and can produce a dataframe from them. We may extend it in the future with utilities such as plotting or other export formats, but for now its primary use is gathering results in one place.

We'll create a simple example where we create our own trials and record some results on them, getting out a dataframe at the end.

from amltk.optimization import History, Trial, Metric
from amltk.store import PathBucket

metric = Metric("score", minimize=False, bounds=(0, 5))
history = History()

trials = [
    Trial.create(name="trial-1", config={"x": 1.0}, metrics=[metric]),
    Trial.create(name="trial-2", config={"x": 2.0}, metrics=[metric]),
    Trial.create(name="trial-3", config={"x": 3.0}, metrics=[metric]),
]

for trial in trials:
    x = trial.config["x"]
    if x >= 2:
        report = trial.fail()
    else:
        report = trial.success(score=x)

    history.add(report)

df = history.df()
print(df)

best = history.best()
print(best)
          status  trial_seed  ... metric:score [0.0, 5.0] (maximize) config:x
name                          ...                                            
trial-1  success        <NA>  ...                                  1        1
trial-2     fail        <NA>  ...                               <NA>        2
trial-3     fail        <NA>  ...                               <NA>        3

[3 rows x 9 columns]
Trial.Report(trial=Trial(name='trial-1', config={'x': 1.0}, bucket=PathBucket(PosixPath('trial-trial-1-2024-04-24T14:11:57.194626')), metrics=MetricCollection(metrics={'score': Metric(name='score', minimize=False, bounds=(0.0, 5.0), fn=None)}), created_at=datetime.datetime(2024, 4, 24, 14, 11, 57, 194621), seed=None, fidelities={}, summary={}, storage=set(), extras={}), status=<Status.SUCCESS: 'success'>, reported_at=datetime.datetime(2024, 4, 24, 14, 11, 57, 195070), exception=None, values={'score': 1.0})

You can use the History.df() method to get a dataframe of the history and use your favourite dataframe tools to analyze the results.
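As a dependency-free picture of the kind of analysis this enables, the sketch below mirrors the three trials from the example above as plain dictionaries, then picks the best successful one and computes the failure rate. The record layout is an assumption made for this illustration and is not the column layout of History.df().

```python
# One record per trial, mirroring the example above:
# trial-1 succeeded with score 1.0, the other two failed.
records = [
    {"name": "trial-1", "status": "success", "score": 1.0, "x": 1.0},
    {"name": "trial-2", "status": "fail", "score": None, "x": 2.0},
    {"name": "trial-3", "status": "fail", "score": None, "x": 3.0},
]

# Only successful trials have a score to compare.
successful = [r for r in records if r["status"] == "success"]
best = max(successful, key=lambda r: r["score"])  # we are maximizing "score"
failure_rate = 1 - len(successful) / len(records)

print(best["name"], best["score"])
print(f"{failure_rate:.2f}")
```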

Optimizing an Sklearn-Pipeline#

To give a more concrete example, we will optimize a simple sklearn pipeline. You'll likely want to refer to the pipeline guide for more information on pipelines, but the example should be clear enough without it.

We start with defining our pipeline.

from typing import Any
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.neural_network import MLPClassifier

from amltk.pipeline import Sequential, Choice, Component

def dims_to_hidden_layer(config: dict[str, Any], _):
    config = dict(config)
    config["hidden_layer_sizes"] = (config.pop("dim1"), config.pop("dim2"))
    return config

# A pipeline with a choice of scalers and a parametrized MLP
my_pipeline = (
    Sequential(name="my-pipeline")
    >> Choice(
        StandardScaler,
        MinMaxScaler,
        Component(RobustScaler, space={"with_scaling": [True, False], "unit_variance": [True, False]}),
        name="scaler",
    )
    >> Component(
        MLPClassifier,
        space={
            "dim1": (1, 10),
            "dim2": (1, 10),
            "activation": ["relu", "tanh", "logistic"],
        },
        config_transform=dims_to_hidden_layer,
    )
)

╭─ Sequential(my-pipeline) ────────────────────────────────────────────────────╮
 ╭─ Choice(scaler) ─────────────────────────────────────────────────────────╮ 
  ╭─ Component(MinMaxSc─╮ ╭─ Component(RobustSc─╮ ╭─ Component(StandardS─╮  
   item class            item  class           item class             
        MinMaxScaler(         RobustScaler        StandardScaler   
  ╰─────────────────────╯  space {              ╰──────────────────────╯  
                                     'with_sc…                            
                                 [                                        
                                         True,                            
                                         False                            
                                     ],                                   
                                     'unit_va…                            
                                 [                                        
                                         True,                            
                                         False                            
                                     ]                                    
                                 }                                        
                          ╰─────────────────────╯                           
 ╰──────────────────────────────────────────────────────────────────────────╯ 
  
 ╭─ Component(MLPClassifier) ───────────────────────────────╮                 
  item      class MLPClassifier(...)                                        
  space     {                                                               
                'dim1': (1, 10),                                            
                'dim2': (1, 10),                                            
                'activation': [                                             
                    'relu',                                                 
                    'tanh',                                                 
                    'logistic'                                              
                ]                                                           
            }                                                               
  transform def dims_to_hidden_layer(...)                                   
 ╰──────────────────────────────────────────────────────────╯                 
╰──────────────────────────────────────────────────────────────────────────────╯
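To see what the config_transform does, the sketch below applies dims_to_hidden_layer to a hand-written configuration. The sampled values are made up for illustration. The second argument is ignored by the function, as in the definition above, so None is passed here. The transform is needed because MLPClassifier expects a single hidden_layer_sizes tuple rather than separate dim1 and dim2 parameters.

```python
from typing import Any


def dims_to_hidden_layer(config: dict[str, Any], _: Any) -> dict[str, Any]:
    config = dict(config)  # copy so the caller's dict is left untouched
    config["hidden_layer_sizes"] = (config.pop("dim1"), config.pop("dim2"))
    return config


# A hand-written example of what a sampled config for the MLP might look like.
sampled = {"dim1": 4, "dim2": 8, "activation": "relu"}
transformed = dims_to_hidden_layer(sampled, None)

print(transformed)
# {'activation': 'relu', 'hidden_layer_sizes': (4, 8)}
```

The two searchable integers are folded into one tuple, while every other key passes through unchanged.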

Next up, we need to define a simple target function we want to evaluate on.

from sklearn.model_selection import cross_validate
from amltk.optimization import Trial
from amltk.pipeline import Sequential  # used in the type annotation below
from amltk.store import Stored
import numpy as np

def evaluate(
    trial: Trial,
    pipeline: Sequential,
    X: Stored[np.ndarray],
    y: Stored[np.ndarray],
) -> Trial.Report:
    # Configure our pipeline and build it
    sklearn_pipeline = (
        pipeline
        .configure(trial.config)
        .build("sklearn")
    )

    # Load in our data
    X = X.load()
    y = y.load()

    # Use sklearn's cross_validate as our evaluator
    with trial.profile("cross-validate"):
        results = cross_validate(sklearn_pipeline, X, y, scoring="accuracy", cv=3, return_estimator=True)

    test_scores = results["test_score"]
    estimators = results["estimator"]  # You can store these if you like (you'll likely want to use the `.pkl` suffix for the filename)

    # Report the mean test score
    mean_test_score = np.mean(test_scores)
    return trial.success(acc=mean_test_score)

We'll also store our data so that each evaluate call loads it in for itself. This gains little for a single in-process call, but it is good practice once you scale up to multiple processes or remote compute.

For this we use a PathBucket and get a Stored from it, a reference to some object we can load() back in later.

from sklearn.datasets import load_iris
from amltk.store import PathBucket

# Load in our data
_X, _y = load_iris(return_X_y=True)

# Store our data in a bucket
bucket = PathBucket("my-bucket")
stored_X = bucket["X.npy"].put(_X)
stored_y = bucket["y.npy"].put(_y)
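The pattern of passing a lightweight reference and loading the data inside the evaluation function can be sketched with the standard library. ToyStored and put below are hypothetical names for this illustration, and pickle is used as the storage format. They are not the PathBucket or Stored implementations.

```python
import pickle
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any


@dataclass(frozen=True)
class ToyStored:
    """Hypothetical lazy reference: holds a path, loads the object on demand."""

    path: Path

    def load(self) -> Any:
        return pickle.loads(self.path.read_bytes())


def put(directory: Path, key: str, obj: Any) -> ToyStored:
    """Persist `obj` under `directory/key` and return a reference to it."""
    path = directory / key
    path.write_bytes(pickle.dumps(obj))
    return ToyStored(path)


def evaluate(data: ToyStored) -> float:
    # Only the small reference crosses the function boundary;
    # the data itself is read from disk inside the evaluation.
    values = data.load()
    return sum(values) / len(values)


with tempfile.TemporaryDirectory() as tmp:
    stored = put(Path(tmp), "values.pkl", [1.0, 2.0, 3.0])
    result = evaluate(stored)

print(result)  # 2.0
```

The reference is cheap to send to another process or machine, whereas sending the dataset itself with every call would mean serializing it each time.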

Lastly, we'll create our optimizer and run it. In this example, we'll use the SMACOptimizer but you can refer to the optimizer reference for other optimizers. For basic use cases, you should be able to swap in and out the optimizer and it should work without any changes.

from amltk.optimization.optimizers.smac import SMACOptimizer
from amltk.optimization import Metric, History

metric = Metric("acc", minimize=False, bounds=(0, 1))
optimizer = SMACOptimizer.create(
    space=my_pipeline,  # Let it know what to optimize
    metrics=metric,  # And let it know what to expect
    bucket=bucket,  # And where to store artifacts for trials and optimizer output
)

history = History()

for _ in range(10):
    # Get a trial from the optimizer
    trial = optimizer.ask()

    # Evaluate the trial
    report = evaluate(trial=trial, pipeline=my_pipeline, X=stored_X, y=stored_y)

    # Tell the optimizer about the report
    optimizer.tell(report)

    # Add the report to the history
    history.add(report)

df = history.df()
print(df)
                                                     status  ...  config:my-pipeline:scaler:RobustScaler:with_scaling
name                                                         ...                                                     
config_id=1_seed=1561514027_budget=None_instanc...  success  ...                                               <NA>  
config_id=2_seed=1561514027_budget=None_instanc...  success  ...                                              False  
config_id=3_seed=1561514027_budget=None_instanc...  success  ...                                               <NA>  
config_id=4_seed=1561514027_budget=None_instanc...  success  ...                                               <NA>  
config_id=5_seed=1561514027_budget=None_instanc...  success  ...                                               <NA>  
config_id=6_seed=1561514027_budget=None_instanc...  success  ...                                              False  
config_id=7_seed=1561514027_budget=None_instanc...  success  ...                                              False  
config_id=8_seed=1561514027_budget=None_instanc...  success  ...                                               <NA>  
config_id=9_seed=1561514027_budget=None_instanc...  success  ...                                               True  
config_id=10_seed=1561514027_budget=None_instan...  success  ...                                               <NA>  

[10 rows x 26 columns]