Source code for smac.tae.dask_runner

from typing import Dict, List, Optional, Tuple

import os
import time
import warnings

import dask
from dask.distributed import Client, Future, wait

from smac.configspace import Configuration
from smac.runhistory.runhistory import RunInfo, RunValue
from smac.tae import StatusType
from smac.tae.base import BaseRunner

__copyright__ = "Copyright 2021, AutoML.org Freiburg-Hannover"
__license__ = "3-clause BSD"


class DaskParallelRunner(BaseRunner):
    """Interface to submit and collect a job in a distributed fashion.

    DaskParallelRunner is intended to comply with the bridge design pattern.
    Nevertheless, to reduce the amount of code within single-vs-parallel
    implementations, DaskParallelRunner wraps a BaseRunner object which
    is then executed in parallel on n_workers.

    This class is constructed by passing a BaseRunner that implements
    a run() method, and is capable of doing so in a serial fashion. This
    wrapper class then uses dask to initialize N BaseRunner instances
    that actively wait for a RunInfo to produce a RunValue object.

    To be more precise, the work model is as follows:

    1. The smbo.intensifier dictates "what" to run (a configuration/instance/seed)
       via a RunInfo object.
    2. A tae_runner takes this RunInfo object and launches the task via
       tae_runner.submit_run(). In the case of DaskParallelRunner, n_workers
       receive a pickle-object of DaskParallelRunner.single_worker, each with a
       run() method coming from DaskParallelRunner.single_worker.run()
    3. RunInfo objects are run in a distributed fashion, and their results are
       available locally to each worker. Such results are collected by
       DaskParallelRunner.get_finished_runs() and then passed to the SMBO.
    4. Exceptions are also locally available to each worker and need to be
       collected.

    Dask works with Future objects, which are managed via the
    DaskParallelRunner.client.

    Parameters
    ----------
    single_worker: BaseRunner
        A runner to run in a distributed fashion
    n_workers: int
        Number of workers to use for distributed run.
        Will be ignored if ``dask_client`` is not ``None``.
    patience: int
        How long to wait for workers to become available if one fails
    output_directory: str, optional
        If given, this will be used for the dask worker directory and for storing server
        information. If a dask client is passed, it will only be used for storing server
        information as the worker directory must be set by the program/user starting the
        workers.
    dask_client: dask.distributed.Client
        User-created dask client, can be used to start a dask cluster and then attach
        SMAC to it.
    Attributes
    ----------
    results
    ta
    stats
    run_obj
    par_factor
    cost_for_crash
    abort_on_first_run_crash
    n_workers
    futures
    client
    """

    def __init__(
        self,
        single_worker: BaseRunner,
        n_workers: int,
        patience: int = 5,
        output_directory: Optional[str] = None,
        dask_client: Optional[dask.distributed.Client] = None,
    ):
        super(DaskParallelRunner, self).__init__(
            ta=single_worker.ta,
            stats=single_worker.stats,
            multi_objectives=single_worker.multi_objectives,
            run_obj=single_worker.run_obj,
            par_factor=single_worker.par_factor,
            cost_for_crash=single_worker.cost_for_crash,
            abort_on_first_run_crash=single_worker.abort_on_first_run_crash,
        )

        # The single worker, which is replicated on a need
        # basis to every compute node
        self.single_worker = single_worker
        self.n_workers = n_workers

        # How long to wait for workers to become available
        self.patience = patience

        self.output_directory = output_directory

        # Because a run() method can have pynisher, we need to prevent the multiprocessing
        # workers from being instantiated as daemonic - this cannot be passed via worker_kwargs
        dask.config.set({"distributed.worker.daemon": False})

        if dask_client is None:
            self.close_client_at_del = True
            self.client = Client(
                n_workers=self.n_workers,
                processes=True,
                threads_per_worker=1,
                local_directory=output_directory,
            )
            if self.output_directory:
                self.scheduler_file = os.path.join(self.output_directory, ".dask_scheduler_file")
                self.client.write_scheduler_file(scheduler_file=self.scheduler_file)
        else:
            self.close_client_at_del = False
            self.client = dask_client

        self.futures = []  # type: List[Future]

        self.scheduler_info = self.client._get_scheduler_info()
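    # Usage sketch (illustrative only, not part of this module): wrapping a serial
    # BaseRunner in a DaskParallelRunner. The `serial_runner` below is a hypothetical,
    # already-constructed BaseRunner with a run() method; the keyword values are
    # assumptions for illustration.
    #
    #     parallel_runner = DaskParallelRunner(
    #         single_worker=serial_runner,   # any BaseRunner implementing run()
    #         n_workers=4,                   # ignored if dask_client is given
    #         patience=5,
    #         output_directory="smac_output",
    #     )
    #
    # Alternatively, pass an existing dask.distributed.Client via dask_client so that
    # SMAC attaches to a user-managed cluster instead of starting a local one.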
    def submit_run(self, run_info: RunInfo) -> None:
        """This function submits a configuration embedded in a run_info object, and uses one of
        the workers to produce a result locally to each worker.

        The execution of a configuration follows this procedure:

        1. SMBO/intensifier generates a run_info
        2. SMBO calls submit_run so that a worker launches the run_info
        3. submit_run internally calls self.run(). It does so via a call to self.run_wrapper(),
           which contains common code that any run() method would otherwise have to implement,
           like the capping check.

        Child classes must implement a run() method.
        All results will only be available locally to each worker, so the main node needs to
        collect them.

        Parameters
        ----------
        run_info: RunInfo
            An object containing the configuration and the necessary data to run it
        """
        # Check for resources or block till one is available
        if not self._workers_available():
            wait(self.futures, return_when="FIRST_COMPLETED")
            self._extract_completed_runs_from_futures()

        # Sanity check to make sure that there are resources
        if not self._workers_available():
            warnings.warn(
                "No workers are available. This could mean workers crashed. "
                "Waiting for new workers..."
            )
            time.sleep(self.patience)
            if not self._workers_available():
                raise ValueError(
                    "Tried to execute a job, but no worker was "
                    "available. This likely means that a worker crashed "
                    "or no workers were properly configured."
                )

        # At this point we can submit the job
        # For `pure=False`, see
        # http://distributed.dask.org/en/stable/client.html#pure-functions-by-default
        self.futures.append(self.client.submit(self.single_worker.run_wrapper, run_info, pure=False))
    def get_finished_runs(self) -> List[Tuple[RunInfo, RunValue]]:
        """This method returns any finished configuration, as a list with the results of
        exercising the configurations. This class keeps populating results into self.results
        until a call to get_finished_runs() is made. At that point, the self.results list is
        emptied and all RunValues produced by running run() are returned.

        Returns
        -------
        List[Tuple[RunInfo, RunValue]]:
            A list of RunValues (and respective RunInfo), that is, the results of executing the
            submitted configurations
        """
        # Proactively see if more configs have finished
        self._extract_completed_runs_from_futures()

        results_list = []
        while self.results:
            results_list.append(self.results.pop())
        return results_list
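    # Illustrative sketch of the submit/collect cycle described in the class docstring
    # (not part of this module). `parallel_runner` is a DaskParallelRunner and `run_infos`
    # a hypothetical iterable of RunInfo objects produced by the intensifier:
    #
    #     for run_info in run_infos:
    #         parallel_runner.submit_run(run_info)   # may block until a worker is free
    #         for finished_info, run_value in parallel_runner.get_finished_runs():
    #             ...  # feed (RunInfo, RunValue) pairs back to the runhistory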
    def _extract_completed_runs_from_futures(self) -> None:
        """A run is over when its future's done() returns True. This function collects the
        completed futures and moves them from self.futures to self.results.

        We make sure futures never exceed the capacity of the scheduler.
        """
        # Sanity check to make sure we don't exceed the resource allocation
        if len(self.futures) > sum(self.client.nthreads().values()):
            warnings.warn(
                "More running jobs than resources available. "
                "There should not be more futures/runs in remote workers "
                "than the number of workers. This could mean a worker "
                "crashed and was not able to be recovered by dask."
            )

        # A future is removed from the list of futures as an indication
        # that a worker is available to take in an extra job
        done_futures = [f for f in self.futures if f.done()]
        for future in done_futures:
            self.results.append(future.result())
            self.futures.remove(future)
    def wait(self) -> None:
        """SMBO/intensifier might need to wait for runs to finish before making a decision.

        This method blocks until at least one run completes.
        """
        if self.futures:
            wait(self.futures, return_when="FIRST_COMPLETED")
    def pending_runs(self) -> bool:
        """Whether or not there are configs still running.

        Generally, if the runner is serial, launching a run instantly returns its result.
        On parallel runners, there might be pending configurations to complete.
        """
        # If there are futures available, it translates
        # to runs still not finished/processed
        return len(self.futures) > 0
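    # Illustrative drain pattern (not part of this module): after all jobs have been
    # submitted, block until every outstanding future has been processed.
    #
    #     while parallel_runner.pending_runs():
    #         parallel_runner.wait()   # blocks until at least one run completes
    #         for finished_info, run_value in parallel_runner.get_finished_runs():
    #             ...  # consume results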
    def run(
        self,
        config: Configuration,
        instance: str,
        cutoff: Optional[float] = None,
        seed: int = 12345,
        budget: Optional[float] = None,
        instance_specific: str = "0",
    ) -> Tuple[StatusType, float, float, Dict]:
        """This method only complies with the abstract parent class. In the parallel case, we
        call the single worker run() method.

        Parameters
        ----------
        config : Configuration
            Dictionary param -> value
        instance : str
            Problem instance
        cutoff : float, optional
            Wallclock time limit of the target algorithm. If no value is
            provided no limit will be enforced.
        seed : int
            Random seed
        budget : float, optional
            A positive, real-valued number representing an arbitrary limit to the target
            algorithm. Handled by the target algorithm internally
        instance_specific: str
            Instance specific information (e.g., domain file or solution)

        Returns
        -------
        status: StatusType
            Enum of StatusType (int): {SUCCESS, TIMEOUT, CRASHED, ABORT}
        cost: float
            Cost/regret/quality (None, if not returned by TA)
        runtime: float
            Runtime (None if not returned by TA)
        additional_info: dict
            All further additional run information
        """
        return self.single_worker.run(
            config=config,
            instance=instance,
            cutoff=cutoff,
            seed=seed,
            budget=budget,
            instance_specific=instance_specific,
        )
    def num_workers(self) -> int:
        """Total number of workers available.

        This number is dynamic, as more resources can be allocated.
        """
        return sum(self.client.nthreads().values())
    def _workers_available(self) -> bool:
        """Query whether there are workers available, which means that there are resources to
        launch a dask job.
        """
        total_compute_power = sum(self.client.nthreads().values())
        return len(self.futures) < total_compute_power
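    # Note on capacity (illustrative): with the client created in __init__
    # (processes=True, threads_per_worker=1), sum(self.client.nthreads().values())
    # equals n_workers, so at most n_workers futures are kept in flight at a time.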
    def __del__(self) -> None:
        """Makes sure that when this object gets deleted, the client is terminated.

        This is only done if the client was created by the dask runner.
        """
        if self.close_client_at_del:
            self.client.close()