Skip to content

Scheduler

Scheduler#

The Scheduler uses an Executor, a builtin python native with a submit(f, *args, **kwargs) function to submit compute to be compute else where, whether it be locally or remotely.

The Scheduler is primarily used to dispatch compute to an Executor and emit @events, which can trigger user callbacks.

Typically you should not use the Scheduler directly for dispatching and responding to computed functions, but rather use a Task

Running in a Jupyter Notebook/Colab

If you are using a Jupyter Notebook, you likley need to use the following at the top of your notebook:

import nest_asyncio  # Only necessary in Notebooks
nest_asyncio.apply()

scheduler.run(...)

This is due to the fact a notebook runs in an async context. If you do not wish to use the above snippet, you can instead use:

await scheduler.async_run(...)
Basic Usage

In this example, we create a scheduler that uses local processes as workers. We then create a task that will run a function fn and submit it to the scheduler. Lastly, a callback is registered to @future-result to print the result when the compute is done.

from amltk.scheduling import Scheduler

def fn(x: int) -> int:
    return x + 1

scheduler = Scheduler.with_processes(1)

@scheduler.on_start
def launch_the_compute():
    scheduler.submit(fn, 1)

@scheduler.on_future_result
def callback(future, result):
    print(f"Result: {result}")

scheduler.run()
Result: 2
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
    @on_start 1
    └── def launch_the_compute() (1)
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout
    @on_future_submitted 1
    @on_future_done 1
    @on_future_cancelled
    @on_future_exception
    @on_future_result 1
    └── def callback(future, result) (1)

The last line in the previous example called scheduler.run() is what starts the scheduler running, in which it will first emit the @start event. This triggered the callback launch_the_compute() which submitted the function fn with the arguments 1.

The scheduler then ran the compute and waited for it to complete, emitting the @future-result event when it was done successfully. This triggered the callback callback() which printed the result.

At this point, there is no more compute happening and no more events to respond to so the scheduler will halt.

@events

When the scheduler enters some important state, it will emit an event to let you know.

A Subscriber which is called when the scheduler starts. This is the first event emitted by the scheduler and one of the only ways to submit the initial compute to the scheduler.

@scheduler.on_start
def my_callback():
    ...

A Subscriber which is called when the scheduler is finishing up. This occurs right before the scheduler shuts down the executor.

@scheduler.on_finishing
def my_callback():
    ...

A Subscriber which is called when the scheduler is finished, has shutdown the executor and possibly terminated any remaining compute.

@scheduler.on_finished
def my_callback():
    ...

A Subscriber which is called when the scheduler is has been stopped due to the stop() method being called.

@scheduler.on_stop
def my_callback(stop_msg: str, exception: BaseException | None):
    ...

A Subscriber which is called when the scheduler reaches the timeout.

@scheduler.on_timeout
def my_callback():
    ...

A Subscriber which is called when the queue is empty. This can be useful to re-fill the queue and prevent the scheduler from exiting.

@scheduler.on_empty
def my_callback():
    ...

When any compute goes through the Scheduler, it will emit an event to let you know. You should however prefer to use a Task as it will emit specific events for the task at hand, and not all compute.

A Subscriber which is called when some compute is submitted.

@scheduler.on_future_submitted
def my_callback(future: Future):
    ...

A Subscriber which is called when a future returned with a result, no exception raise.

@scheduler.on_future_result
def my_callback(future: Future, result: Any):
    ...

A Subscriber which is called when some compute raised an uncaught exception.

@scheduler.on_future_exception
def my_callback(future: Future, exception: BaseException):
    ...

A Subscriber which is called when some compute is done, regardless of whether it was successful or not.

@scheduler.on_future_done
def my_callback(future: Future):
    ...

A Subscriber which is called when a future is cancelled. This usually occurs due to the underlying Scheduler, and is not something we do directly, other than when shutting down the scheduler.

@scheduler.on_future_cancelled
def my_callback(future: Future):
    ...
Common usages of run()

There are various ways to run() the scheduler, notably how long it should run with timeout= and also how it should react to any exception that may have occurred within the Scheduler itself or your callbacks.

Please see the run() API doc for more details and features, however we show two common use cases of using the timeout= parameter.

You can render a live display using run(display=...). This require rich to be installed. You can install this with pip install rich or pip install amltk[rich].

You can tell the Scheduler to stop after a certain amount of time with the timeout= argument to run().

This will also trigger the @timeout event as seen in the Scheduler output.

import time
from asyncio import Future

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def expensive_function() -> int:
    time.sleep(0.1)
    return 42

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function)

# This will endlessly loop the scheduler
@scheduler.on_future_done
def submit_again(future: Future) -> None:
    if scheduler.running():
        scheduler.submit(expensive_function)

scheduler.run(timeout=1)  # End after 1 second
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty
    @on_start 1
    └── def submit_calculations() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout 1
    @on_future_submitted 10
    @on_future_done 10
    └── def submit_again(future: _asyncio.Future) -> None (10)
    @on_future_cancelled
    @on_future_exception
    @on_future_result 10

By specifying that the Scheduler should not wait for ongoing tasks to finish, the Scheduler will attempt to cancel and possibly terminate any running tasks.

import time
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def expensive_function() -> None:
    time.sleep(10)


@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function)

scheduler.run(timeout=1, wait=False)  # End after 1 second
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty
    @on_start 1
    └── def submit_calculations() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout 1
    @on_future_submitted 1
    @on_future_done
    @on_future_cancelled 1
    @on_future_exception
    @on_future_result

Forcibly Terminating Workers

As an Executor does not provide an interface to forcibly terminate workers, we provide Scheduler(terminate=...) as a custom strategy for cleaning up a provided executor. It is not possible to terminate running thread based workers, for example using ThreadPoolExecutor and any Executor using threads to spawn tasks will have to wait until all running tasks are finish before python can close.

It's likely terminate will trigger the EXCEPTION event for any tasks that are running during the shutdown, not* a cancelled event. This is because we use a Future under the hood and these can not be cancelled once running. However there is no guarantee of this and is up to how the Executor handles this.

Scheduling something to be run later

You can schedule some function to be run later using the scheduler.call_later() method.

Note

This does not run the function in the background, it just schedules some function to be called later, where you could perhaps then use submit to scheduler a Task to run the function in the background.

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def fn() -> int:
    print("Ending now!")
    scheduler.stop()

@scheduler.on_start
def schedule_fn() -> None:
    scheduler.call_later(1, fn)

scheduler.run(end_on_empty=False)
Ending now!