# Executors
The `Scheduler` uses an `Executor`, a built-in Python interface with a `submit(f, *args, **kwargs)` method, to submit compute to be performed elsewhere, whether locally or remotely.
Some parallelism libraries natively support this interface, while we can wrap others. You can also wrap your own custom backend by using the `Executor` interface, which is relatively simple to implement.
If there's any executor backend you wish to integrate, we would be happy to consider it and greatly appreciate a PR!
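For reference, a small example using only the standard library shows the `submit()` contract the `Scheduler` builds on:

```python
from concurrent.futures import ProcessPoolExecutor


def compute(x: int) -> int:
    return x * 2


if __name__ == "__main__":
    # Any Executor exposes submit(f, *args, **kwargs) and returns a Future.
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(compute, 21)
        print(future.result())  # 42
```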
## Python
Python supports the `Executor` interface natively with the
concurrent.futures module, providing `ProcessPoolExecutor` for
processes and `ThreadPoolExecutor` for threads.
**Usage**

- Explicitly use the `with_processes` method to create a `Scheduler` with a `ProcessPoolExecutor` with 2 workers.
- Explicitly use the `with_threads` method to create a `Scheduler` with a `ThreadPoolExecutor` with 2 workers (see the sketch below).
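A minimal sketch of both, assuming the `with_processes` and `with_threads` constructors take the worker count as their first argument (check your amltk version for the exact signature):

```python
from amltk.scheduling import Scheduler

# Assumed signatures: both constructors take the number of workers.
process_scheduler = Scheduler.with_processes(2)  # backed by a ProcessPoolExecutor
thread_scheduler = Scheduler.with_threads(2)     # backed by a ThreadPoolExecutor
```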
**Why not to use threads**

Python also defines a `ThreadPoolExecutor`, but there are some known drawbacks to offloading heavy compute to threads. Notably, there's no way in Python to terminate a thread from the outside while it's running.
## dask
Dask and the supporting extension dask.distributed
provide a robust and flexible framework for scheduling compute across workers.
**Example**
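A minimal sketch, assuming the `Scheduler` accepts any `concurrent.futures`-style executor through its constructor; `Client.get_executor()` returns such an executor from a dask cluster:

```python
from dask.distributed import Client

from amltk.scheduling import Scheduler

# Spin up a local dask cluster and hand its Executor-compatible wrapper
# to the Scheduler (constructor usage assumed, see the note above).
client = Client(n_workers=2)
executor = client.get_executor()
scheduler = Scheduler(executor=executor)
```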
### dask-jobqueue
dask-jobqueue is a package
for scheduling jobs across common cluster setups such as
PBS, Slurm, MOAB, SGE, LSF, and HTCondor.
Please see the dask-jobqueue documentation for details.
In particular, we only control the `n_workers=` parameter, which is used
with the `adapt()` method; every other keyword is forwarded to the
relevant cluster implementation.
In general, you should specify the requirements of each individual worker
and tune your load with the `n_workers=` parameter.
If you have any tips, tricks, working setups, or gotchas, please feel free to open a PR or simply an issue!
**Usage**
```python
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_slurm(
    n_workers=10,  # (1)!
    queue=...,
    cores=4,
    memory="6 GB",
    walltime="00:10:00",
)
```
- The `n_workers` parameter is used to set the number of workers to start with. The `adapt()` method will be called on the cluster to dynamically scale up to `n_workers=` based on the load. The `with_slurm` method will create a `SLURMCluster` and pass it to the `Scheduler` constructor.
**Running outside the login node**

If you're running the scheduler itself in a job, this may not work on some cluster setups. The scheduler itself is lightweight and can run on the login node without issue. However, you should make sure to offload heavy computations to a worker.
If you get it to work, for example in an interactive job, please let us know!
Please see the dask-jobqueue documentation and the corresponding `Scheduler` constructor methods (such as `with_slurm` above) for each supported cluster setup.
## loky
Loky is the default backend executor behind
joblib, the parallelism library that
powers scikit-learn.
**Usage**
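A minimal sketch, assuming the `Scheduler` accepts any `concurrent.futures`-style executor; `get_reusable_executor()` is loky's standard entry point:

```python
from loky import get_reusable_executor

from amltk.scheduling import Scheduler

# Loky's reusable executor implements the concurrent.futures.Executor
# interface, so it can be handed straight to the Scheduler (assumed usage).
executor = get_reusable_executor(max_workers=2)
scheduler = Scheduler(executor=executor)
```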
**BLAS numeric backend**

The loky executor seems to pick up a different BLAS library (from scipy)
than the one used by jobs run from something like a `ProcessPoolExecutor`.
This is unlikely to matter for the majority of use cases.
## ray
Ray is an open-source unified compute framework that makes it easy to scale AI and Python workloads — from reinforcement learning to deep learning to tuning, and model serving.
**In progress**

Ray is currently working on supporting the Python
`Executor` interface. See this PR
for more info.
## airflow
Airflow is a platform created by the community to programmatically author, schedule and monitor workflows. Their list of integrations is extensive and includes compute platforms such as Kubernetes, AWS, Microsoft Azure and GCP.

**In progress**

We plan to support airflow in the future. If you'd like to help
out, please reach out to us!
## Debugging
Sometimes you'll need to debug what's going on and remove the noise
of processes and parallelism. For this, we have implemented a very basic
SequentialExecutor to run everything
in a sequential manner!
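A debugging sketch, assuming `SequentialExecutor` is importable from `amltk.scheduling` alongside `Scheduler` and that the `Scheduler` constructor accepts an executor directly:

```python
from amltk.scheduling import Scheduler, SequentialExecutor  # import path assumed

# Every submitted task runs inline and in order, with no processes or
# threads involved, which makes stepping through with a debugger easy.
scheduler = Scheduler(executor=SequentialExecutor())
```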
**Recursion**

If you use the `SequentialExecutor`, be careful that the stack
of function calls can get quite large, quite quickly. If you are
using this for debugging, keep the number of tasks submitted
from callbacks small and focus in on debugging. If using this
for sequential ordering of operations, prefer to use
`with_processes(1)`, as this will still maintain order but not
have these stack issues.
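For sequential ordering without the growing call stack, the single-worker process pool mentioned above would look something like this (a sketch, assuming the same `with_processes` signature as earlier):

```python
from amltk.scheduling import Scheduler

# One worker keeps submissions ordered while avoiding deep recursion,
# since each task runs in the worker process rather than inline.
scheduler = Scheduler.with_processes(1)
```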