# Executors
The `Scheduler` uses an `Executor`, a builtin Python interface, to `submit(f, *args, **kwargs)` for computation elsewhere, whether locally or remotely.
Some parallelism libraries natively support this interface while others can be wrapped. You can also wrap your own custom backend by implementing the `Executor` interface, which is relatively simple to do. If there's any executor backend you wish to integrate, we would be happy to consider it and would greatly appreciate a PR!
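As an illustration only, here is a minimal sketch of what a custom backend could look like, assuming that overriding `submit()` is enough for basic use (the base class already provides a default `shutdown()`):

```python
from concurrent.futures import Executor, Future


class InlineExecutor(Executor):
    """A toy backend that runs each submission immediately in the caller.

    Purely illustrative; a real backend would dispatch work elsewhere.
    """

    def submit(self, fn, /, *args, **kwargs):
        future: Future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except BaseException as exc:
            future.set_exception(exc)
        return future
```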
## Python
Python natively supports the `Executor` interface with the `concurrent.futures` module, which provides the `ProcessPoolExecutor` for processes and the `ThreadPoolExecutor` for threads.
**Usage**

- Explicitly use the `with_processes` method to create a `Scheduler` with a `ProcessPoolExecutor` with 2 workers.
- Explicitly use the `with_threads` method to create a `Scheduler` with a `ThreadPoolExecutor` with 2 workers, as sketched below.
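A minimal sketch of both constructors, assuming each accepts the worker count as its first argument:

```python
from amltk.scheduling import Scheduler

# A Scheduler backed by a ProcessPoolExecutor with 2 workers.
scheduler = Scheduler.with_processes(2)

# ... or one backed by a ThreadPoolExecutor with 2 workers.
scheduler = Scheduler.with_threads(2)
```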
**Why not to use threads**

Python also defines a `ThreadPoolExecutor`, but there are some known drawbacks to offloading heavy compute to threads. Notably, there's no way in Python to terminate a thread from the outside while it's running.
## dask
Dask and the supporting extension `dask.distributed` provide a robust and flexible framework for scheduling compute across workers.
**Example**
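A sketch of hooking up a `dask.distributed` client; `client.get_executor()` is real `dask.distributed` API returning an `Executor`-compatible object, while the keyword used to hand it to the `Scheduler` is assumed here:

```python
from dask.distributed import Client

from amltk.scheduling import Scheduler

# Start (or connect to) a local dask cluster.
client = Client()

# The returned ClientExecutor implements the Executor interface.
executor = client.get_executor()
scheduler = Scheduler(executor=executor)
```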
## dask-jobqueue
`dask-jobqueue` is a package for scheduling jobs across common cluster setups such as PBS, Slurm, MOAB, SGE, LSF, and HTCondor. Please see the dask-jobqueue documentation for more details.
In particular, we only control the parameter `n_workers=` and use `adaptive=` to control whether the cluster's `adapt()` or `scale()` method is used; every other keyword is forwarded to the relevant cluster implementation. A sketch of the distinction is given below.
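Roughly, the distinction looks like the following, using `dask_jobqueue` directly (this is a sketch, not amltk's actual internals, and the queue name is a placeholder):

```python
from dask_jobqueue import SLURMCluster

# Specify the requirements of each individual worker.
cluster = SLURMCluster(
    queue="your-partition",  # placeholder, set to your cluster's queue
    cores=4,
    memory="6 GB",
    walltime="00:10:00",
)

# adaptive=True: scale dynamically, up to 10 workers, based on load.
cluster.adapt(maximum=10)

# adaptive=False: request a fixed number of workers up front.
# cluster.scale(10)
```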
In general, you should specify the requirements of each individual worker and tune your load with the `n_workers=` parameter.
If you have any tips, tricks, working setups, or gotchas, please feel free to open a PR or simply an issue!
**Usage**
```python
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_slurm(
    n_workers=10,  # (1)!
    adaptive=True,
    queue=...,
    cores=4,
    memory="6 GB",
    walltime="00:10:00",
)
```
1. The `n_workers` parameter is used to set the number of workers to start with. The `adapt()` method will be called on the cluster to dynamically scale up to `n_workers=` based on the load. The `with_slurm` method will create a `SLURMCluster` and pass it to the `Scheduler` constructor.
**Running outside the login node**

If you're running the scheduler itself in a job, this may not work on some cluster setups. The scheduler itself is lightweight and can run on the login node without issue. However, you should make sure to offload heavy computations to a worker. If you get it to work, for example in an interactive job, please let us know!
Please see the dask-jobqueue documentation and the corresponding `with_<cluster>` constructor methods on the `Scheduler` for each supported cluster type.
## loky
Loky is the default backend executor behind `joblib`, the parallelism library that powers scikit-learn.
**Usage**
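A sketch using loky's reusable executor, which implements the `Executor` interface; how the `Scheduler` receives it is assumed here:

```python
from loky import get_reusable_executor

from amltk.scheduling import Scheduler

# loky's reusable executor implements the Executor interface.
executor = get_reusable_executor(max_workers=2)
scheduler = Scheduler(executor=executor)
```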
**BLAS numeric backend**

The loky executor seems to pick up on a different BLAS library (from scipy), which is different from the one used by jobs from something like a `ProcessPoolExecutor`. This is unlikely to matter for the majority of use-cases.
## ray
Ray is an open-source unified compute framework that makes it easy to scale AI and Python workloads, from reinforcement learning to deep learning to tuning and model serving.
**In progress**

Ray support for the Python `Executor` interface is currently in the works. See this PR for more info.
## airflow
Airflow is a platform created by the community to programmatically author, schedule, and monitor workflows. Its list of platform integrations is extensive, featuring compute platforms such as Kubernetes, AWS, Microsoft Azure, and GCP.
**In progress**

We plan to support `airflow` in the future. If you'd like to help out, please reach out to us!
## Debugging
Sometimes you'll need to debug what's going on and remove the noise of processes and parallelism. For this, we have implemented a very basic `SequentialExecutor` to run everything in a sequential manner!
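A sketch of setting it up, assuming `SequentialExecutor` is importable from `amltk.scheduling` and that the `Scheduler` accepts it like any other executor:

```python
from amltk.scheduling import Scheduler, SequentialExecutor

# Run everything inline, in submission order, with no parallelism.
scheduler = Scheduler(executor=SequentialExecutor())
```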
**Recursion**

If you use the `SequentialExecutor`, be careful that the stack of function calls can get quite large, quite quickly. If you are using this for debugging, keep the number of tasks submitted from callbacks small and focus on debugging. If using this for sequential ordering of operations, prefer `with_processes(1)` as this will still maintain order but not have these stack issues.