Multi-Layer Perceptron via PyTorch
This more advanced example incorporates multiple objectives, budgets, and statuses to demonstrate the strengths of DeepCAVE's Recorder.
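At its core, the Recorder follows a simple start/end protocol for every evaluated configuration. The following minimal sketch shows that protocol in isolation, using a toy objective that simply reports the sampled value of a hypothetical hyperparameter x and an illustrative save_path; the full example below fleshes it out with PyTorch Lightning models, budgets, and statuses:

from ConfigSpace import ConfigurationSpace
from ConfigSpace.hyperparameters import UniformFloatHyperparameter

from deepcave import Objective, Recorder
from deepcave.runs import Status

# A toy objective: higher is better, bounded in [0, 1].
score = Objective("score", lower=0, upper=1, optimize="upper")

cs = ConfigurationSpace(seed=0)
cs.add(UniformFloatHyperparameter(name="x", lower=0.0, upper=1.0))

with Recorder(cs, objectives=[score], save_path="logs/sketch") as r:
    for config in cs.sample_configuration(5):
        r.start(config, budget=1)
        # ... train / evaluate the configuration here ...
        r.end(costs=[config["x"]], status=Status.SUCCESS)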
import os
import time as t
import random
import ConfigSpace as CS
from ConfigSpace import ConfigurationSpace
from ConfigSpace.hyperparameters import (
UniformFloatHyperparameter,
UniformIntegerHyperparameter,
CategoricalHyperparameter,
)
from deepcave import Recorder, Objective
from deepcave.runs import Status
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
import torchvision.transforms as transforms
from torchmetrics import Accuracy
from torchvision.datasets import MNIST
import pytorch_lightning as pl
NUM_WORKERS = 16
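# Base LightningModule: handles MNIST download and splitting, the dataloaders,
# the training/validation/test steps, and the Adam optimizer. Subclasses (MLP,
# CNN) only define the network architecture and forward().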
class MNISTModel(pl.LightningModule):
def __init__(self, activation="relu", learning_rate=1e-4, dropout_rate=0.1, batch_size=64):
super().__init__()
if activation == "relu":
self.activation = nn.ReLU
elif activation == "tanh":
self.activation = nn.Tanh
elif activation == "sigmoid":
self.activation = nn.Sigmoid
        else:
            raise ValueError(f"Unknown activation: {activation}")
self.learning_rate = learning_rate
self.dropout_rate = dropout_rate
self.batch_size = batch_size
self.data_dir = os.path.join(os.getcwd(), "datasets")
self.num_classes = 10
self.dims = (1, 28, 28)
self.channels, self.width, self.height = self.dims
self.transform = transforms.Compose(
[
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,)),
]
)
self.accuracy = Accuracy(task="multiclass", num_classes=self.num_classes)
def prepare_data(self):
# download
MNIST(self.data_dir, train=True, download=True)
MNIST(self.data_dir, train=False, download=True)
def setup(self, stage=None):
# Assign train/val datasets for use in dataloaders
if stage == "fit" or stage is None:
mnist_full = MNIST(self.data_dir, train=True, transform=self.transform)
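            # Note: only 20,000 of the 60,000 training images are used for
            # training; the remaining 40,000 serve as the validation set.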
self.mnist_train, self.mnist_val = random_split(mnist_full, [20000, 40000])
# Assign test dataset for use in dataloader(s)
if stage == "test" or stage is None:
self.mnist_test = MNIST(self.data_dir, train=False, transform=self.transform)
def train_dataloader(self):
return DataLoader(self.mnist_train, batch_size=self.batch_size, num_workers=NUM_WORKERS)
def val_dataloader(self):
return DataLoader(self.mnist_val, batch_size=self.batch_size, num_workers=NUM_WORKERS)
def test_dataloader(self):
return DataLoader(self.mnist_test, batch_size=self.batch_size, num_workers=NUM_WORKERS)
def training_step(self, batch, batch_idx):
x, y = batch
logits = self(x)
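        # forward() returns log-probabilities (log_softmax), so NLL is the right loss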
loss = F.nll_loss(logits, y)
return loss
def validation_step(self, batch, batch_idx):
x, y = batch
logits = self(x)
loss = F.nll_loss(logits, y)
preds = torch.argmax(logits, dim=1)
self.accuracy(preds, y)
self.log("val_loss", loss, prog_bar=True)
self.log("val_acc", self.accuracy, prog_bar=True)
return loss
def test_step(self, batch, batch_idx):
return self.validation_step(batch, batch_idx)
def configure_optimizers(self):
optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
return optimizer
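# MLP: flattens the 28x28 input image and applies two hidden layers whose
# widths are tunable hyperparameters.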
class MLP(MNISTModel):
def __init__(self, activation, learning_rate, dropout_rate, batch_size, num_neurons=(64, 32)):
super().__init__(activation, learning_rate, dropout_rate, batch_size)
self.layers = nn.Sequential(
nn.Flatten(),
nn.Linear(self.channels * self.width * self.height, num_neurons[0]),
self.activation(),
nn.Dropout(dropout_rate),
nn.Linear(num_neurons[0], num_neurons[1]),
self.activation(),
nn.Dropout(dropout_rate),
nn.Linear(num_neurons[1], self.num_classes),
)
def forward(self, x):
x = self.layers(x)
return F.log_softmax(x, dim=1)
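# CNN: two convolutional blocks, each halving the spatial resolution via max
# pooling (28x28 -> 14x14 -> 7x7), followed by a linear classification head.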
class CNN(MNISTModel):
def __init__(self, activation, learning_rate, dropout_rate, batch_size):
super().__init__(activation, learning_rate, dropout_rate, batch_size)
self.conv1 = nn.Sequential(
nn.Conv2d(
in_channels=self.channels,
out_channels=16,
kernel_size=5,
stride=1,
padding=2,
),
self.activation(),
nn.Dropout(dropout_rate),
nn.MaxPool2d(kernel_size=2),
)
self.conv2 = nn.Sequential(
nn.Conv2d(16, 32, 5, 1, 2),
self.activation(),
nn.Dropout(dropout_rate),
nn.MaxPool2d(2),
nn.Flatten(),
)
# fully connected layer, output 10 classes
self.out = nn.Linear(32 * 7 * 7, self.num_classes)
def forward(self, x):
x = self.conv1(x)
x = self.conv2(x)
x = self.out(x)
return F.log_softmax(x, dim=1)
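# The search space: hyperparameters shared by both models, plus two conditional
# ones (the MLP layer widths) that are only active when the MLP is selected.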
def get_configspace(seed):
configspace = ConfigurationSpace(seed=seed)
model = CategoricalHyperparameter(name="model", choices=["mlp", "cnn"])
activation = CategoricalHyperparameter(name="activation", choices=["sigmoid", "tanh", "relu"])
learning_rate = UniformFloatHyperparameter(
name="learning_rate", lower=0.0001, upper=0.1, log=True
)
dropout_rate = UniformFloatHyperparameter(name="dropout_rate", lower=0.1, upper=0.9)
batch_size = UniformIntegerHyperparameter(name="batch_size", lower=16, upper=256)
# MLP specific
num_neurons_layer1 = UniformIntegerHyperparameter(name="num_neurons_layer1", lower=5, upper=100)
num_neurons_layer2 = UniformIntegerHyperparameter(name="num_neurons_layer2", lower=5, upper=100)
configspace.add(
[
model,
activation,
learning_rate,
dropout_rate,
batch_size,
num_neurons_layer1,
num_neurons_layer2,
]
)
    # Conditions: the MLP-specific hyperparameters are only active when model == "mlp"
configspace.add(CS.EqualsCondition(num_neurons_layer1, model, "mlp"))
configspace.add(CS.EqualsCondition(num_neurons_layer2, model, "mlp"))
return configspace
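# Main loop: record `num_runs` DeepCAVE runs; in each, sample `num_configs`
# configurations and train every configuration at increasing epoch budgets.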
if __name__ == "__main__":
# Define objectives
accuracy = Objective("accuracy", lower=0, upper=1, optimize="upper")
loss = Objective("loss", lower=0, optimize="lower")
time = Objective("time", lower=0, optimize="lower")
# Define budgets
max_epochs = 8
n_epochs = 4
budgets = np.linspace(0, max_epochs, num=n_epochs)
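    # With max_epochs=8 and n_epochs=4, budgets are [0.0, 2.67, 5.33, 8.0];
    # each round below trains the integer difference in epochs (2, then 3, then 3).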
# Others
num_configs = 1000
num_runs = 3
save_path = "logs/DeepCAVE/mnist_pytorch"
for run_id in range(num_runs):
random.seed(run_id)
configspace = get_configspace(run_id)
with Recorder(configspace, objectives=[accuracy, loss, time], save_path=save_path) as r:
for config in configspace.sample_configuration(num_configs):
pl.seed_everything(run_id)
kwargs = dict(
activation=config["activation"],
learning_rate=config["learning_rate"],
dropout_rate=config["dropout_rate"],
batch_size=config["batch_size"],
)
if config["model"] == "mlp":
model = MLP(
**kwargs,
num_neurons=(
config["num_neurons_layer1"],
config["num_neurons_layer2"],
),
)
                elif config["model"] == "cnn":
                    model = CNN(**kwargs)  # type: ignore
                else:
                    raise ValueError(f"Unknown model: {config['model']}")
start_time = t.time()
for i in range(1, n_epochs):
budget = budgets[i]
                    # How many epochs have to be run in this round
epochs = int(budgets[i]) - int(budgets[i - 1])
pl.seed_everything(run_id)
r.start(config, budget, model=model)
                    # Train the same model for `epochs` more epochs; its weights
                    # carry over from the previous budget round
trainer = pl.Trainer(
accelerator="cpu",
devices=1,
                        num_sanity_val_steps=0,  # skip the validation sanity check
deterministic=True,
min_epochs=epochs,
max_epochs=epochs,
)
trainer.fit(model)
result = trainer.test(model)
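                    # test_step delegates to validation_step, so the test metrics
                    # are logged under the keys "val_acc" and "val_loss"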
accuracy_ = result[0]["val_acc"]
loss_ = result[0]["val_loss"]
                    # We add some artificial statuses to showcase status handling later in DeepCAVE
if accuracy_ < 0.5:
status = Status.CRASHED
accuracy_, loss_ = None, None
                    elif random.uniform(0, 1) < 0.05:  # 5% chance
                        statuses = [Status.MEMORYOUT, Status.TIMEOUT]
                        status = random.choice(statuses)
accuracy_, loss_ = None, None
else:
status = Status.SUCCESS
end_time = t.time()
elapsed_time = end_time - start_time
r.end(costs=[accuracy_, loss_, elapsed_time], status=status)