Skip to content

Parallelization on Cluster#

Expand to copy examples/1_basics/7_parallelization_cluster.py (top right)
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from smac import BlackBoxFacade, Scenario

__copyright__ = "Copyright 2025, Leibniz University Hanover, Institute of AI"
__license__ = "3-clause BSD"

import numpy as np
from ConfigSpace import ConfigurationSpace, Float
from ConfigSpace import Configuration  # for type hints

class Branin:
    def __init__(self, seed: int = 0):
        cs = ConfigurationSpace(seed=seed)
        x0 = Float("x0", (-5, 10), default=-5, log=False)
        x1 = Float("x1", (0, 15), default=2, log=False)
        cs.add([x0, x1])

        self.cs = cs

    def train(self, config: Configuration, seed: int = 0) -> float:
        x0 = config["x0"]
        x1 = config["x1"]
        a = 1.0
        b = 5.1 / (4.0 * np.pi**2)
        c = 5.0 / np.pi
        r = 6.0
        s = 10.0
        t = 1.0 / (8.0 * np.pi)
        return a * (x1 - b * x0**2 + c * x0 - r) ** 2 \
             + s * (1 - t) * np.cos(x0) + s

if __name__ == "__main__":
    model = Branin()

    # Scenario object specifying the optimization "environment"
    scenario = Scenario(
        model.cs,
        deterministic=True,
        n_trials=100,
        trial_walltime_limit=100,
        n_workers=5,
    )

    n_workers = 5  # Use 5 workers on the cluster
    # Please note that the number of workers is directly set in the
    # cluster / client. `scenario.n_workers` is ignored in this case.

    cluster = SLURMCluster(
        queue="partition_name",                 # Name of the partition
        cores=4,                                # CPU cores requested
        memory="4 GB",                          # RAM requested
        walltime="00:10:00",                    # Walltime limit for a runner job.
        processes=1,                            # Number of processes per worker
        log_directory="tmp/smac_dask_slurm",    # Logging directory
        nanny=False,                            # False unless you want to use pynisher
        worker_extra_args=[
            "--worker-port",                    # Worker port range
            "60010:60100"],                     # Worker port range
        scheduler_options={
            "port": 60001,                      # Main Job Port
        },
        # account="myaccount",                  # Account name on the cluster (optional)
    )
    cluster.scale(jobs=n_workers)

    # Dask will create n_workers jobs on the cluster which stay open.
    # Then, SMAC/Dask will schedule individual runs on the workers like on your local machine.
    client = Client(
        address=cluster,
    )
    client.wait_for_workers(n_workers)

    # Now we use SMAC to find the best hyperparameters
    smac = BlackBoxFacade(
        scenario,
        model.train,  # We pass the target function here
        overwrite=True,  # Overrides any previous results that are found that are inconsistent with the meta-data
        dask_client=client,
    )
    incumbent = smac.optimize()
    print(f"Best configuration found: {incumbent}")

Description#

An example of applying SMAC to optimize Branin using parallelization via Dask client on a SLURM cluster. If you do not want to use a cluster but your local machine, set dask_client to None and pass n_workers to the Scenario.

from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from smac import BlackBoxFacade, Scenario

__copyright__ = "Copyright 2025, Leibniz University Hanover, Institute of AI"
__license__ = "3-clause BSD"

import numpy as np
from ConfigSpace import ConfigurationSpace, Float
from ConfigSpace import Configuration  # for type hints

class Branin:
    def __init__(self, seed: int = 0):
        cs = ConfigurationSpace(seed=seed)
        x0 = Float("x0", (-5, 10), default=-5, log=False)
        x1 = Float("x1", (0, 15), default=2, log=False)
        cs.add([x0, x1])

        self.cs = cs

    def train(self, config: Configuration, seed: int = 0) -> float:
        x0 = config["x0"]
        x1 = config["x1"]
        a = 1.0
        b = 5.1 / (4.0 * np.pi**2)
        c = 5.0 / np.pi
        r = 6.0
        s = 10.0
        t = 1.0 / (8.0 * np.pi)
        return a * (x1 - b * x0**2 + c * x0 - r) ** 2 \
             + s * (1 - t) * np.cos(x0) + s

if __name__ == "__main__":
    model = Branin()

    # Scenario object specifying the optimization "environment"
    scenario = Scenario(
        model.cs,
        deterministic=True,
        n_trials=100,
        trial_walltime_limit=100,
        n_workers=5,
    )

    n_workers = 5  # Use 5 workers on the cluster
    # Please note that the number of workers is directly set in the
    # cluster / client. `scenario.n_workers` is ignored in this case.

    cluster = SLURMCluster(
        queue="partition_name",                 # Name of the partition
        cores=4,                                # CPU cores requested
        memory="4 GB",                          # RAM requested
        walltime="00:10:00",                    # Walltime limit for a runner job.
        processes=1,                            # Number of processes per worker
        log_directory="tmp/smac_dask_slurm",    # Logging directory
        nanny=False,                            # False unless you want to use pynisher
        worker_extra_args=[
            "--worker-port",                    # Worker port range
            "60010:60100"],                     # Worker port range
        scheduler_options={
            "port": 60001,                      # Main Job Port
        },
        # account="myaccount",                  # Account name on the cluster (optional)
    )
    cluster.scale(jobs=n_workers)

    # Dask will create n_workers jobs on the cluster which stay open.
    # Then, SMAC/Dask will schedule individual runs on the workers like on your local machine.
    client = Client(
        address=cluster,
    )
    client.wait_for_workers(n_workers)

    # Now we use SMAC to find the best hyperparameters
    smac = BlackBoxFacade(
        scenario,
        model.train,  # We pass the target function here
        overwrite=True,  # Overrides any previous results that are found that are inconsistent with the meta-data
        dask_client=client,
    )
    incumbent = smac.optimize()
    print(f"Best configuration found: {incumbent}")