Scheduler

The Scheduler uses an Executor, a builtin Python construct with a submit(f, *args, **kwargs) method, to send compute to be run elsewhere, whether locally or remotely.
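To make the submit(f, *args, **kwargs) contract concrete, here is a minimal sketch that uses only the standard library's concurrent.futures. ThreadPoolExecutor stands in for any Executor, and add_one and run_on are illustrative names that are not part of amltk.

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor


def add_one(x: int) -> int:
    return x + 1


def run_on(executor: Executor, x: int) -> int:
    # submit() returns immediately with a Future; the work happens elsewhere.
    future: Future = executor.submit(add_one, x)
    # result() blocks until the executor has finished the call.
    return future.result()


# Any Executor works here; a single-thread pool keeps the sketch light.
with ThreadPoolExecutor(max_workers=1) as pool:
    answer = run_on(pool, 1)

print(answer)
```

Because run_on only relies on the Executor interface, swapping the thread pool for a process pool or a remote executor would not change the calling code.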

The Scheduler is primarily used to dispatch compute to an Executor and emit @events, which can trigger user callbacks.

Typically you should not use the Scheduler directly for dispatching and responding to computed functions, but rather use a Task.

Running in a Jupyter Notebook/Colab

If you are using a Jupyter Notebook, you likely need to use the following at the top of your notebook:

import nest_asyncio  # Only necessary in Notebooks
nest_asyncio.apply()

scheduler.run(...)

This is because a notebook already runs in an async context. If you do not wish to use the above snippet, you can instead use:

await scheduler.async_run(...)

Basic Usage

In this example, we create a scheduler that uses local processes as workers. We then submit a function fn to the scheduler as soon as it starts. Lastly, a callback is registered to @on_future_result to print the result when the compute is done.

from amltk.scheduling import Scheduler

def fn(x: int) -> int:
    return x + 1

scheduler = Scheduler.with_processes(1)

@scheduler.on_start
def launch_the_compute():
    scheduler.submit(fn, 1)

@scheduler.on_future_result
def callback(future, result):
    print(f"Result: {result}")

scheduler.run()

Result: 2

The last line of the previous example, scheduler.run(), is what starts the scheduler, which first emits the @on_start event. This triggers the callback launch_the_compute(), which submits the function fn with the argument 1.

The scheduler then runs the compute and waits for it to complete, emitting the @on_future_result event once it has finished successfully. This triggers the callback callback(), which prints the result.

At this point, there is no more compute happening and no more events to respond to so the scheduler will halt.
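The start, compute, result, halt cycle described above can be imitated with the standard library alone. The sketch below is a deliberately simplified stand-in, not amltk's implementation: ToyScheduler, its on() decorator, and the event names "start" and "future-result" are all hypothetical.

```python
from collections import defaultdict
from concurrent.futures import (
    FIRST_COMPLETED,
    Executor,
    Future,
    ThreadPoolExecutor,
    wait,
)
from typing import Any, Callable


class ToyScheduler:
    """Hypothetical, stripped-down stand-in; not amltk's implementation."""

    def __init__(self, executor: Executor) -> None:
        self.executor = executor
        self.pending = set()  # futures that have not been handled yet
        self.handlers = defaultdict(list)  # event name -> callbacks

    def on(self, event: str) -> Callable:
        def register(callback: Callable) -> Callable:
            self.handlers[event].append(callback)
            return callback

        return register

    def emit(self, event: str, *args: Any) -> None:
        for callback in self.handlers[event]:
            callback(*args)

    def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> Future:
        future = self.executor.submit(fn, *args, **kwargs)
        self.pending.add(future)
        return future

    def run(self) -> None:
        self.emit("start")
        # Keep going while compute is in flight; callbacks may submit more.
        while self.pending:
            done, self.pending = wait(self.pending, return_when=FIRST_COMPLETED)
            for future in done:
                if future.exception() is None:
                    self.emit("future-result", future, future.result())
        # Nothing left to wait for and no events to respond to, so halt.
        self.executor.shutdown()


def fn(x: int) -> int:
    return x + 1


results = []
scheduler = ToyScheduler(ThreadPoolExecutor(max_workers=1))


@scheduler.on("start")
def launch_the_compute() -> None:
    scheduler.submit(fn, 1)


@scheduler.on("future-result")
def callback(future: Future, result: int) -> None:
    results.append(result)


scheduler.run()
print(results)
```

The loop ends for the same reason given above: once no futures are pending and no callback has submitted new work, there is nothing left to react to.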

@events

When the scheduler enters some important state, it will emit an event to let you know.

A Subscriber which is called when the scheduler starts. This is the first event emitted by the scheduler and one of the only ways to submit the initial compute to the scheduler.

@scheduler.on_start
def my_callback():
    ...

A Subscriber which is called when the scheduler is finishing up. This occurs right before the scheduler shuts down the executor.

@scheduler.on_finishing
def my_callback():
    ...

A Subscriber which is called when the scheduler is finished, has shutdown the executor and possibly terminated any remaining compute.

@scheduler.on_finished
def my_callback():
    ...

A Subscriber which is called when the scheduler has been stopped due to the stop() method being called.

@scheduler.on_stop
def my_callback(stop_msg: str, exception: BaseException | None):
    ...

A Subscriber which is called when the scheduler reaches the timeout.

@scheduler.on_timeout
def my_callback():
    ...

A Subscriber which is called when the queue is empty. This can be useful to re-fill the queue and prevent the scheduler from exiting.

@scheduler.on_empty
def my_callback():
    ...

When any compute goes through the Scheduler, it will emit an event to let you know. You should however prefer to use a Task as it will emit specific events for the task at hand, and not all compute.

A Subscriber which is called when some compute is submitted.

@scheduler.on_future_submitted
def my_callback(future: Future):
    ...

A Subscriber which is called when a future returns with a result and no exception was raised.

@scheduler.on_future_result
def my_callback(future: Future, result: Any):
    ...

A Subscriber which is called when some compute raised an uncaught exception.

@scheduler.on_future_exception
def my_callback(future: Future, exception: BaseException):
    ...

A Subscriber which is called when some compute is done, regardless of whether it was successful or not.

@scheduler.on_future_done
def my_callback(future: Future):
    ...

A Subscriber which is called when a future is cancelled. This usually occurs due to the underlying Scheduler, and is not something we do directly, other than when shutting down the scheduler.

@scheduler.on_future_cancelled
def my_callback(future: Future):
    ...

Common usages of run()

There are various ways to run() the scheduler, notably how long it should run with timeout= and also how it should react to any exception that may have occurred within the Scheduler itself or your callbacks.

Please see the run() API doc for more details and features; here we show two common use cases of the timeout= parameter.

You can render a live display using run(display=...). This requires rich to be installed, which you can do with pip install rich or pip install amltk[rich].

You can tell the Scheduler to stop after a certain amount of time with the timeout= argument to run().

This will also trigger the @on_timeout event as seen in the Scheduler output.

import time
from concurrent.futures import Future  # used in the callback annotation below
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def expensive_function() -> int:
    time.sleep(0.1)
    return 42

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function)

# This will endlessly loop the scheduler
@scheduler.on_future_done
def submit_again(future: Future) -> None:
    if scheduler.running():
        scheduler.submit(expensive_function)

scheduler.run(timeout=1)  # End after 1 second
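The resubmit-until-timeout pattern can be approximated with the standard library alone. This is only a sketch of the idea and not how amltk implements timeout=; run_until is a hypothetical helper, and a thread pool replaces the process pool to keep the example light.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def expensive_function() -> int:
    time.sleep(0.01)
    return 42


def run_until(timeout: float) -> int:
    """Resubmit the same function until `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    completed = 0
    with ThreadPoolExecutor(max_workers=1) as pool:
        while time.monotonic() < deadline:
            # Wait for the current call, then loop around and submit again.
            pool.submit(expensive_function).result()
            completed += 1
    return completed


completed = run_until(0.2)
print(f"Completed {completed} calls before the deadline")
```

Unlike an event-driven scheduler, this loop only checks the deadline between calls, so it can overrun by up to the duration of one call.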

By specifying that the Scheduler should not wait for ongoing tasks to finish, the Scheduler will attempt to cancel and possibly terminate any running tasks.

import time
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def expensive_function() -> None:
    time.sleep(10)


@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function)

scheduler.run(timeout=1, wait=False)  # End after 1 second

Forcibly Terminating Workers

As an Executor does not provide an interface to forcibly terminate workers, we provide Scheduler(terminate=...) as a custom strategy for cleaning up a provided executor. It is not possible to terminate running thread-based workers: with ThreadPoolExecutor, or any Executor that uses threads to spawn tasks, Python has to wait until all running tasks have finished before it can close.

It's likely terminate will trigger the EXCEPTION event for any tasks that are running during the shutdown, not a cancelled event. This is because we use a Future under the hood and these cannot be cancelled once running. However, there is no guarantee of this; it is up to how the Executor handles it.
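The claim that a Future cannot be cancelled once running is standard-library behaviour and can be checked directly. In the sketch below, cancel_pending is a hypothetical callable with the Callable[[Executor], None] shape that terminate= accepts; whether amltk calls such a function in exactly this way is an assumption. It requires Python 3.9 or later for cancel_futures=.

```python
import threading
from concurrent.futures import Executor, ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocking_task() -> str:
    started.set()
    release.wait(timeout=5)  # stay "running" until the main thread lets go
    return "finished"


def cancel_pending(executor: Executor) -> None:
    """Hypothetical terminate-style callable: cancels queued work only."""
    executor.shutdown(wait=False, cancel_futures=True)


pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(blocking_task)  # picked up by the only worker
queued = pool.submit(blocking_task)  # waits in the queue behind it

started.wait(timeout=5)  # make sure the first task is really running
cancel_pending(pool)

could_cancel_running = running.cancel()  # a running future refuses
release.set()
outcome = running.result(timeout=5)

print(queued.cancelled(), could_cancel_running, outcome)
```

The queued future is cancelled, while the running one carries on to completion. With process-based workers, killing the worker is what would surface as an exception on the running future rather than a cancellation.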

Scheduling something to be run later

You can schedule some function to be run later using the scheduler.call_later() method.

Note

This does not run the function in the background; it just schedules some function to be called later, where you could perhaps then use submit to schedule a Task to run the function in the background.

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def fn() -> None:
    print("Ending now!")
    scheduler.stop()

@scheduler.on_start
def schedule_fn() -> None:
    scheduler.call_later(1, fn)

scheduler.run(end_on_empty=False)

Ending now!
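The "called later, but not in the background" behaviour resembles asyncio's own loop.call_later(), where the callback runs on the event loop itself. The sketch below uses only asyncio; whether scheduler.call_later() is built on it is an assumption, and main, fn and log are illustrative names.

```python
import asyncio


async def main() -> list:
    log = []
    stop = asyncio.Event()

    def fn() -> None:
        # Runs on the event loop thread, not in a worker.
        log.append("Ending now!")
        stop.set()

    loop = asyncio.get_running_loop()
    loop.call_later(0.05, fn)  # schedule only; nothing has run yet
    log.append("scheduled")

    await stop.wait()  # keep the loop alive until fn has fired
    return log


log = asyncio.run(main())
print(log)
```

Awaiting the stop event plays the role that end_on_empty=False plays in the example above: without something keeping the loop alive, it would finish before the delayed call fires.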

class Scheduler(executor, *, terminate=True)

Bases: RichRenderable

A scheduler for submitting tasks to an Executor.

PARAMETER DESCRIPTION
executor

The dispatcher to use for submitting tasks.

TYPE: Executor

terminate

Whether to call shutdown on the executor when run(..., wait=False). If True, the executor will be shut down with shutdown(wait=False) and we will attempt to terminate any workers of the executor. For some Executors this is enough, e.g. Dask, however for something like ProcessPoolExecutor, we will use psutil to kill its worker processes. If a callable, we will use this function for custom worker termination. If False, shutdown will not be called and the executor will remain active.

TYPE: Callable[[Executor], None] | bool DEFAULT: True

Source code in src/amltk/scheduling/scheduler.py
def __init__(
    self,
    executor: Executor,
    *,
    terminate: Callable[[Executor], None] | bool = True,
) -> None:
    """Initialize a scheduler.

    Args:
        executor: The dispatcher to use for submitting tasks.
        terminate: Whether to call shutdown on the executor when
            `run(..., wait=False)`. If True, the executor will be
            `shutdown(wait=False)` and we will attempt to terminate
            any workers of the executor. For some `Executors` this
            is enough, i.e. Dask, however for something like
            `ProcessPoolExecutor`, we will use `psutil` to kill
            its worker processes. If a callable, we will use this
            function for custom worker termination.
            If False, shutdown will not be called and the executor will
            remain active.
    """
    super().__init__()
    self.executor = executor
    self.unique_ref = f"Scheduler-{uuid4()}"
    self.emitter = Emitter()
    self.event_counts = self.emitter.event_counts

    # The current state of things and references to them
    self.queue = {}

    # Set up subscribers for events
    self.on_start = self.emitter.subscriber(self.STARTED)
    self.on_finishing = self.emitter.subscriber(self.FINISHING)
    self.on_finished = self.emitter.subscriber(self.FINISHED)
    self.on_stop = self.emitter.subscriber(self.STOP)
    self.on_timeout = self.emitter.subscriber(self.TIMEOUT)
    self.on_empty = self.emitter.subscriber(self.EMPTY)

    self.on_future_submitted = self.emitter.subscriber(self.FUTURE_SUBMITTED)
    self.on_future_done = self.emitter.subscriber(self.FUTURE_DONE)
    self.on_future_cancelled = self.emitter.subscriber(self.FUTURE_CANCELLED)
    self.on_future_exception = self.emitter.subscriber(self.FUTURE_EXCEPTION)
    self.on_future_result = self.emitter.subscriber(self.FUTURE_RESULT)

    self._terminate: Callable[[Executor], None] | None
    if terminate is True:
        self._terminate = termination_strategy(executor)
    else:
        self._terminate = terminate if callable(terminate) else None

    # This can be triggered either by `scheduler.stop` in a callback.
    # Has to be created inside the event loop so there's no issues
    self._stop_event: ContextEvent | None = None

    # This is a condition to make sure monitoring the queue will wait properly
    self._queue_has_items_event = asyncio.Event()

    # This is triggered when run is called
    self._running_event = asyncio.Event()

    # This is set once `run` is called
    self._end_on_exception_flag = Flag(initial=False)

    # This is used to manage sequential queues, where we need a Thread
    # timer to ensure that we don't get caught in an endless loop waiting
    # for the `timeout` in `_run_scheduler` to trigger. This won't trigger
    # because the sync code of submit could possibly keep calling itself
    # endlessly, preventing any of the async code from running.
    self._timeout_timer: Timer | None = None

    # A collection of things that want to register as being part of something
    # to render when the Scheduler is rendered.
    self._renderables: list[RenderableType] = [self.emitter]

    # These are extra user provided renderables during a call to `run()`. We
    # separate these out so that we can remove them when the scheduler is
    # stopped.
    self._extra_renderables: list[RenderableType] | None = None

    # An object used to render live output (if requested) with
    # `display=` on a call to `run()`
    self._live_output: Live | None = None

executor: Executor
attr

The executor to use to run tasks.

emitter: Emitter
attr

The emitter to use for events.

queue: dict[Future, tuple[Callable, tuple, dict]]
attr

The queue of tasks running.
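To illustrate the shape dict[Future, tuple[Callable, tuple, dict]], here is a stdlib-only sketch of keeping such a mapping alongside submissions. tracked_submit is a hypothetical helper, and exactly when amltk adds and removes entries is an assumption.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable

# Maps each in-flight future to the call that produced it.
queue: dict[Future, tuple[Callable, tuple, dict]] = {}


def tracked_submit(pool: ThreadPoolExecutor, fn: Callable, *args, **kwargs) -> Future:
    future = pool.submit(fn, *args, **kwargs)
    queue[future] = (fn, args, kwargs)
    return future


with ThreadPoolExecutor(max_workers=1) as pool:
    future = tracked_submit(pool, pow, 2, 5)
    value = future.result()
    # Once the future is handled, drop it and recover the original call.
    fn, args, kwargs = queue.pop(future)

print(value, fn.__name__, args, kwargs)
```

Keeping the original call next to its future makes it possible to report or resubmit exactly what was running when something goes wrong.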

on_start: Subscriber[[]]
attr

A Subscriber which is called when the scheduler starts. This is the first event emitted by the scheduler and one of the only ways to submit the initial compute to the scheduler.

@scheduler.on_start
def my_callback():
    ...

on_finishing: Subscriber[[]]
attr

A Subscriber which is called when the scheduler is finishing up. This occurs right before the scheduler shuts down the executor.

@scheduler.on_finishing
def my_callback():
    ...

on_finished: Subscriber[[]]
attr

A Subscriber which is called when the scheduler is finished, has shutdown the executor and possibly terminated any remaining compute.

@scheduler.on_finished
def my_callback():
    ...

on_stop: Subscriber[str, BaseException | None]
attr

A Subscriber which is called when the scheduler has been stopped due to the stop() method being called.

@scheduler.on_stop
def my_callback(stop_msg: str, exception: BaseException | None):
    ...

on_timeout: Subscriber[[]]
attr

A Subscriber which is called when the scheduler reaches the timeout.

@scheduler.on_timeout
def my_callback():
    ...

on_empty: Subscriber[[]]
attr

A Subscriber which is called when the queue is empty. This can be useful to re-fill the queue and prevent the scheduler from exiting.

@scheduler.on_empty
def my_callback():
    ...

on_future_submitted: Subscriber[Future]
attr

A Subscriber which is called when some compute is submitted.

@scheduler.on_future_submitted
def my_callback(future: Future):
    ...

on_future_done: Subscriber[Future]
attr

A Subscriber which is called when some compute is done, regardless of whether it was successful or not.

@scheduler.on_future_done
def my_callback(future: Future):
    ...

on_future_cancelled: Subscriber[Future]
attr

A Subscriber which is called when a future is cancelled. This usually occurs due to the underlying Scheduler, and is not something we do directly, other than when shutting down the scheduler.

@scheduler.on_future_cancelled
def my_callback(future: Future):
    ...

on_future_exception: Subscriber[Future, BaseException]
attr

A Subscriber which is called when some compute raised an uncaught exception.

@scheduler.on_future_exception
def my_callback(future: Future, exception: BaseException):
    ...

on_future_result: Subscriber[Future, Any]
attr

A Subscriber which is called when a future returns with a result and no exception was raised.

@scheduler.on_future_result
def my_callback(future: Future, result: Any):
    ...

def with_processes(max_workers=None, mp_context=None, initializer=None, initargs=())
classmethod

Create a scheduler with a ProcessPoolExecutor.

See ProcessPoolExecutor for more details.

Source code in src/amltk/scheduling/scheduler.py
@classmethod
def with_processes(
    cls,
    max_workers: int | None = None,
    mp_context: BaseContext | Literal["fork", "spawn", "forkserver"] | None = None,
    initializer: Callable[..., Any] | None = None,
    initargs: tuple[Any, ...] = (),
) -> Self:
    """Create a scheduler with a `ProcessPoolExecutor`.

    See [`ProcessPoolExecutor`][concurrent.futures.ProcessPoolExecutor]
    for more details.
    """
    if isinstance(mp_context, str):
        from multiprocessing import get_context

        mp_context = get_context(mp_context)

    executor = ProcessPoolExecutor(
        max_workers=max_workers,
        mp_context=mp_context,
        initializer=initializer,
        initargs=initargs,
    )
    return cls(executor=executor)
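The string handling for mp_context in the source above relies on multiprocessing.get_context, which can be tried in isolation. resolve_context below is a hypothetical helper that mirrors those lines; no process pool is created in this sketch.

```python
from multiprocessing import get_context


def resolve_context(mp_context):
    """Turn a start-method name into a context; pass anything else through."""
    if isinstance(mp_context, str):
        return get_context(mp_context)
    return mp_context


spawn_context = resolve_context("spawn")
print(spawn_context.get_start_method())

# None and ready-made contexts are returned unchanged.
unchanged = resolve_context(None)
same_object = resolve_context(spawn_context) is spawn_context
```

Passing None leaves the choice of start method to ProcessPoolExecutor, which falls back to the platform default.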

def with_loky(max_workers=None, context=None, timeout=10, kill_workers=False, reuse='auto', job_reducers=None, result_reducers=None, initializer=None, initargs=(), env=None)
classmethod

Create a scheduler with a loky.get_reusable_executor.

See the loky documentation (https://loky.readthedocs.io/en/stable/API.html) for more details.

Source code in src/amltk/scheduling/scheduler.py
@classmethod
def with_loky(  # noqa: PLR0913
    cls,
    max_workers: int | None = None,
    context: BaseContext | Literal["fork", "spawn", "forkserver"] | None = None,
    timeout: int = 10,
    kill_workers: bool = False,  # noqa: FBT002, FBT001
    reuse: bool | Literal["auto"] = "auto",
    job_reducers: Any | None = None,
    result_reducers: Any | None = None,
    initializer: Callable[..., Any] | None = None,
    initargs: tuple[Any, ...] = (),
    env: dict[str, str] | None = None,
) -> Self:
    """Create a scheduler with a `loky.get_reusable_executor`.

    See [loky documentation][https://loky.readthedocs.io/en/stable/API.html]
    for more details.
    """
    from loky import get_reusable_executor

    executor = get_reusable_executor(
        max_workers=max_workers,
        context=context,
        timeout=timeout,
        kill_workers=kill_workers,
        reuse=reuse,  # type: ignore
        job_reducers=job_reducers,
        result_reducers=result_reducers,
        initializer=initializer,
        initargs=initargs,
        env=env,
    )
    return cls(executor=executor)

def with_sequential()
classmethod

Create a Scheduler that runs sequentially.

This is useful for debugging and testing. Uses a SequentialExecutor.

Source code in src/amltk/scheduling/scheduler.py
@classmethod
def with_sequential(cls) -> Self:
    """Create a Scheduler that runs sequentially.

    This is useful for debugging and testing. Uses
    a [`SequentialExecutor`][amltk.scheduling.SequentialExecutor].
    """
    return cls(executor=SequentialExecutor())
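To show why a sequential executor helps with debugging, here is a minimal sketch of an Executor that runs each call inline during submit(). InlineExecutor is a hypothetical class written for illustration; amltk's SequentialExecutor may differ in its details.

```python
from concurrent.futures import Executor, Future


class InlineExecutor(Executor):
    """Hypothetical executor that runs every call immediately, in-process."""

    def submit(self, fn, /, *args, **kwargs) -> Future:
        future: Future = Future()
        try:
            # The call happens right here, so breakpoints and tracebacks
            # behave exactly as in ordinary single-threaded code.
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)
        return future


ok = InlineExecutor().submit(pow, 2, 5)
failed = InlineExecutor().submit(int, "not a number")

print(ok.done(), ok.result())
print(type(failed.exception()).__name__)
```

Since the future is already resolved when submit() returns, results arrive in submission order and nothing runs concurrently.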

def with_slurm(*, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs)
classmethod

Create a Scheduler that runs on a SLURM cluster.

This is useful for running on a SLURM cluster. Uses dask_jobqueue.SLURMCluster.

PARAMETER DESCRIPTION
n_workers

The number of workers to start.

TYPE: int

adaptive

Whether to use adaptive scaling of the cluster or a fixed allocation of all workers. When enabled, this uses the dask_jobqueue.SLURMCluster.adapt method to dynamically scale the cluster to the number of workers specified.

TYPE: bool DEFAULT: False

submit_command

Overwrite the command to submit a worker if necessary.

TYPE: str | None DEFAULT: None

cancel_command

Overwrite the command to cancel a worker if necessary.

TYPE: str | None DEFAULT: None

kwargs

Any additional keyword arguments to pass to the dask_jobqueue class.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Self

A scheduler that will run on a SLURM cluster.

Source code in src/amltk/scheduling/scheduler.py
@classmethod
def with_slurm(
    cls,
    *,
    n_workers: int,
    adaptive: bool = False,
    submit_command: str | None = None,
    cancel_command: str | None = None,
    **kwargs: Any,
) -> Self:
    """Create a Scheduler that runs on a SLURM cluster.

    This is useful for running on a SLURM cluster. Uses
    [dask_jobqueue.SLURMCluster][].

    Args:
        n_workers: The number of workers to start.
        adaptive: Whether to use the adaptive scaling of the cluster or fixed
            allocate all workers. This will specifically use the
            [dask_jobqueue.SLURMCluster.adapt](https://jobqueue.dask.org/en/latest/index.html?highlight=adapt#adaptivity)
            method to dynamically scale the cluster to the number of workers
            specified.
        submit_command: Overwrite the command to submit a worker if necessary.
        cancel_command: Overwrite the command to cancel a worker if necessary.
        kwargs: Any additional keyword arguments to pass to the
            `dask_jobqueue` class.

    Returns:
        A scheduler that will run on a SLURM cluster.
    """
    return cls.with_dask_jobqueue(
        "slurm",
        n_workers=n_workers,
        adaptive=adaptive,
        submit_command=submit_command,
        cancel_command=cancel_command,
        **kwargs,
    )

def with_pbs(*, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs)
classmethod

Create a Scheduler that runs on a PBS cluster.

This is useful for running on a PBS cluster. Uses dask_jobqueue.PBSCluster.

PARAMETER DESCRIPTION
n_workers

The number of workers to start.

TYPE: int

adaptive

Whether to use adaptive scaling of the cluster or a fixed allocation of all workers. When enabled, this uses the adapt method of dask_jobqueue.PBSCluster to dynamically scale the cluster to the number of workers specified.

TYPE: bool DEFAULT: False

submit_command

Overwrite the command to submit a worker if necessary.

TYPE: str | None DEFAULT: None

cancel_command

Overwrite the command to cancel a worker if necessary.

TYPE: str | None DEFAULT: None

kwargs

Any additional keyword arguments to pass to the dask_jobqueue class.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Self

A scheduler that will run on a PBS cluster.

Source code in src/amltk/scheduling/scheduler.py
@classmethod
def with_pbs(
    cls,
    *,
    n_workers: int,
    adaptive: bool = False,
    submit_command: str | None = None,
    cancel_command: str | None = None,
    **kwargs: Any,
) -> Self:
    """Create a Scheduler that runs on a PBS cluster.

    This is useful for running on a PBS cluster. Uses
    [dask_jobqueue.PBSCluster][].

    Args:
        n_workers: The number of workers to start.
        adaptive: Whether to use the adaptive scaling of the cluster or fixed
            allocate all workers. This will specifically use the
            [dask_jobqueue.SLURMCluster.adapt](https://jobqueue.dask.org/en/latest/index.html?highlight=adapt#adaptivity)
            method to dynamically scale the cluster to the number of workers
            specified.
        submit_command: Overwrite the command to submit a worker if necessary.
        cancel_command: Overwrite the command to cancel a worker if necessary.
        kwargs: Any additional keyword arguments to pass to the
            `dask_jobqueue` class.

    Returns:
        A scheduler that will run on a PBS cluster.
    """
    return cls.with_dask_jobqueue(
        "pbs",
        n_workers=n_workers,
        adaptive=adaptive,
        submit_command=submit_command,
        cancel_command=cancel_command,
        **kwargs,
    )

def with_sge(*, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs)
classmethod

Create a Scheduler that runs on an SGE cluster.

This is useful for running on an SGE cluster. Uses dask_jobqueue.SGECluster.

PARAMETER DESCRIPTION
n_workers

The number of workers to start.

TYPE: int

adaptive

Whether to use adaptive scaling of the cluster or a fixed allocation of all workers. When enabled, this uses the adapt method of dask_jobqueue.SGECluster to dynamically scale the cluster to the number of workers specified.

TYPE: bool DEFAULT: False

submit_command

Overwrite the command to submit a worker if necessary.

TYPE: str | None DEFAULT: None

cancel_command

Overwrite the command to cancel a worker if necessary.

TYPE: str | None DEFAULT: None

kwargs

Any additional keyword arguments to pass to the dask_jobqueue class.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Self

A scheduler that will run on an SGE cluster.

Source code in src/amltk/scheduling/scheduler.py
@classmethod
def with_sge(
    cls,
    *,
    n_workers: int,
    adaptive: bool = False,
    submit_command: str | None = None,
    cancel_command: str | None = None,
    **kwargs: Any,
) -> Self:
    """Create a Scheduler that runs on a SGE cluster.

    This is useful for running on a SGE cluster. Uses
    [dask_jobqueue.SGECluster][].

    Args:
        n_workers: The number of workers to start.
        adaptive: Whether to use the adaptive scaling of the cluster or fixed
            allocate all workers. This will specifically use the
            [dask_jobqueue.SLURMCluster.adapt](https://jobqueue.dask.org/en/latest/index.html?highlight=adapt#adaptivity)
            method to dynamically scale the cluster to the number of workers
            specified.
        submit_command: Overwrite the command to submit a worker if necessary.
        cancel_command: Overwrite the command to cancel a worker if necessary.
        kwargs: Any additional keyword arguments to pass to the
            `dask_jobqueue` class.

    Returns:
        A scheduler that will run on a SGE cluster.
    """
    return cls.with_dask_jobqueue(
        "sge",
        n_workers=n_workers,
        adaptive=adaptive,
        submit_command=submit_command,
        cancel_command=cancel_command,
        **kwargs,
    )

def with_oar(*, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs)
classmethod

Create a Scheduler that runs on an OAR cluster.

This is useful for running on an OAR cluster. Uses dask_jobqueue.OARCluster.

PARAMETER DESCRIPTION
n_workers

The number of workers to start.

TYPE: int

adaptive

Whether to use adaptive scaling of the cluster or a fixed allocation of all workers. When enabled, this uses the adapt method of dask_jobqueue.OARCluster to dynamically scale the cluster to the number of workers specified.

TYPE: bool DEFAULT: False

submit_command

Overwrite the command to submit a worker if necessary.

TYPE: str | None DEFAULT: None

cancel_command

Overwrite the command to cancel a worker if necessary.

TYPE: str | None DEFAULT: None

kwargs

Any additional keyword arguments to pass to the dask_jobqueue class.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Self

A scheduler that will run on an OAR cluster.

Source code in src/amltk/scheduling/scheduler.py
@classmethod
def with_oar(
    cls,
    *,
    n_workers: int,
    adaptive: bool = False,
    submit_command: str | None = None,
    cancel_command: str | None = None,
    **kwargs: Any,
) -> Self:
    """Create a Scheduler that runs on a OAR cluster.

    This is useful for running on a OAR cluster. Uses
    [dask_jobqueue.OARCluster][].

    Args:
        n_workers: The number of workers to start.
        adaptive: Whether to use the adaptive scaling of the cluster or fixed
            allocate all workers. This will specifically use the
            [dask_jobqueue.SLURMCluster.adapt](https://jobqueue.dask.org/en/latest/index.html?highlight=adapt#adaptivity)
            method to dynamically scale the cluster to the number of workers
            specified.
        submit_command: Overwrite the command to submit a worker if necessary.
        cancel_command: Overwrite the command to cancel a worker if necessary.
        kwargs: Any additional keyword arguments to pass to the
            `dask_jobqueue` class.

    Returns:
        A scheduler that will run on a OAR cluster.
    """
    return cls.with_dask_jobqueue(
        "oar",
        n_workers=n_workers,
        adaptive=adaptive,
        submit_command=submit_command,
        cancel_command=cancel_command,
        **kwargs,
    )

def with_moab(*, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs)
classmethod

Create a Scheduler that runs on a Moab cluster.

This is useful for running on a Moab cluster. Uses dask_jobqueue.MoabCluster.

PARAMETER DESCRIPTION
n_workers

The number of workers to start.

TYPE: int

adaptive

Whether to use adaptive scaling of the cluster or a fixed allocation of all workers. When enabled, this uses the adapt method of dask_jobqueue.MoabCluster to dynamically scale the cluster to the number of workers specified.

TYPE: bool DEFAULT: False

submit_command

Overwrite the command to submit a worker if necessary.

TYPE: str | None DEFAULT: None

cancel_command

Overwrite the command to cancel a worker if necessary.

TYPE: str | None DEFAULT: None

kwargs

Any additional keyword arguments to pass to the dask_jobqueue class.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Self

A scheduler that will run on a Moab cluster.

Source code in src/amltk/scheduling/scheduler.py
@classmethod
def with_moab(
    cls,
    *,
    n_workers: int,
    adaptive: bool = False,
    submit_command: str | None = None,
    cancel_command: str | None = None,
    **kwargs: Any,
) -> Self:
    """Create a Scheduler that runs on a Moab cluster.

    This is useful for running on a Moab cluster. Uses
    [dask_jobqueue.MoabCluster][].

    Args:
        n_workers: The number of workers to start.
        adaptive: Whether to use the adaptive scaling of the cluster or fixed
            allocate all workers. This will specifically use the
            [dask_jobqueue.SLURMCluster.adapt](https://jobqueue.dask.org/en/latest/index.html?highlight=adapt#adaptivity)
            method to dynamically scale the cluster to the number of workers
            specified.
        submit_command: Overwrite the command to submit a worker if necessary.
        cancel_command: Overwrite the command to cancel a worker if necessary.
        kwargs: Any additional keyword arguments to pass to the
            `dask_jobqueue` class.

    Returns:
        A scheduler that will run on a Moab cluster.
    """
    return cls.with_dask_jobqueue(
        "moab",
        n_workers=n_workers,
        adaptive=adaptive,
        submit_command=submit_command,
        cancel_command=cancel_command,
        **kwargs,
    )

def with_lsf(*, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs)
classmethod

Create a Scheduler that runs on an LSF cluster.

This is useful for running on an LSF cluster. Uses dask_jobqueue.LSFCluster.

PARAMETER DESCRIPTION
n_workers

The number of workers to start.

TYPE: int

adaptive

Whether to use adaptive scaling of the cluster or a fixed allocation of all workers. When enabled, this uses the adapt method of dask_jobqueue.LSFCluster to dynamically scale the cluster to the number of workers specified.

TYPE: bool DEFAULT: False

submit_command

Overwrite the command to submit a worker if necessary.

TYPE: str | None DEFAULT: None

cancel_command

Overwrite the command to cancel a worker if necessary.

TYPE: str | None DEFAULT: None

kwargs

Any additional keyword arguments to pass to the dask_jobqueue class.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Self

A scheduler that will run on an LSF cluster.

Source code in src/amltk/scheduling/scheduler.py
@classmethod
def with_lsf(
    cls,
    *,
    n_workers: int,
    adaptive: bool = False,
    submit_command: str | None = None,
    cancel_command: str | None = None,
    **kwargs: Any,
) -> Self:
    """Create a Scheduler that runs on a LSF cluster.

    This is useful for running on a LSF cluster. Uses
    [dask_jobqueue.LSFCluster][].

    Args:
        n_workers: The number of workers to start.
        adaptive: Whether to use the adaptive scaling of the cluster or fixed
            allocate all workers. This will specifically use the
            [dask_jobqueue.SLURMCluster.adapt](https://jobqueue.dask.org/en/latest/index.html?highlight=adapt#adaptivity)
            method to dynamically scale the cluster to the number of workers
            specified.
        submit_command: Overwrite the command to submit a worker if necessary.
        cancel_command: Overwrite the command to cancel a worker if necessary.
        kwargs: Any additional keyword arguments to pass to the
            `dask_jobqueue` class.

    Returns:
        A scheduler that will run on a LSF cluster.
    """
    return cls.with_dask_jobqueue(
        "lsf",
        n_workers=n_workers,
        adaptive=adaptive,
        submit_command=submit_command,
        cancel_command=cancel_command,
        **kwargs,
    )

def with_htcondor(*, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs)
classmethod

Create a Scheduler that runs on an HTCondor cluster.

This is useful for running on an HTCondor cluster. Uses dask_jobqueue.HTCondorCluster.

PARAMETER DESCRIPTION
n_workers

The number of workers to start.

TYPE: int

adaptive

Whether to use adaptive scaling of the cluster or a fixed allocation of all workers. When enabled, this uses the adapt method of dask_jobqueue.HTCondorCluster to dynamically scale the cluster to the number of workers specified.

TYPE: bool DEFAULT: False

submit_command

Overwrite the command to submit a worker if necessary.

TYPE: str | None DEFAULT: None

cancel_command

Overwrite the command to cancel a worker if necessary.

TYPE: str | None DEFAULT: None

kwargs

Any additional keyword arguments to pass to the dask_jobqueue class.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Self

A scheduler that will run on a HTCondor cluster.

Source code in src/amltk/scheduling/scheduler.py
@classmethod
def with_htcondor(
    cls,
    *,
    n_workers: int,
    adaptive: bool = False,
    submit_command: str | None = None,
    cancel_command: str | None = None,
    **kwargs: Any,
) -> Self:
    """Create a Scheduler that runs on a HTCondor cluster.

    This is useful for running on a HTCondor cluster. Uses
    [dask_jobqueue.HTCondorCluster][].

    Args:
        n_workers: The number of workers to start.
        adaptive: Whether to use the adaptive scaling of the cluster or fixed
            allocate all workers. This will specifically use the
            [dask_jobqueue.SLURMCluster.adapt](https://jobqueue.dask.org/en/latest/index.html?highlight=adapt#adaptivity)
            method to dynamically scale the cluster to the number of workers
            specified.
        submit_command: Overwrite the command to submit a worker if necessary.
        cancel_command: Overwrite the command to cancel a worker if necessary.
        kwargs: Any additional keyword arguments to pass to the
            `dask_jobqueue` class.

    Returns:
        A scheduler that will run on a HTCondor cluster.
    """
    return cls.with_dask_jobqueue(
        "htcondor",
        n_workers=n_workers,
        adaptive=adaptive,
        submit_command=submit_command,
        cancel_command=cancel_command,
        **kwargs,
    )

def with_dask_jobqueue(name, *, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs)
classmethod

Create a Scheduler using dask-jobqueue.

See dask_jobqueue for more details.

PARAMETER DESCRIPTION
name

The name of the jobqueue to use. This is the name of the class in dask_jobqueue to use. For example, to use dask_jobqueue.SLURMCluster, you would use slurm.

TYPE: DJQ_NAMES

adaptive

Whether to scale the cluster adaptively or to allocate a fixed number of workers. When enabled, the cluster's adapt method (see dask_jobqueue.SLURMCluster.adapt) is used to dynamically scale the cluster to the number of workers specified.

TYPE: bool DEFAULT: False

n_workers

The number of workers to start.

TYPE: int

submit_command

Overwrite the command to submit a worker if necessary.

TYPE: str | None DEFAULT: None

cancel_command

Overwrite the command to cancel a worker if necessary.

TYPE: str | None DEFAULT: None

kwargs

Any additional keyword arguments to pass to the dask_jobqueue class.

TYPE: Any DEFAULT: {}

RAISES DESCRIPTION
ImportError

If dask-jobqueue is not installed.

RETURNS DESCRIPTION
Self

A new scheduler with a dask_jobqueue executor.
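The ImportError behaviour described above follows a common pattern for optional dependencies: attempt the import lazily and re-raise with a message naming the package to install. The following is a minimal standard-library sketch of that pattern only. The helper `import_optional` and the fake module name are hypothetical and are not part of amltk.

```python
import importlib
from types import ModuleType


def import_optional(module_name: str, *, package: str) -> ModuleType:
    """Import `module_name`, or raise an ImportError naming the missing package.

    Hypothetical helper for illustration; amltk does not expose this function.
    """
    try:
        return importlib.import_module(module_name)
    except ImportError as e:
        # Chain the original error so the root cause stays visible.
        raise ImportError(
            f"To use this feature, you must install the `{package}` package.",
        ) from e


# A standard-library module imports without issue.
json_module = import_optional("json", package="json")

# A deliberately fake module name triggers the friendlier error message.
try:
    import_optional("a_module_that_does_not_exist_xyz", package="some-package")
except ImportError as err:
    message = str(err)
    cause = err.__cause__
```

Chaining with `from e` keeps the original traceback attached, which helps when the import fails for a reason other than the package being absent.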

Source code in src/amltk/scheduling/scheduler.py
@classmethod
def with_dask_jobqueue(
    cls,
    name: DJQ_NAMES,
    *,
    n_workers: int,
    adaptive: bool = False,
    submit_command: str | None = None,
    cancel_command: str | None = None,
    **kwargs: Any,
) -> Self:
    """Create a Scheduler with using `dask-jobqueue`.

    See [`dask_jobqueue`][dask_jobqueue] for more details.

    [dask_jobqueue]: https://jobqueue.dask.org/en/latest/

    Args:
        name: The name of the jobqueue to use. This is the name of the
            class in `dask_jobqueue` to use. For example, to use
            `dask_jobqueue.SLURMCluster`, you would use `slurm`.
        adaptive: Whether to use the adaptive scaling of the cluster or fixed
            allocate all workers. This will specifically use the
            [dask_jobqueue.SLURMCluster.adapt](https://jobqueue.dask.org/en/latest/index.html?highlight=adapt#adaptivity)
            method to dynamically scale the cluster to the number of workers
            specified.
        n_workers: The number of workers to start.
        submit_command: Overwrite the command to submit a worker if necessary.
        cancel_command: Overwrite the command to cancel a worker if necessary.
        kwargs: Any additional keyword arguments to pass to the
            `dask_jobqueue` class.

    Raises:
        ImportError: If `dask-jobqueue` is not installed.

    Returns:
        A new scheduler with a `dask_jobqueue` executor.
    """
    try:
        from amltk.scheduling.executors.dask_jobqueue import DaskJobqueueExecutor

    except ImportError as e:
        raise ImportError(
            f"To use the {name} executor, you must install the "
            "`dask-jobqueue` package.",
        ) from e

    executor = DaskJobqueueExecutor.from_str(
        name,
        n_workers=n_workers,
        adaptive=adaptive,
        submit_command=submit_command,
        cancel_command=cancel_command,
        **kwargs,
    )
    return cls(executor)

def empty()

Check if the scheduler is empty.

RETURNS DESCRIPTION
bool

True if there are no tasks in the queue.

Source code in src/amltk/scheduling/scheduler.py
def empty(self) -> bool:
    """Check if the scheduler is empty.

    Returns:
        True if there are no tasks in the queue.
    """
    return len(self.queue) == 0

def running()

Whether the scheduler is running and accepting tasks to dispatch.

RETURNS DESCRIPTION
bool

True if the scheduler is running and accepting tasks.

Source code in src/amltk/scheduling/scheduler.py
def running(self) -> bool:
    """Whether the scheduler is running and accepting tasks to dispatch.

    Returns:
        True if the scheduler is running and accepting tasks.
    """
    return self._running_event.is_set()

def submit(fn, /, *args, **kwargs)

Submits a callable to be executed with the given arguments.

PARAMETER DESCRIPTION
fn

The callable to be executed as fn(*args, **kwargs). submit() returns a Future instance representing the execution of the callable.

TYPE: Callable[P, R]

args

positional arguments to pass to the function

TYPE: args DEFAULT: ()

kwargs

keyword arguments to pass to the function

TYPE: kwargs DEFAULT: {}

RAISES DESCRIPTION
SchedulerNotRunningError

If the scheduler is not running. You can guard against this by checking scheduler.running() first.

RETURNS DESCRIPTION
Future[R]

A Future representing the given call.
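As the source listing for this method shows, the executor's future is wrapped with asyncio.wrap_future so it can be awaited on the event loop. The sketch below reproduces only that step with a standard-library ThreadPoolExecutor. It is not the Scheduler itself, and the names `fn` and `main` are illustrative.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def fn(x: int) -> int:
    return x + 1


async def main() -> int:
    with ThreadPoolExecutor(max_workers=1) as executor:
        # The executor returns a concurrent.futures.Future ...
        sync_future = executor.submit(fn, 1)
        # ... which is wrapped so it can be awaited inside the running loop.
        future = asyncio.wrap_future(sync_future)
        return await future


result = asyncio.run(main())
print(f"Result: {result}")
```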

Source code in src/amltk/scheduling/scheduler.py
def submit(
    self,
    fn: Callable[P, R],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[R]:
    """Submits a callable to be executed with the given arguments.

    Args:
        fn: The callable to be executed as
            fn(*args, **kwargs) that returns a Future instance representing
            the execution of the callable.
        args: positional arguments to pass to the function
        kwargs: keyword arguments to pass to the function

    Raises:
        SchedulerNotRunningError: If the scheduler is not running.
            You can protect against this using,
            [`scheduler.running()`][amltk.scheduling.scheduler.Scheduler.running].

    Returns:
        A Future representing the given call.
    """
    if not self.running():
        msg = (
            f"Scheduler is not running, cannot submit task {fn}"
            f" with {args=}, {kwargs=}"
        )
        raise SchedulerNotRunningError(msg)

    try:
        sync_future = self.executor.submit(fn, *args, **kwargs)
        future = asyncio.wrap_future(sync_future)
    except Exception as e:
        logger.exception(f"Could not submit task {fn}", exc_info=e)
        raise e

    self._register_future(future, fn, *args, **kwargs)
    return future

def task(function, *, plugins=(), init_plugins=True)

Create a new task.

PARAMETER DESCRIPTION
function

The function to run using the scheduler.

TYPE: Callable[P, R] | Callable[Concatenate[Comm, P], R]

plugins

The plugins to attach to the task.

TYPE: Plugin | Iterable[Plugin] DEFAULT: ()

init_plugins

Whether to initialize the plugins.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Task[P, R]

A new task.

Source code in src/amltk/scheduling/scheduler.py
def task(
    self,
    function: Callable[P, R] | Callable[Concatenate[Comm, P], R],
    *,
    plugins: Plugin | Iterable[Plugin] = (),
    init_plugins: bool = True,
) -> Task[P, R]:
    """Create a new task.

    Args:
        function: The function to run using the scheduler.
        plugins: The plugins to attach to the task.
        init_plugins: Whether to initialize the plugins.

    Returns:
        A new task.
    """
    # HACK: Note that the type: ignore is due to the fact that we can't use type
    # checking to enforce that
    # A. `function` is a callable with the first arg being a Comm
    # B. `plugins`
    task = Task(function, self, plugins=plugins, init_plugins=init_plugins)  # type: ignore
    self.add_renderable(task)
    return task  # type: ignore

def run(*, timeout=None, end_on_empty=True, wait=True, on_exception='raise', asyncio_debug_mode=False, display=False)

Run the scheduler.

PARAMETER DESCRIPTION
timeout

The maximum time to run the scheduler for in seconds. Defaults to None which means no timeout and it will end once the queue is empty if end_on_empty=True.

TYPE: float | None DEFAULT: None

end_on_empty

Whether to end the scheduler when the queue becomes empty.

TYPE: bool DEFAULT: True

wait

Whether to wait for currently running compute to finish once the Scheduler is shutting down.

  • If True, will wait for all currently running compute.
  • If False, will attempt to cancel/terminate all currently running compute and shutdown the executor. This may be useful if you want to end the scheduler as quickly as possible or respect the timeout= more precisely.

TYPE: bool DEFAULT: True

on_exception

What to do when an exception occurs in the scheduler or callbacks (Does not apply to submitted compute!)

  • If "raise", the scheduler will stop and raise the exception at the point where you called run().
  • If "ignore", the scheduler will continue running, ignoring the exception. This may be useful when requiring more robust execution.
  • If "end", similar to "raise", the scheduler will stop but no exception will occur and the control flow will return gracefully to the point where you called run().

TYPE: Literal['raise', 'end', 'ignore'] DEFAULT: 'raise'

asyncio_debug_mode

Whether to run the async loop in debug mode. Defaults to False. Please see asyncio.run for more.

TYPE: bool DEFAULT: False

display

Whether to display the scheduler live in the console.

  • If True, will display the scheduler and all its tasks.
  • If a list[RenderableType], will display the scheduler itself plus those renderables.

TYPE: bool | list[RenderableType] DEFAULT: False

RETURNS DESCRIPTION
ExitState

The reason for the scheduler ending.

RAISES DESCRIPTION
RuntimeError

If the scheduler is already running.

Source code in src/amltk/scheduling/scheduler.py
def run(
    self,
    *,
    timeout: float | None = None,
    end_on_empty: bool = True,
    wait: bool = True,
    on_exception: Literal["raise", "end", "ignore"] = "raise",
    asyncio_debug_mode: bool = False,
    display: bool | list[RenderableType] = False,
) -> ExitState:
    """Run the scheduler.

    Args:
        timeout: The maximum time to run the scheduler for in
            seconds. Defaults to `None` which means no timeout and it
            will end once the queue is empty if `end_on_empty=True`.
        end_on_empty: Whether to end the scheduler when the queue becomes empty.
        wait: Whether to wait for currently running compute to finish once
            the `Scheduler` is shutting down.

            * If `#!python True`, will wait for all currently running compute.
            * If `#!python False`, will attempt to cancel/terminate all currently
                running compute and shutdown the executor. This may be useful
                if you want to end the scheduler as quickly as possible or
                respect the `timeout=` more precisely.
        on_exception: What to do when an exception occurs in the scheduler
            or callbacks (**Does not apply to submitted compute!**)

            * If `#!python "raise"`, the scheduler will stop and raise the
                exception at the point where you called `run()`.
            * If `#!python "ignore"`, the scheduler will continue running,
                ignoring the exception. This may be useful when requiring more
                robust execution.
            * If `#!python "end"`, similar to `#!python "raise"`, the scheduler
                will stop but no exception will occur and the control flow
                will return gracefully to the point where you called `run()`.
        asyncio_debug_mode: Whether to run the async loop in debug mode.
            Defaults to `False`. Please see [asyncio.run][] for more.
        display: Whether to display the scheduler live in the console.

            * If `#!python True`, will display the scheduler and all its tasks.
            * If a `#!python list[RenderableType]` , will display the scheduler
                itself plus those renderables.

    Returns:
        The reason for the scheduler ending.

    Raises:
        RuntimeError: If the scheduler is already running.
    """
    return asyncio.run(
        self.async_run(
            timeout=timeout,
            end_on_empty=end_on_empty,
            wait=wait,
            on_exception=on_exception,
            display=display,
        ),
        debug=asyncio_debug_mode,
    )

def async_run(*, timeout=None, end_on_empty=True, wait=True, on_exception='raise', display=False)
async

Async version of run.

This can be useful if you are already running in an async context, such as in a web server or Jupyter notebook.

Please see run() for more details.
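The reason a separate async entry point matters is that asyncio.run refuses to start when an event loop is already running in the current thread, as it is in a notebook. The sketch below demonstrates this with plain asyncio. `async_work` and `work` are hypothetical stand-ins for async_run and run, not amltk functions.

```python
import asyncio


async def async_work() -> str:
    await asyncio.sleep(0)
    return "done"


def work() -> str:
    # Synchronous wrapper: it starts its own event loop, as run() does.
    return asyncio.run(async_work())


async def inside_running_loop() -> tuple[bool, str]:
    coro = async_work()
    try:
        asyncio.run(coro)  # rejected: a loop is already running here
        nested_ok = True
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        nested_ok = False
    # Awaiting the coroutine directly works inside a running loop.
    return nested_ok, await async_work()


sync_result = work()
nested_ok, awaited_result = asyncio.run(inside_running_loop())
```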

Source code in src/amltk/scheduling/scheduler.py
async def async_run(
    self,
    *,
    timeout: float | None = None,
    end_on_empty: bool = True,
    wait: bool = True,
    on_exception: Literal["raise", "end", "ignore"] = "raise",
    display: bool | list[RenderableType] = False,
) -> ExitState:
    """Async version of `run`.

    This can be useful if you are already running in an async context,
    such as in a web server or Jupyter notebook.

    Please see [`run()`][amltk.Scheduler.run] for more details.
    """
    if self.running():
        raise RuntimeError("Scheduler already seems to be running")

    logger.debug("Starting scheduler")

    # Make sure flags are set
    self._end_on_exception_flag.set(value=on_exception in ("raise", "end"))

    # If the user has requested to have a live display,
    # we will need to setup a `Live` instance to render to
    if display:
        from rich.live import Live

        if isinstance(display, list):
            self._extra_renderables = display

        self._live_output = Live(
            auto_refresh=False,
            get_renderable=self.__rich__,
        )

    loop = asyncio.get_running_loop()

    # Set the exception handler for asyncio
    previous_exception_handler = None
    if on_exception in ("raise", "end"):
        previous_exception_handler = loop.get_exception_handler()

        def custom_exception_handler(
            loop: asyncio.AbstractEventLoop,
            context: dict[str, Any],
        ) -> None:
            exception = context.get("exception")
            message = context.get("message")

            # handle with previous handler
            if previous_exception_handler:
                previous_exception_handler(loop, context)
            else:
                loop.default_exception_handler(context)

            self.stop(stop_msg=message, exception=exception)

        loop.set_exception_handler(custom_exception_handler)

    # Run the actual scheduling loop
    result = await self._run_scheduler(
        timeout=timeout,
        end_on_empty=end_on_empty,
        wait=wait,
    )

    # Reset variables back to its default
    self._live_output = None
    self._extra_renderables = None
    self._end_on_exception_flag.reset()

    if previous_exception_handler is not None:
        loop.set_exception_handler(previous_exception_handler)

    # If we were meant to end on an exception and the result
    # we got back from the scheduler was an exception, raise it
    if isinstance(result, BaseException):
        if on_exception == "raise":
            raise result

        return ExitState(code=ExitState.Code.EXCEPTION, exception=result)

    return ExitState(code=result)
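The exception-handler chaining in the listing above can be tried in isolation. The following sketch installs a handler that first defers to the previous (or default) handler, then records the message, and finally restores the earlier handler. It uses only asyncio, and the `seen` list stands in for the call to self.stop(...), which is an assumption made for illustration.

```python
import asyncio
from typing import Any


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    seen: list[str] = []
    previous = loop.get_exception_handler()  # None unless one was set earlier

    def handler(loop: asyncio.AbstractEventLoop, context: dict[str, Any]) -> None:
        # Defer to whichever handler was active before, then react ourselves.
        if previous is not None:
            previous(loop, context)
        else:
            loop.default_exception_handler(context)  # logs the error
        seen.append(context["message"])

    loop.set_exception_handler(handler)
    try:
        # Simulate an unhandled error being reported to the loop.
        loop.call_exception_handler(
            {"message": "boom", "exception": ValueError("boom")},
        )
    finally:
        # Passing None restores the default handler.
        loop.set_exception_handler(previous)
    return seen


seen = asyncio.run(main())
```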

def stop(*args, stop_msg=None, exception=None, **kwargs)

Stop the scheduler.

The scheduler will stop, finishing currently running tasks depending on the wait= parameter to Scheduler.run.

The call signature is kept open with *args, **kwargs to make it easier to include in any callback.

PARAMETER DESCRIPTION
*args

Logged in a debug message

TYPE: Any DEFAULT: ()

**kwargs

Logged in a debug message

TYPE: Any DEFAULT: {}

stop_msg

The message to log when stopping the scheduler.

TYPE: str | None DEFAULT: None

exception

The exception that caused stop() to be called. The Scheduler may use it to raise the exception to the user.

TYPE: BaseException | None DEFAULT: None
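To see why the open *args, **kwargs signature is convenient, consider a callback system that passes event-specific arguments to every registered callback. The sketch below uses a hypothetical stand-in class and `emit` helper, not the amltk Scheduler. It shows that a method with this signature can be registered directly, whatever the event passes along.

```python
from __future__ import annotations

from typing import Any, Callable


class StoppableSketch:
    """Stand-in with the same open call signature as the documented stop()."""

    def __init__(self) -> None:
        self.stopped = False
        self.msg: str | None = None

    def stop(
        self,
        *args: Any,
        stop_msg: str | None = None,
        exception: BaseException | None = None,
        **kwargs: Any,
    ) -> None:
        # Extra positional and keyword arguments are accepted and ignored here.
        self.msg = stop_msg if stop_msg is not None else "stop() was called."
        self.stopped = True


def emit(callbacks: list[Callable[..., Any]], *args: Any, **kwargs: Any) -> None:
    """Hypothetical event emitter: calls every callback with the event payload."""
    for callback in callbacks:
        callback(*args, **kwargs)


sketch = StoppableSketch()
# The event passes a future-like object and a result; stop() tolerates both.
emit([sketch.stop], "some-future", 42)
```

With a stricter signature, the same registration would fail with a TypeError as soon as the event supplied arguments that stop() did not declare.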

Source code in src/amltk/scheduling/scheduler.py
def stop(
    self,
    *args: Any,
    stop_msg: str | None = None,
    exception: BaseException | None = None,
    **kwargs: Any,
) -> None:
    """Stop the scheduler.

    The scheduler will stop, finishing currently running tasks depending
    on the `wait=` parameter to [`Scheduler.run`][amltk.Scheduler.run].

    The call signature is kept open with `*args, **kwargs` to make it
    easier to include in any callback.

    Args:
        *args: Logged in a debug message
        **kwargs: Logged in a debug message
        stop_msg: The message to log when stopping the scheduler.
        exception: The exception which incited `stop()` to be called.
            Will be used by the `Scheduler` to possibly raise the exception
            to the user.
    """
    if not self.running():
        return

    assert self._stop_event is not None

    msg = stop_msg if stop_msg is not None else "scheduler.stop() was called."
    logger.debug(f"Stopping scheduler: {msg} {args=} {kwargs=}")

    self._stop_event.set(msg=msg, exception=exception)
    self._running_event.clear()

def call_later(delay, fn, *args, **kwargs)

Schedule a function to be run after a delay.

PARAMETER DESCRIPTION
delay

The delay in seconds.

TYPE: float

fn

The function to run.

TYPE: Callable[P, Any]

args

The positional arguments to pass to the function.

TYPE: args DEFAULT: ()

kwargs

The keyword arguments to pass to the function.

TYPE: kwargs DEFAULT: {}

RETURNS DESCRIPTION
TimerHandle

A timer handle that can be used to cancel the function.
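The underlying asyncio loop.call_later accepts positional arguments only, which is why the source listing for this method binds the arguments with functools.partial first. The sketch below shows that mechanism, plus cancellation through the returned handle, using plain asyncio. The `record` function is illustrative.

```python
import asyncio
from functools import partial


async def main() -> list[str]:
    calls: list[str] = []
    loop = asyncio.get_running_loop()

    def record(msg: str, *, suffix: str = "") -> None:
        calls.append(msg + suffix)

    # Bind positional and keyword arguments up front, then schedule.
    loop.call_later(0.01, partial(record, "ran", suffix="!"))

    # The returned TimerHandle can cancel a call that has not fired yet.
    handle = loop.call_later(0.01, partial(record, "never"))
    handle.cancel()

    await asyncio.sleep(0.05)  # give the scheduled call time to run
    return calls


calls = asyncio.run(main())
```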

Source code in src/amltk/scheduling/scheduler.py
def call_later(
    self,
    delay: float,
    fn: Callable[P, Any],
    *args: P.args,
    **kwargs: P.kwargs,
) -> asyncio.TimerHandle:
    """Schedule a function to be run after a delay.

    Args:
        delay: The delay in seconds.
        fn: The function to run.
        args: The positional arguments to pass to the function.
        kwargs: The keyword arguments to pass to the function.

    Returns:
        A timer handle that can be used to cancel the function.
    """
    if not self.running():
        raise RuntimeError("Scheduler is not running!")

    _fn = partial(fn, *args, **kwargs)
    loop = asyncio.get_running_loop()
    return loop.call_later(delay, _fn)

def add_renderable(renderable)

Add a renderable object to the scheduler.

This will be displayed whenever the scheduler is displayed.

Source code in src/amltk/scheduling/scheduler.py
def add_renderable(self, renderable: RenderableType) -> None:
    """Add a renderable object to the scheduler.

    This will be displayed whenever the scheduler is displayed.
    """
    self._renderables.append(renderable)

class ExitState
dataclass

The exit state of a scheduler.

ATTRIBUTE DESCRIPTION
code

The reason for the exit.

exception

The exception that caused the exit, if any.

TYPE: BaseException | None

class Code

Bases: Enum

The reason the scheduler ended.

STOPPED
classvar attr

The scheduler was stopped forcefully with Scheduler.stop.

TIMEOUT
classvar attr

The scheduler finished because of a timeout.

EXHAUSTED
classvar attr

The scheduler finished because it exhausted its queue.

CANCELLED
classvar attr

The scheduler was cancelled.

UNKNOWN
classvar attr

The scheduler finished for an unknown reason.

EXCEPTION
classvar attr

The scheduler finished because of an exception.
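A caller will typically branch on the exit code returned by run(). The sketch below re-creates the documented shape with a stand-in dataclass so it runs without amltk installed. The class name `ExitStateSketch`, the use of auto() for the enum values, and the `describe` helper are assumptions made for illustration. Only the member names and the code and exception fields come from the listing above.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum, auto


@dataclass
class ExitStateSketch:
    """Stand-in mirroring the documented fields; not the amltk class."""

    class Code(Enum):
        STOPPED = auto()
        TIMEOUT = auto()
        EXHAUSTED = auto()
        CANCELLED = auto()
        UNKNOWN = auto()
        EXCEPTION = auto()

    code: Code
    exception: BaseException | None = None


def describe(state: ExitStateSketch) -> str:
    """Hypothetical helper that turns an exit state into a short message."""
    if state.code is ExitStateSketch.Code.EXCEPTION:
        return f"failed with {state.exception!r}"
    return f"ended: {state.code.name}"


finished = ExitStateSketch(code=ExitStateSketch.Code.EXHAUSTED)
failed = ExitStateSketch(
    code=ExitStateSketch.Code.EXCEPTION,
    exception=ValueError("boom"),
)
```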