
Scheduling

AutoML-Toolkit was designed to make it easy to offload computation away from the main process, which improves interactivity, deployment, and control. At the same time, we wanted an event-based system to manage the complexity that comes with AutoML systems, all while keeping the API intuitive and extensible.

By the end of this guide, we hope that the following code, its options, and its inner workings will be easy to understand.

Scheduler
from amltk import Scheduler

# Some function to offload to compute
def collatz(n: int) -> int:
    is_even = (n % 2 == 0)
    return int(n / 2) if is_even else int(3 * n + 1)

# Setup the scheduler and create a "task"
scheduler = Scheduler.with_processes(1)
task = scheduler.task(collatz)

answers = []

# Tell the scheduler what to do when it starts
@scheduler.on_start
def start_computing() -> None:
    answers.append(12)
    task.submit(12)  # Launch the task with the argument 12

# Tell the scheduler what to do when the task returns
@task.on_result
def compute_next(_, next_n: int) -> None:
    answers.append(next_n)
    if scheduler.running() and next_n != 1:
        task.submit(next_n)

# Run the scheduler
scheduler.run(timeout=1)  # One second timeout
print(answers)

[12, 6, 3, 10, 5, 16, 8, 4, 2, 1]

╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
    @on_start 1
    └── def start_computing() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout
    @on_future_submitted 9
    @on_future_done 9
    @on_future_cancelled
    @on_future_exception
    @on_future_result 9
┗━━ ╭─ Task collatz(n: int) -> int ────────────────────────────────────────────╮
    ╰─────────────────────── Ref: Task-collatz-HWl6vJIn ───────────────────────╯

We start by introducing the engine, the Scheduler, and how it interacts with Python's built-in Executor interface to offload compute to processes, cluster nodes, or even cloud resources.

However, the Scheduler is rather useless without some fuel. For this, we present Tasks, the units of computation that the Scheduler performs and that set the system's gears turning.

rich printing

To get the same output locally (terminal or Notebook), you can either call thing.__rich__(), use from rich import print; print(thing), or, in a Notebook, simply leave it as the last object of a cell.

You'll have to install amltk with the jupyter extra, amltk[jupyter], or manually run pip install rich[jupyter].

Scheduler

The core engine of the AutoML-Toolkit is the Scheduler. Its purpose is to allow you to create workflows in an event driven manner. It does this by allowing you to submit() functions with arguments to be computed in the background, while the main process can continue to do other work. Once this computation has completed, you can react with various callbacks, most likely to submit more computations.
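The submit-then-react pattern described above can be sketched with the standard library alone. This is only an illustration, not how amltk is implemented: it assumes a ThreadPoolExecutor as the backend and uses Future.add_done_callback in place of the Scheduler's events. The names square and on_done are made up for this example.

```python
from concurrent.futures import Future, ThreadPoolExecutor

results: list[int] = []


def square(x: int) -> int:
    return x * x


def on_done(future: Future) -> None:
    # Runs once the background computation has finished
    results.append(future.result())


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(square, 7)  # returns immediately with a Future
    future.add_done_callback(on_done)
    # The main thread is free to do other work here.

# Leaving the `with` block waits for outstanding work to finish
print(results)  # [49]
```

The Scheduler wraps this kind of plumbing behind named events, so callbacks are registered once up front rather than attached to each future by hand.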

Sounds like asyncio?

If you're familiar with Python's async/await syntax, then this description might sound similar. The Scheduler is powered by an asynchronous event loop but hides this complexity in its API. We do have an asynchronous API which we will discuss later.

Backend

The first thing to do is define where this computation should happen. A Scheduler builds upon an Executor, an interface provided by Python's concurrent.futures module. This interface is used to abstract away the details of how the computation is actually performed. This allows us to easily switch between different backends, such as threads, processes, clusters, cloud resources, or even custom backends.
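To make the idea of a swappable backend concrete, here is a small standard-library sketch. The helper run_on and the function double are hypothetical names for illustration; the point is only that code written against the Executor interface does not care which concrete executor it receives.

```python
from concurrent.futures import Executor, ThreadPoolExecutor


def double(x: int) -> int:
    return 2 * x


def run_on(executor: Executor, values: list[int]) -> list[int]:
    """Submit one job per value and collect results in submission order."""
    futures = [executor.submit(double, v) for v in values]
    return [f.result() for f in futures]


# Any Executor works here; a ProcessPoolExecutor could be swapped in
# without changing run_on() at all.
with ThreadPoolExecutor(max_workers=2) as pool:
    doubled = run_on(pool, [1, 2, 3])

print(doubled)  # [2, 4, 6]
```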

Available Executors

You can find a list of these in our executor reference.

The simplest one is a ProcessPoolExecutor, which will create a pool of processes to run the compute in parallel. We provide a convenience function for this as Scheduler.with_processes().

from concurrent.futures import ProcessPoolExecutor
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(2)
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 2}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_start
    @on_finishing
    @on_finished
    @on_stop
    @on_timeout
    @on_empty
    @on_future_submitted
    @on_future_done
    @on_future_cancelled
    @on_future_exception
    @on_future_result

Running the Scheduler

You may have noticed from the above example that there are many events the scheduler will emit, such as @start or @future-done. One particularly important one is @start, an event to signal the scheduler has started and is ready to accept tasks.

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

@scheduler.on_start
def print_hello() -> None:
    print("hello")
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_start
    └── def print_hello() -> None
    @on_finishing
    @on_finished
    @on_stop
    @on_timeout
    @on_empty
    @on_future_submitted
    @on_future_done
    @on_future_cancelled
    @on_future_exception
    @on_future_result

From the output, we can see that the print_hello() function was registered to the event @start, but it was never called and no "hello" was printed.

For this to happen, we actually have to run() the scheduler.

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

@scheduler.on_start
def print_hello() -> None:
    print("hello")

scheduler.run()
hello
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
    @on_start 1
    └── def print_hello() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout
    @on_future_submitted
    @on_future_done
    @on_future_cancelled
    @on_future_exception
    @on_future_result

Now the output will show a little yellow number next to the @start and the print_hello(), indicating that event was triggered and the callback was called.

You can subscribe multiple callbacks to the same event and they will each be called in the order they were registered.

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

@scheduler.on_start
def print_hello_1() -> None:
    print("hello 1")

def print_hello_2() -> None:
    print("hello 2")

scheduler.on_start(print_hello_2)  # You can also register without a decorator

scheduler.run()
hello 1
hello 2
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
    @on_start 1
    ├── def print_hello_1() -> None (1)
    └── def print_hello_2() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout
    @on_future_submitted
    @on_future_done
    @on_future_cancelled
    @on_future_exception
    @on_future_result

Determinism

It's worth noting that even though we are using an event-based system, the callbacks for any given event are still guaranteed to execute deterministically. The source of non-determinism is the order in which events are emitted, which is determined entirely by your compute functions themselves.
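One way to picture the per-event guarantee is a plain list of callbacks that is walked in order. The ToyEvent class below is a hypothetical stand-in written for this illustration, not amltk's implementation.

```python
from typing import Callable


class ToyEvent:
    """Hypothetical stand-in for an event; not amltk's implementation."""

    def __init__(self) -> None:
        self.callbacks: list[Callable[[], None]] = []

    def subscribe(self, callback: Callable[[], None]) -> Callable[[], None]:
        self.callbacks.append(callback)
        return callback  # returning it allows use as a decorator

    def emit(self) -> None:
        for callback in self.callbacks:  # list order == registration order
            callback()


calls: list[str] = []
on_start = ToyEvent()


@on_start.subscribe
def first() -> None:
    calls.append("first")


@on_start.subscribe
def second() -> None:
    calls.append("second")


on_start.emit()
print(calls)  # ['first', 'second']
```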

Submitting Compute

The Scheduler exposes a simple submit() method which allows you to submit compute to be performed while the scheduler is running.

While we will later visit the Task class for defining these units of compute, it is beneficial to see how the Scheduler operates directly with submit(), without abstractions.

In the below example, we will use the @future-result event to submit more compute once the previous computation has returned a result.

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(2)

def expensive_function(x: int) -> int:
    return 2 ** x

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function, 2)  # Submit compute

# Called when the submitted function is done
@scheduler.on_future_result
def print_result(_, result: int) -> None:
    print(result)
    if result < 10:
        scheduler.submit(expensive_function, result)

scheduler.run()
4
16
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 2}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
    @on_start 1
    └── def submit_calculations() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout
    @on_future_submitted 2
    @on_future_done 2
    @on_future_cancelled
    @on_future_exception
    @on_future_result 2
    └── def print_result(_, result: int) -> None (2)

What's a Future?

A Future is a special object which represents the result of an asynchronous computation. It's an object that can be queried for its result/exception of some computation which may not have completed yet.
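As a quick standard-library illustration of querying a Future for its result or exception (the function divide is a made-up example, and a ThreadPoolExecutor is assumed as the backend):

```python
from concurrent.futures import ThreadPoolExecutor


def divide(a: float, b: float) -> float:
    return a / b


with ThreadPoolExecutor(max_workers=1) as executor:
    ok = executor.submit(divide, 6, 3)
    bad = executor.submit(divide, 1, 0)

# After the `with` block both computations have completed
print(ok.done(), ok.result())          # True 2.0
print(type(bad.exception()).__name__)  # ZeroDivisionError
```

Calling bad.result() would re-raise the ZeroDivisionError in the caller, which is why the exception() accessor is useful when you only want to inspect the failure.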

Scheduler Events

Here are some of the possible @events a Scheduler can emit, but please visit the scheduler reference for a complete list.

@events

amltk.scheduling.Scheduler.on_start instance-attribute

on_start: Subscriber[[], Any] = subscriber(STARTED)

A Subscriber which is called when the scheduler starts. This is the first event emitted by the scheduler and one of the only ways to submit the initial compute to the scheduler.

@scheduler.on_start
def my_callback():
    ...

amltk.scheduling.Scheduler.on_future_result instance-attribute

on_future_result: Subscriber[[Future, Any], Any] = (
    subscriber(FUTURE_RESULT)
)

A Subscriber which is called when a future returns with a result and no exception was raised.

@scheduler.on_future_result
def my_callback(future: Future, result: Any):
    ...

amltk.scheduling.Scheduler.on_future_exception instance-attribute

on_future_exception: Subscriber[
    [Future, BaseException], Any
] = subscriber(FUTURE_EXCEPTION)

A Subscriber which is called when some compute raised an uncaught exception.

@scheduler.on_future_exception
def my_callback(future: Future, exception: BaseException):
    ...

amltk.scheduling.Scheduler.on_future_submitted instance-attribute

on_future_submitted: Subscriber[[Future], Any] = subscriber(
    FUTURE_SUBMITTED
)

A Subscriber which is called when some compute is submitted.

@scheduler.on_future_submitted
def my_callback(future: Future):
    ...

amltk.scheduling.Scheduler.on_future_done instance-attribute

on_future_done: Subscriber[[Future], Any] = subscriber(
    FUTURE_DONE
)

A Subscriber which is called when some compute is done, regardless of whether it was successful or not.

@scheduler.on_future_done
def my_callback(future: Future):
    ...

amltk.scheduling.Scheduler.on_future_cancelled instance-attribute

on_future_cancelled: Subscriber[[Future], Any] = subscriber(
    FUTURE_CANCELLED
)

A Subscriber which is called when a future is cancelled. This usually occurs due to the underlying Scheduler, and is not something we do directly, other than when shutting down the scheduler.

@scheduler.on_future_cancelled
def my_callback(future: Future):
    ...

amltk.scheduling.Scheduler.on_timeout instance-attribute

on_timeout: Subscriber[[], Any] = subscriber(TIMEOUT)

A Subscriber which is called when the scheduler reaches the timeout.

@scheduler.on_timeout
def my_callback():
    ...

amltk.scheduling.Scheduler.on_stop instance-attribute

on_stop: Subscriber[[str, BaseException | None], Any] = (
    subscriber(STOP)
)

A Subscriber which is called when the scheduler has been stopped due to the stop() method being called.

@scheduler.on_stop
def my_callback(stop_msg: str, exception: BaseException | None):
    ...

amltk.scheduling.Scheduler.on_finishing instance-attribute

on_finishing: Subscriber[[], Any] = subscriber(FINISHING)

A Subscriber which is called when the scheduler is finishing up. This occurs right before the scheduler shuts down the executor.

@scheduler.on_finishing
def my_callback():
    ...

amltk.scheduling.Scheduler.on_finished instance-attribute

on_finished: Subscriber[[], Any] = subscriber(FINISHED)

A Subscriber which is called when the scheduler is finished, has shutdown the executor and possibly terminated any remaining compute.

@scheduler.on_finished
def my_callback():
    ...

amltk.scheduling.Scheduler.on_empty instance-attribute

on_empty: Subscriber[[], Any] = subscriber(EMPTY)

A Subscriber which is called when the queue is empty. This can be useful to re-fill the queue and prevent the scheduler from exiting.

@scheduler.on_empty
def my_callback():
    ...

We can access the counts of all events through the scheduler.event_counts property. This is a dict which has the events as keys and the number of times each was emitted as the values.

Controlling Callbacks

There are a few parameters you can pass to any event subscriber such as @start or @future-result. These control what happens when the event is fired and can be used to control the flow of your system.

These are covered more extensively in our events reference.

repeat=: Repeat the callback a certain number of times, every time the event is emitted.

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

# Print "hello" 3 times when the scheduler starts
@scheduler.on_start(repeat=3)
def print_hello() -> None:
    print("hello")

scheduler.run()
hello
hello
hello
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
    @on_start 1
    └── def print_hello() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout
    @on_future_submitted
    @on_future_done
    @on_future_cancelled
    @on_future_exception
    @on_future_result

max_calls=: Limit the number of times a callback can be called, after which the callback will be ignored.

from asyncio import Future
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(2)

def expensive_function(x: int) -> int:
    return x ** 2

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function, 2)

@scheduler.on_future_result(max_calls=3)
def print_result(future, result) -> None:
    scheduler.submit(expensive_function, 2)

scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 2}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
    @on_start 1
    └── def submit_calculations() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout
    @on_future_submitted 4
    @on_future_done 4
    @on_future_cancelled
    @on_future_exception
    @on_future_result 4
    └── def print_result(future, result) -> None (3)

when=: A callable which takes no arguments and returns a bool. The callback will only be called when the when= callable returns True.

Below is a rather contrived example, but it shows how we can use the when parameter to control when the callback is called.

import random
from amltk.scheduling import Scheduler

LOCALE = random.choice(["English", "German"])

scheduler = Scheduler.with_processes(1)

@scheduler.on_start(when=lambda: LOCALE == "English")
def print_hello() -> None:
    print("hello")

@scheduler.on_start(when=lambda: LOCALE == "German")
def print_guten_tag() -> None:
    print("guten tag")

scheduler.run()
hello
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
    @on_start 1
    ├── def print_hello() -> None (1)
    └── def print_guten_tag() -> None
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout
    @on_future_submitted
    @on_future_done
    @on_future_cancelled
    @on_future_exception
    @on_future_result

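every=: Only call the callback once for every N times the event is emitted, where N is the value of every=. The count includes the first emission, so with every=2 the callback first runs on the second emission.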

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

# Print "hello" only every 2 times the scheduler starts.
@scheduler.on_start(every=2)
def print_hello() -> None:
    print("hello")

# Run the scheduler 5 times
scheduler.run()
scheduler.run()
scheduler.run()
scheduler.run()
scheduler.run()
hello
hello
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 5
    @on_start 5
    └── def print_hello() -> None (2)
    @on_finishing 5
    @on_finished 5
    @on_stop
    @on_timeout
    @on_future_submitted
    @on_future_done
    @on_future_cancelled
    @on_future_exception
    @on_future_result
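The four options described in this section can be modelled with a small toy class. ToyHandler is hypothetical and written only for this illustration; in particular, the order in which it checks the options and the way they combine are assumptions, not amltk's actual behavior.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ToyHandler:
    """Hypothetical model of the four options; not amltk's actual code."""

    callback: Callable[[], None]
    repeat: int = 1
    max_calls: Optional[int] = None
    every: int = 1
    when: Optional[Callable[[], bool]] = None
    n_emitted: int = 0  # how often the event fired
    n_called: int = 0  # how often the callback was let through

    def __call__(self) -> None:
        self.n_emitted += 1
        if self.when is not None and not self.when():
            return  # predicate said no
        if self.n_emitted % self.every != 0:
            return  # not an every-th emission
        if self.max_calls is not None and self.n_called >= self.max_calls:
            return  # call budget used up
        self.n_called += 1
        for _ in range(self.repeat):
            self.callback()


printed: list[str] = []

# every=2 over five emissions, mirroring the five scheduler.run() calls
every_two = ToyHandler(lambda: printed.append("hello"), every=2)
for _ in range(5):
    every_two()

# max_calls=3 over four emissions, mirroring the max_calls example
capped = ToyHandler(lambda: None, max_calls=3)
for _ in range(4):
    capped()

print(printed)          # ['hello', 'hello']
print(capped.n_called)  # 3
```

Both results agree with the counts shown in the Scheduler output for the corresponding examples: two calls out of five emissions for every=2, and three calls out of four emissions for max_calls=3.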

Stopping the Scheduler

There are a few ways the Scheduler will stop. The one we have implicitly been using this whole time is when the Scheduler has run out of events to process with no compute left to perform. This is the default behavior but can be controlled with run(end_on_empty=False).

However, there are more explicit methods.

You can explicitly call stop() from anywhere on the Scheduler to stop it. By default, this will wait for any currently running compute to finish, but you can tell the scheduler to stop immediately with run(wait=False).

You'll notice this in the event count of the Scheduler where the event @future-cancelled was fired.

import time
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def expensive_function(sleep_for: int) -> None:
    time.sleep(sleep_for)

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function, sleep_for=10)

@scheduler.on_future_submitted
def stop_the_scheduler(_) -> None:
    scheduler.stop()

scheduler.run(wait=False)
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty
    @on_start 1
    └── def submit_calculations() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop 1
    @on_timeout
    @on_future_submitted 1
    └── def stop_the_scheduler(_) -> None (1)
    @on_future_done
    @on_future_cancelled 1
    @on_future_exception
    @on_future_result
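To see where a cancelled future can come from, here is a standard-library sketch of shutting an executor down without waiting. It assumes a ThreadPoolExecutor and Python 3.9+ (for cancel_futures); blocking_job and the two threading events are made up for the illustration. Note that a standard-library executor only cancels work that is still queued; it cannot interrupt a job that has already started, so this is an analogy rather than a description of what the Scheduler does internally.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocking_job() -> str:
    started.set()   # signal that a job is now running
    release.wait()  # block until the main thread lets us finish
    return "finished"


executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(blocking_job)
queued = executor.submit(blocking_job)  # waits behind the first job

started.wait()  # make sure the first job is actually running
executor.shutdown(wait=False, cancel_futures=True)  # do not wait, drop the queue
release.set()  # let the running job complete

print(queued.cancelled())  # True
print(running.result())    # finished
```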

You can also tell the Scheduler to stop after a certain amount of time with the timeout= argument to run().

This will also trigger the @timeout event as seen in the Scheduler output.

import time
from asyncio import Future

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def expensive_function() -> int:
    time.sleep(0.1)
    return 42

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function)

# This will endlessly loop the scheduler
@scheduler.on_future_done
def submit_again(future: Future) -> None:
    if scheduler.running():
        scheduler.submit(expensive_function)

scheduler.run(timeout=1)  # End after 1 second
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty
    @on_start 1
    └── def submit_calculations() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout 1
    @on_future_submitted 10
    @on_future_done 10
    └── def submit_again(future: _asyncio.Future) -> None (10)
    @on_future_cancelled
    @on_future_exception
    @on_future_result 10

Exceptions

Dealing with exceptions is an important part of any AutoML system. It is important to clarify that there are two kinds of exceptions that can occur within the Scheduler.

The first kind can happen within some function submitted with submit(). When this happens, the @future-exception event will be emitted, passing the exception to the callback.

By default, the Scheduler will then raise the exception that occurred up to your program and end its computations. This is done by setting run(on_exception="raise"), the default, but it also takes three other possibilities:

  • "continue" - Just emit the exception and keep running.
  • "end" - Emit the exception and then stop the scheduler but don't raise it.
  • {MyException: "continue", OtherException: "raise"} - Decide what to do for each exception type. Note that this is checked in order using isinstance(...).
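Because the mapping form is checked in order with isinstance(...), the order of its entries matters when one exception type is a subclass of another. The function resolve_action below is a hypothetical sketch of such a lookup, not amltk's code; falling back to "raise" when nothing matches is an assumption made for this example.

```python
from typing import Mapping


def resolve_action(
    exception: BaseException,
    policy: Mapping[type, str],
    default: str = "raise",  # assumed fallback, for illustration only
) -> str:
    """Return the action of the first entry whose type matches the exception."""
    for exc_type, action in policy.items():  # dicts keep insertion order
        if isinstance(exception, exc_type):
            return action
    return default


# FileNotFoundError is a subclass of OSError, so the specific entry goes first
policy = {FileNotFoundError: "continue", OSError: "end"}

print(resolve_action(FileNotFoundError("missing"), policy))  # continue
print(resolve_action(PermissionError("denied"), policy))     # end
print(resolve_action(ValueError("bad"), policy))             # raise
```

If the two entries were swapped, the OSError entry would match a FileNotFoundError first and the more specific rule would never be reached.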

One example is to just stop() the scheduler when some exception occurs.

from asyncio import Future
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def failing_compute_function(err_msg: str) -> None:
    raise ValueError(err_msg)

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(failing_compute_function, "Failed!")

@scheduler.on_future_exception
def stop_the_scheduler(future: Future, exception: Exception) -> None:
    print(f"Got exception {exception}")
    scheduler.stop()  # You can optionally pass `exception=` for logging purposes.

scheduler.run(on_exception="continue")  # Scheduler will not stop because of the error
Got exception Failed!
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
    @on_start 1
    └── def submit_calculations() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop 1
    @on_timeout
    @on_future_submitted 1
    @on_future_done 1
    @on_future_cancelled
    @on_future_exception 1
    └── def stop_the_scheduler(future: _asyncio.Future, exception: Exception) ->
        None (1)
    @on_future_result

The second kind of exception that can happen is one that happens in the main process. For example, this could happen in one of your callbacks or in the Scheduler itself (please raise an issue if this occurs!). By default when you call run() it will set run(on_exception="raise") and raise the exception that occurred, with its traceback. This is to help you debug your program.

You may also use run(on_exception="end"), which will just end the Scheduler and raise no exception, or use run(on_exception="continue"), in which case the Scheduler will continue on with whatever events are next to process.

Tasks

Now that we have seen how the Scheduler works, we can look at the Task, a wrapper around a function that you'll want to submit to the Scheduler. The preferred way to create one of these Tasks is to use scheduler.task(function).

Running a task

In the following example, we will create a task for the scheduler and attempt to call it. This task will be run by the backend specified.

from amltk import Scheduler

# Some function to offload to compute
def collatz(n: int) -> int:
    is_even = (n % 2 == 0)
    return int(n / 2) if is_even else int(3 * n + 1)

scheduler = Scheduler.with_processes(1)

# Creating a "task"
collatz_task = scheduler.task(collatz)

try:
    collatz_task.submit(5)
except Exception as e:
    print(f"{type(e)}: {e}")
<class 'amltk.exceptions.SchedulerNotRunningError'>: Scheduler is not running, cannot submit task <function collatz at 0x7efd59f41630> with args=(5,), kwargs={}

As you can see, we cannot submit tasks before the scheduler is running. This is because the backend that it's running on usually has to be set up and torn down when scheduler.run() is called.
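The standard library shows a similar lifecycle constraint, which may help build intuition: an executor that is not in its usable window rejects new work. The sketch below is only an analogy using a ThreadPoolExecutor that has already been torn down; it does not involve amltk or its SchedulerNotRunningError.

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
executor.shutdown()  # tear the backend down

try:
    executor.submit(print, "too late")
    outcome = "submitted"
except RuntimeError as e:
    # The executor refuses work outside of its active lifetime
    outcome = f"{type(e).__name__}: {e}"

print(outcome)
```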

The proper approach would be to do the following:

from amltk import Scheduler

# Some function to offload to compute
def collatz(n: int) -> int:
    is_even = (n % 2 == 0)
    return int(n / 2) if is_even else int(3 * n + 1)

# Setup the scheduler and create a "task"
scheduler = Scheduler.with_processes(1)
collatz_task = scheduler.task(collatz)

@scheduler.on_start
def launch_initial_task() -> None:
    collatz_task.submit(5)

scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
    @on_start 1
    └── def launch_initial_task() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout
    @on_future_submitted 1
    @on_future_done 1
    @on_future_cancelled
    @on_future_exception
    @on_future_result 1
┗━━ ╭─ Task collatz(n: int) -> int ────────────────────────────────────────────╮
    ╰─────────────────────── Ref: Task-collatz-5yu2a82u ───────────────────────╯

Task Specific Events

As you may have noticed, we can see the Task itself in the Scheduler as well as the events it defines. This allows us to react to certain tasks themselves, and not generally everything that may pass through the Scheduler.

In the below example, we'll do two things. First, we'll create a Task and react to its events, but also use the Scheduler directly and use submit(). Then we'll see how the callbacks reacted to different events.

from amltk import Scheduler

def echo(msg: str) -> str:
    return msg

scheduler = Scheduler.with_processes(1)
echo_task = scheduler.task(echo)

# Launch the task and do a raw `submit()` with the Scheduler
@scheduler.on_start
def launch_initial_task() -> None:
    echo_task.submit("hello")
    scheduler.submit(echo, "hi")

# Callback for anything resulting from the scheduler
@scheduler.on_future_result
def from_scheduler(_, msg: str) -> None:
    print(f"result_from_scheduler {msg}")

# Callback for specifically results from the `echo_task`
@echo_task.on_result
def from_task(_, msg: str) -> None:
    print(f"result_from_task {msg}")

scheduler.run()
result_from_scheduler hello
result_from_task hello
result_from_scheduler hi
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
@on_start 1
└── def launch_initial_task() -> None (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted 2
@on_future_done 2
@on_future_cancelled
@on_future_exception
@on_future_result 2
└── def from_scheduler(_, msg: str) -> None (2)
┗━━ ╭─ Task echo(msg: str) -> str ─────────────────────────────────────────────╮
    ╰──────────────────────── Ref: Task-echo-FP5NRPb4 ─────────────────────────╯

The output above shows that @scheduler.on_future_result fired twice, so our callback from_scheduler() was called twice: once for the result of echo_task.submit("hello") and once for scheduler.submit(echo, "hi"). In contrast, @echo_task.on_result fired only once, so our callback from_task() was called only once.
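To make the difference between the two levels of events concrete, here is a minimal sketch in plain Python. It is not how amltk implements events; the `Emitter` class, the `finish()` helper and the event names are hypothetical and exist only to illustrate that every result reaches the scheduler-wide callback, while only results submitted through the task reach the task-specific one.

```python
from collections import defaultdict


class Emitter:
    """Hypothetical, minimal event emitter (an assumption, not amltk's implementation)."""

    def __init__(self) -> None:
        self.handlers = defaultdict(list)  # event name -> list of callbacks
        self.counts = defaultdict(int)     # event name -> number of emissions

    def on(self, event, handler) -> None:
        self.handlers[event].append(handler)

    def emit(self, event, *args) -> None:
        self.counts[event] += 1
        for handler in self.handlers[event]:
            handler(*args)


scheduler_events = Emitter()  # stands in for scheduler-wide events
task_events = Emitter()       # stands in for the events of a single task
log = []

scheduler_events.on("future-result", lambda msg: log.append(f"result_from_scheduler {msg}"))
task_events.on("result", lambda msg: log.append(f"result_from_task {msg}"))


def finish(msg: str, *, via_task: bool) -> None:
    """Simulate a finished computation: the scheduler always hears about it,
    the task only hears about work that was submitted through it."""
    scheduler_events.emit("future-result", msg)
    if via_task:
        task_events.emit("result", msg)


finish("hello", via_task=True)   # like echo_task.submit("hello")
finish("hi", via_task=False)     # like scheduler.submit(echo, "hi")

for line in log:
    print(line)
```

The scheduler-level counter ends at 2 and the task-level counter at 1, mirroring the counts next to @on_future_result in the output above.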

In practice, an AutoML system will likely define a variety of tasks, and having dedicated code that responds to each individual task is critical. It also lets you chain the results of one task into another and define more complex workflows.

The example below shows how to define two tasks with the scheduler and attach callbacks to specific tasks, or even share callbacks between them!

from amltk import Scheduler

def expensive_thing_1(x: int) -> int:
    return x * 2

def expensive_thing_2(x: int) -> int:
    return x ** 2

# Create a scheduler and 2 tasks
scheduler = Scheduler.with_processes(1)
task_1 = scheduler.task(expensive_thing_1)
task_2 = scheduler.task(expensive_thing_2)

# A list of things we want to compute
items = iter([1, 2, 3])

@scheduler.on_start
def submit_initial() -> None:
    next_item = next(items)
    task_1.submit(next_item)

@task_1.on_result
def submit_task_2_with_results_of_task_1(_, result: int) -> None:
    """When task_1 returns, send the result to task_2"""
    task_2.submit(result)

@task_1.on_result
def submit_task_1_with_next_item(_, result: int) -> None:
    """When task_1 returns, launch it again with the next item"""
    next_item = next(items, None)
    if next_item is not None:
        task_1.submit(next_item)
        return

    print("Done!")

# You may share callbacks for the two tasks
@task_1.on_exception
@task_2.on_exception
def handle_task_exception(_, exception: BaseException) -> None:
    print(f"A task errored! {exception}")

scheduler.run()
Done!
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 1}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
@on_start 1
└── def submit_initial() -> None (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted 6
@on_future_done 6
@on_future_cancelled
@on_future_exception
@on_future_result 6
┣━━ ╭─ Task expensive_thing_1(x: int) -> int ──────────────────────────────────╮
    ╰────────────────── Ref: Task-expensive_thing_1-5e3V0LFE ──────────────────╯
┗━━ ╭─ Task expensive_thing_2(x: int) -> int ──────────────────────────────────╮
    ╰────────────────── Ref: Task-expensive_thing_2-hDPx098A ──────────────────╯
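For comparison, the same chaining idea can be sketched with only the standard library, which may help clarify what the task callbacks are doing. This is an illustrative sketch under stated assumptions, not amltk's implementation: the function names `double`, `square` and `run_chain` are hypothetical, and a thread pool is used instead of processes to keep the example small.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


def double(x: int) -> int:
    return x * 2


def square(x: int) -> int:
    return x ** 2


def run_chain(items: list[int]) -> list[tuple[int, int]]:
    """Feed each item through double(), then pass that result on to square()."""
    if not items:
        return []

    results: list[tuple[int, int]] = []
    lock = threading.Lock()
    all_done = threading.Event()

    with ThreadPoolExecutor(max_workers=1) as pool:

        def on_square_done(item: int, future: Future) -> None:
            # Final stage: record the result and signal once everything is in.
            with lock:
                results.append((item, future.result()))
                if len(results) == len(items):
                    all_done.set()

        def on_double_done(item: int, future: Future) -> None:
            # First stage finished: chain its result into the second stage.
            next_future = pool.submit(square, future.result())
            next_future.add_done_callback(lambda f: on_square_done(item, f))

        for item in items:
            first = pool.submit(double, item)
            first.add_done_callback(lambda f, item=item: on_double_done(item, f))

        # Errors inside the stages are not handled in this sketch,
        # so a timeout guards against waiting forever.
        all_done.wait(timeout=5)

    return sorted(results)


print(run_chain([1, 2, 3]))
```

Each item produces two futures, one per stage, which matches the count of 6 next to @on_future_submitted for three items in the output above.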

Task Plugins

Another benefit of Task objects is that we can attach a Plugin to them. Plugins can automate the control behaviour of tasks by preventing their execution, modifying the function and its arguments, or even attaching plugin-specific events!

For a complete reference, please see the plugin reference page.

Call Limiter

Perhaps one of the more useful plugins, at least when designing an AutoML System, is the Limiter plugin. It lets you control either a task's concurrency or the absolute limit on how many times the task can be successfully submitted.

In the following contrived example, we set up a Scheduler with 2 workers and attempt to submit a Task 4 times in rapid succession. However, we have the constraint that we only ever want 2 of these tasks running at a given time. Let's see how we could achieve that.

from amltk.scheduling import Scheduler, Limiter

def my_func(x: int) -> int:
    return x

scheduler = Scheduler.with_processes(2)

# Specify a concurrency limit of 2
task = scheduler.task(my_func, plugins=Limiter(max_concurrent=2))

# A list of 10 things we want to compute
items = iter(range(10))
results = []

@scheduler.on_start(repeat=4)  # Repeat callback 4 times
def submit() -> None:
    next_item = next(items)
    task.submit(next_item)

@task.on_result
def record_result(_, result: int) -> None:
    results.append(result)

@task.on_result
def launch_another(_, result: int) -> None:
    next_item = next(items, None)
    if next_item is not None:
        task.submit(next_item)

scheduler.run()
print(results)
[1, 0, 4, 5, 6, 7, 8, 9]
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 2}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
@on_start 1
└── def submit() -> None (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted 8
@on_future_done 8
@on_future_cancelled
@on_future_exception
@on_future_result 8
┗━━ ╭─ Task my_func(x: int) -> int ────────────────────────────────────────────╮
     ╭─────────────────────────── Plugin limiter ───────────────────────────╮ 
     │ Concurrent 0/2                                                       │ 
     ╰──────────────────────────────────────────────────────────────────────╯ 
    ╰─────────────────────── Ref: Task-my_func-9zUq9QHP ───────────────────────╯

We can see that the limiting worked: the numbers 2 and 3 were skipped and never appear in the results. As expected, the task was launched successfully with both 0 and 1, but because those two were still running, the Limiter kicked in and prevented the other two submissions.
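The accept/reject decision can be sketched in a few lines of plain Python. This is only an illustration of the idea, not the Limiter plugin's actual implementation; the `ConcurrencyLimiter` class and its `try_start()` and `finish()` methods are hypothetical names.

```python
class ConcurrencyLimiter:
    """Hypothetical stand-in for a concurrency limit (not amltk's Limiter)."""

    def __init__(self, max_concurrent: int) -> None:
        self.max_concurrent = max_concurrent
        self.running = 0

    def try_start(self) -> bool:
        """Return True and count the submission if there is capacity, else reject it."""
        if self.running >= self.max_concurrent:
            return False
        self.running += 1
        return True

    def finish(self) -> None:
        """Free one slot when a running submission completes."""
        self.running -= 1


limiter = ConcurrencyLimiter(max_concurrent=2)
accepted, rejected = [], []

# Four submissions in rapid succession, none of which has finished yet
for item in range(4):
    if limiter.try_start():
        accepted.append(item)
    else:
        rejected.append(item)

print(accepted, rejected)  # [0, 1] [2, 3]

# Once one submission completes, a new one is accepted again
limiter.finish()
print(limiter.try_start())  # True
```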

A natural follow-up question is, "how do we requeue these?". Well, let's take a look at the above output. The plugin has added three new events to Task, namely @call-limit-reached, @concurrent-limit-reached and @disabled-due-to-running-task.

To subscribe to these extra events (or any event, for that matter), we can use the task.on() method. Below is the same example, except here we respond to @concurrent-limit-reached and requeue the submissions that were rejected.

from amltk.scheduling import Scheduler, Limiter, Task
from amltk.types import Requeue

def my_func(x: int) -> int:
    return x

scheduler = Scheduler.with_processes(2)
task = scheduler.task(my_func, plugins=Limiter(max_concurrent=2))

# A list of 10 things we want to compute
items = Requeue(range(10))  # A convenience type that you can requeue/append to
results = []

@scheduler.on_start(repeat=4)  # Repeat callback 4 times
def submit() -> None:
    next_item = next(items)
    task.submit(next_item)

@task.on("concurrent-limit-reached")
def add_back_to_queue(task: Task, x: int) -> None:
    items.requeue(x)  # Put x back at the start of the queue

@task.on_result
def record_result(_, result: int) -> None:
    results.append(result)

@task.on_result
def launch_another(_, result: int) -> None:
    next_item = next(items, None)
    if next_item is not None:
        task.submit(next_item)

scheduler.run()
print(results)
[1, 0, 2, 3, 5, 4, 6, 7, 8, 9]
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 2}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
@on_start 1
└── def submit() -> None (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted 10
@on_future_done 10
@on_future_cancelled
@on_future_exception
@on_future_result 10
┗━━ ╭─ Task my_func(x: int) -> int ────────────────────────────────────────────╮
     ╭─────────────────────────── Plugin limiter ───────────────────────────╮ 
     │ Concurrent 0/2                                                       │ 
     ╰──────────────────────────────────────────────────────────────────────╯ 
    ╰─────────────────────── Ref: Task-my_func-BWdi3t6r ───────────────────────╯
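The requeue behaviour used above can be pictured as an iterator backed by a double-ended queue. The sketch below is an assumption about how such a type could work, not the implementation of amltk's Requeue; the `SimpleRequeue` class is a hypothetical name used purely for illustration.

```python
from collections import deque
from collections.abc import Iterable


class SimpleRequeue:
    """Hypothetical iterator whose items can be pushed back (not amltk.types.Requeue)."""

    def __init__(self, items: Iterable[int]) -> None:
        self._queue = deque(items)

    def __iter__(self) -> "SimpleRequeue":
        return self

    def __next__(self) -> int:
        if not self._queue:
            raise StopIteration
        return self._queue.popleft()

    def requeue(self, item: int) -> None:
        """Put an item back at the front so it is the next one returned."""
        self._queue.appendleft(item)

    def append(self, item: int) -> None:
        """Add an item at the back of the queue."""
        self._queue.append(item)


items = SimpleRequeue(range(5))
first = next(items)       # 0
second = next(items)      # 1
items.requeue(second)     # the submission of 1 was rejected, so put it back
remaining = list(items)
print(first, remaining)   # 0 [1, 2, 3, 4]
```

Putting a rejected item back at the front rather than the back keeps the processing order close to the original one, which is consistent with the mostly ordered results in the output above.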

Under Construction

Please see the following reference pages in the meantime: