Executors#

The Scheduler uses an Executor, a builtin Python interface with a submit(f, *args, **kwargs) method, to submit compute to be performed elsewhere, whether locally or remotely.

from amltk.scheduling import Scheduler

scheduler = Scheduler(executor=...)

Some parallelism libraries natively support this interface, while we can wrap others. You can also wrap your own custom backend by implementing the Executor interface, which is relatively simple to do.
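For example, here is a minimal sketch of a custom backend, assuming inline execution in the current process is acceptable (InlineExecutor is a hypothetical name, not part of amltk):

from concurrent.futures import Executor, Future

from amltk.scheduling import Scheduler

class InlineExecutor(Executor):
    """Hypothetical backend: runs each submitted function immediately, inline."""

    def submit(self, fn, /, *args, **kwargs):
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except BaseException as e:
            future.set_exception(e)
        return future

scheduler = Scheduler(executor=InlineExecutor())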

If there's any executor backend you wish to see integrated, we would be happy to consider it and would greatly appreciate a PR!

Python#

Python supports the Executor interface natively with the concurrent.futures module, which provides the ProcessPoolExecutor for processes and the ThreadPoolExecutor for threads.

Usage
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(2)  # (1)!
  1. Explicitly use the with_processes method to create a Scheduler with a ProcessPoolExecutor with 2 workers.
     from concurrent.futures import ProcessPoolExecutor
     from amltk.scheduling import Scheduler
    
     executor = ProcessPoolExecutor(max_workers=2)
     scheduler = Scheduler(executor=executor)
    
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_threads(2)  # (1)!
  1. Explicitly use the with_threads method to create a Scheduler with a ThreadPoolExecutor with 2 workers.
     from concurrent.futures import ThreadPoolExecutor
     from amltk.scheduling import Scheduler
    
     executor = ThreadPoolExecutor(max_workers=2)
     scheduler = Scheduler(executor=executor)
    

Why not to use threads

Python also defines a ThreadPoolExecutor, but there are some known drawbacks to offloading heavy compute to threads. Notably, there's no way in Python to terminate a thread from the outside while it's running, and CPU-bound work will contend for the GIL.
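A quick illustration of the termination problem, using only the standard library: once a worker thread has picked up a task, the corresponding future can no longer be cancelled.

import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)
future = pool.submit(time.sleep, 60)
time.sleep(0.1)            # give the worker time to start the task
print(future.cancel())     # False: a task already running cannot be cancelled
pool.shutdown(wait=False)  # returns immediately, yet the interpreter still
                           # waits for the sleeping thread before exiting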

dask#

Dask and the supporting extension dask.distributed provide a robust and flexible framework for scheduling compute across workers.

Example

from dask.distributed import Client
from amltk.scheduling import Scheduler

client = Client(...)
executor = client.get_executor()
scheduler = Scheduler(executor=executor)

# Important to do if the program will continue!
client.close()
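For instance, a sketch against a local Dask cluster; when no address is given, Client() spins up a LocalCluster, and n_workers=2 here is purely illustrative:

from dask.distributed import Client
from amltk.scheduling import Scheduler

client = Client(n_workers=2)  # no address given: starts a LocalCluster
executor = client.get_executor()
scheduler = Scheduler(executor=executor)

client.close()  # important to do if the program will continue!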

dask-jobqueue#

dask-jobqueue is a package for scheduling jobs across common cluster setups such as PBS, Slurm, MOAB, SGE, LSF, and HTCondor.

Please see the dask-jobqueue documentation. In particular, we only control the n_workers= parameter and use adaptive= to control whether the adapt() or scale() method is used; every other keyword is forwarded to the relevant cluster implementation.

In general, you should specify the requirements of each individual worker and tune your load with the n_workers= parameter.

If you have any tips, tricks, working setups, gotchas, please feel free to leave a PR or simply an issue!

Usage
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_slurm(
    n_workers=10,  # (1)!
    adaptive=True,
    queue=...,
    cores=4,
    memory="6 GB",
    walltime="00:10:00"
)
  1. The n_workers parameter is used to set the number of workers to start with. The adapt() method will be called on the cluster to dynamically scale up to n_workers= based on the load. The with_slurm method will create a SLURMCluster and pass it to the Scheduler constructor.
    from dask_jobqueue import SLURMCluster
    from amltk.scheduling import Scheduler
    
    cluster = SLURMCluster(
        queue=...,
        cores=4,
        memory="6 GB",
        walltime="00:10:00"
    )
    cluster.adapt(max_workers=10)
    executor = cluster.get_client().get_executor()
    scheduler = Scheduler(executor=executor)
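If you'd rather pin a fixed number of workers than adapt to load, a sketch of the manual route using scale() instead of adapt() (same SLURMCluster keywords as above, which are placeholders here):

from dask_jobqueue import SLURMCluster
from amltk.scheduling import Scheduler

cluster = SLURMCluster(
    queue=...,
    cores=4,
    memory="6 GB",
    walltime="00:10:00"
)
cluster.scale(10)  # request a fixed pool of 10 workers
executor = cluster.get_client().get_executor()
scheduler = Scheduler(executor=executor)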
    

Running outside the login node

If you're running the scheduler itself in a job, this may not work on some cluster setups. The scheduler itself is lightweight and can run on the login node without issue. However, you should make sure to offload heavy computations to a worker.

If you get it to work, for example in an interactive job, please let us know!

Modifying the launch command

On some cluster setups, you'll need to modify the launch command. You can use the following to do so:

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_slurm(n_workers=..., submit_command="sbatch --extra")

loky#

Loky is the default backend executor behind joblib, the parallelism library that powers scikit-learn.

Usage
from amltk import Scheduler

# Pass any arguments you would pass to `loky.get_reusable_executor`
scheduler = Scheduler.with_loky(...)

import loky
from amltk import Scheduler

scheduler = Scheduler(executor=loky.get_reusable_executor(...))

BLAS numeric backend

The loky executor seems to pick up a different BLAS library (from scipy) than the one used by jobs run from something like a ProcessPoolExecutor.

This is likely not to matter for a majority of use-cases.
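If it does matter for your workload, one way to inspect or cap BLAS threading inside a job is the threadpoolctl package (a separate dependency, suggested here as an assumption; it is not part of amltk):

from threadpoolctl import threadpool_info, threadpool_limits

def job():
    # Cap BLAS/OpenMP thread pools to one thread while the numeric work runs
    with threadpool_limits(limits=1):
        ...  # heavy numeric work here
    # Report which BLAS/OpenMP libraries this worker actually loaded
    return threadpool_info()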

ray#

Ray is an open-source unified compute framework that makes it easy to scale AI and Python workloads — from reinforcement learning to deep learning to tuning, and model serving.

In progress

Ray is currently working on supporting the Python Executor interface. See this PR for more info.

airflow#

Airflow is a platform created by the community to programmatically author, schedule and monitor workflows. Their list of platform integrations is extensive, featuring compute platforms such as Kubernetes, AWS, Microsoft Azure and GCP.

In progress

We plan to support airflow in the future. If you'd like to help out, please reach out to us!

Debugging#

Sometimes you'll need to debug what's going on and remove the noise of processes and parallelism. For this, we have implemented a very basic SequentialExecutor to run everything in a sequential manner!

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_sequential()

from amltk.scheduling import Scheduler, SequentialExecutor

scheduler = Scheduler(executor=SequentialExecutor())

Recursion

If you use the SequentialExecutor, be careful that the stack of function calls can get quite large, quite quickly. If you are using it for debugging, keep the number of tasks submitted from callbacks small and focus in on debugging. If using it for sequential ordering of operations, prefer with_processes(1), as this will still maintain order without these stack issues (see the sketch below).
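A minimal sketch of that alternative: one process-pool worker still runs tasks one at a time, in submission order, but each task executes in a separate process, so no deep call stack builds up in the scheduler.

from amltk.scheduling import Scheduler

# A single worker preserves sequential ordering without growing the stack
scheduler = Scheduler.with_processes(1)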