Parallel Usage: Spawning workers from the command line

Auto-sklearn uses dask.distributed for parallel optimization.

This example shows how to start the dask scheduler and spawn workers for Auto-sklearn manually from the command line. Use this example as a starting point to parallelize Auto-sklearn across multiple machines.

To run Auto-sklearn in parallel on a single machine, check out the example Parallel Usage on a single machine.

If you want to start everything manually from within Python please see :ref:sphx_glr_examples_60_search_example_parallel_manual_spawning_python.py.

NOTE: The above example is disabled due to issue https://github.com/dask/distributed/issues/5627

You can learn more about the dask command line interface from https://docs.dask.org/en/latest/setup/cli.html.

When manually passing a dask client to Auto-sklearn, all logic must be guarded by if __name__ == "__main__": statements! We use multiple such statements to properly render this example as a notebook and also allow execution via the command line.

Background

To run Auto-sklearn distributed on multiple machines we need to set up three components:

  1. Auto-sklearn and a dask client. This manages the entire workload, finds new configurations to evaluate, and submits jobs via the dask client. As it runs Bayesian optimization, it should be executed on its own CPU.

  2. The dask workers. They do the actual work of running machine learning algorithms and each require their own CPU.

  3. The scheduler. It manages the communication between the dask client and the different dask workers. As the client and all workers connect to the scheduler, it must be started first. It is a lightweight process and does not require its own CPU.

We will now start these three components in reverse order: scheduler, workers and client. In a real setup the scheduler and the workers should be started from the command line rather than from within a Python file via the subprocess module; we do the latter here only to keep the example self-contained.

Import statements

import multiprocessing
import subprocess
import time

import dask.distributed
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection

from autosklearn.classification import AutoSklearnClassifier
from autosklearn.constants import MULTICLASS_CLASSIFICATION

# Temporary folder in which Auto-sklearn stores its intermediate results
tmp_folder = "/tmp/autosklearn_parallel_3_example_tmp"

# Keep track of every process we spawn so we can wait for them at the end
worker_processes = []

0. Set up client-scheduler communication

In this example the dask scheduler is started without an explicit address and port. Instead, the scheduler picks a free port and stores the relevant connection information in a file whose name and location we provide. This filename is also given to the workers so they can find all the information they need to connect to the scheduler.

scheduler_file_name = "scheduler-file.json"

1. Start scheduler

Starting the scheduler is done with the following bash command:

dask-scheduler --scheduler-file scheduler-file.json --idle-timeout 10

We will now execute this bash command from within Python to have a self-contained example. The --idle-timeout 10 flag makes the scheduler shut itself down once it has been idle for 10 seconds, so the example terminates cleanly:

def cli_start_scheduler(scheduler_file_name):
    command = f"dask-scheduler --scheduler-file {scheduler_file_name} --idle-timeout 10"
    # subprocess.run blocks until the scheduler process exits (here, once it
    # has been idle for 10 seconds), so no additional waiting loop is needed.
    subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        shell=True,
        check=True,
    )


if __name__ == "__main__":
    process_python_worker = multiprocessing.Process(
        target=cli_start_scheduler,
        args=(scheduler_file_name,),
    )
    process_python_worker.start()
    worker_processes.append(process_python_worker)

    # Wait a second for the scheduler to become available
    time.sleep(1)
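At this point the scheduler is running and has written scheduler-file.json. To make the mechanism from step 0 concrete: the file is plain JSON and contains, among other fields, the scheduler's address, which clients and workers read in order to connect. A minimal sketch of inspecting it (purely illustrative, not needed for the example itself):

import json

# The scheduler file is plain JSON; the "address" field tells clients and
# workers where to connect.
with open(scheduler_file_name) as f:
    scheduler_info = json.load(f)
print(scheduler_info["address"])  # e.g. "tcp://127.0.0.1:8786"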

2. Start two workers

Starting the workers is done with the following bash command:

DASK_DISTRIBUTED__WORKER__DAEMON=False \
    dask-worker --nthreads 1 --lifetime 35 --memory-limit 0 \
    --scheduler-file scheduler-file.json

We will now execute this bash command from within Python to have a self-contained example. Please note that DASK_DISTRIBUTED__WORKER__DAEMON=False is required in this case, as dask-worker creates a new process, which by default is not compatible with Auto-sklearn creating new processes in the workers itself. We disable dask's memory management by passing --memory-limit 0, as Auto-sklearn does the memory management itself.

def cli_start_worker(scheduler_file_name):
    command = (
        "DASK_DISTRIBUTED__WORKER__DAEMON=False "
        "dask-worker --nthreads 1 --lifetime 35 --memory-limit 0 "
        f"--scheduler-file {scheduler_file_name}"
    )
    # subprocess.run blocks until the worker exits (here, after its
    # 35-second lifetime), so no additional waiting loop is needed.
    subprocess.run(
        command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True
    )


if __name__ == "__main__":
    for _ in range(2):
        process_cli_worker = multiprocessing.Process(
            target=cli_start_worker,
            args=(scheduler_file_name,),
        )
        process_cli_worker.start()
        worker_processes.append(process_cli_worker)

    # Wait a second for workers to become available
    time.sleep(1)
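As an aside, DASK_DISTRIBUTED__WORKER__DAEMON is dask's standard environment-variable spelling of the configuration key distributed.worker.daemon. If you spawn workers from within Python rather than bash, the same setting could be applied through dask's config system; a small sketch under that assumption:

import dask

# Equivalent to exporting DASK_DISTRIBUTED__WORKER__DAEMON=False: dask maps
# DASK_DISTRIBUTED__<SECTION>__<KEY> onto the key "distributed.section.key".
dask.config.set({"distributed.worker.daemon": False})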

3. Create a client in Python

Finally, we create the dask client, which connects to the scheduler using the information in the file written by the scheduler.

if __name__ == "__main__":
    client = dask.distributed.Client(scheduler_file=scheduler_file_name)
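Optionally, you can confirm that the two workers from step 2 have registered before handing the client to Auto-sklearn. A short sketch using the dask.distributed Client API:

if __name__ == "__main__":
    # Block until both workers have connected, then list their addresses.
    client.wait_for_workers(n_workers=2)
    print(list(client.scheduler_info()["workers"]))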

Start Auto-sklearn

if __name__ == "__main__":
    X, y = sklearn.datasets.load_breast_cancer(return_X_y=True)
    X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
        X, y, random_state=1
    )

    automl = AutoSklearnClassifier(
        delete_tmp_folder_after_terminate=False,
        time_left_for_this_task=30,
        per_run_time_limit=10,
        memory_limit=2048,
        tmp_folder=tmp_folder,
        seed=777,
        # n_jobs is ignored internally as we pass a dask client.
        n_jobs=1,
        # Pass a dask client which connects to the previously constructed cluster.
        dask_client=client,
    )
    automl.fit(X_train, y_train)

    automl.fit_ensemble(
        y_train,
        task=MULTICLASS_CLASSIFICATION,
        dataset_name="digits",
        ensemble_kwargs={"ensemble_size": 20},
        ensemble_nbest=50,
    )

    predictions = automl.predict(X_test)
    print(automl.sprint_statistics())
    print("Accuracy score", sklearn.metrics.accuracy_score(y_test, predictions))
Fitting to the training data:   0%|          | 0/30 [00:00<?, ?it/s, The total time budget for this task is 0:00:30]
Fitting to the training data: 100%|##########| 30/30 [00:21<00:00,  1.43it/s, The total time budget for this task is 0:00:30]
auto-sklearn results:
  Dataset name: 55aee503-6bf7-11ed-87b6-77edb579fc6c
  Metric: accuracy
  Best validation score: 0.992908
  Number of target algorithm runs: 10
  Number of successful target algorithm runs: 9
  Number of crashed target algorithm runs: 0
  Number of target algorithms that exceeded the time limit: 1
  Number of target algorithms that exceeded the memory limit: 0

Accuracy score 0.958041958041958
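To inspect which configurations were evaluated during the search, the fitted estimator offers a leaderboard view; a short optional addition (not part of the original example):

if __name__ == "__main__":
    # Rank the evaluated models, e.g. by their validation score.
    print(automl.leaderboard())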

Wait until all workers are closed

This is only necessary because the workers were started from within this Python script. In a real application one would start them directly from the command line.

if __name__ == "__main__":
    process_python_worker.join()
    for process in worker_processes:
        process.join()
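If you run this example repeatedly, you may also want to delete the scheduler file afterwards so a stale file cannot confuse a later run; a small optional sketch:

import os

if __name__ == "__main__":
    # Remove the connection file written by dask-scheduler.
    if os.path.exists(scheduler_file_name):
        os.remove(scheduler_file_name)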

Total running time of the script: ( 0 minutes 43.240 seconds)
