Scheduling
AutoML-toolkit was designed to make it easy to offload computation away from the main process, enabling better interactivity, deployment and control. At the same time, we wanted an event-based system to manage the complexity that comes with AutoML systems, all while keeping the API intuitive and extensible.
By the end of this guide, we hope that the following code, its options and its inner workings become easy to understand.
from amltk import Scheduler

# Some function to offload to compute
def collatz(n: int) -> int:
    is_even = (n % 2 == 0)
    return int(n / 2) if is_even else int(3 * n + 1)

# Setup the scheduler and create a "task"
scheduler = Scheduler.with_processes(1)
task = scheduler.task(collatz)

answers = []

# Tell the scheduler what to do when it starts
@scheduler.on_start
def start_computing() -> None:
    answers.append(12)
    task.submit(12)  # Launch the task with the argument 12

# Tell the scheduler what to do when the task returns
@task.on_result
def compute_next(_, next_n: int) -> None:
    answers.append(next_n)
    if scheduler.running() and next_n != 1:
        task.submit(next_n)

# Run the scheduler
scheduler.run(timeout=1)  # One second timeout
print(answers)

[12, 6, 3, 10, 5, 16, 8, 4, 2, 1]
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
┃ @on_start 1
┃ └── def start_computing() -> None (1)
┃ @on_finishing 1
┃ @on_finished 1
┃ @on_stop
┃ @on_timeout
┃ @on_future_submitted 9
┃ @on_future_done 9
┃ @on_future_cancelled
┃ @on_future_exception
┃ @on_future_result 9
┗━━ ╭─ Task collatz(n: int) -> int ────────────────────────────────────────────╮
╰─────────────────────── Ref: Task-collatz-HWl6vJIn ───────────────────────╯
We start by introducing the engine, the Scheduler, and how it interacts with Python's built-in Executor interface to offload compute to processes, cluster nodes, or even cloud resources. However, the Scheduler is rather useless without some fuel. For this, we present Tasks, the computational units to perform with the Scheduler that start the system's gears turning.
rich printing
To get the same output locally (terminal or notebook), you can either call thing.__rich__(), use from rich import print; print(thing), or, in a notebook, simply leave it as the last object of a cell. You'll have to install with amltk[jupyter] or pip install rich[jupyter] manually.
Scheduler#
The core engine of the AutoML-Toolkit is the Scheduler.
Its purpose is to allow you to create workflows in an event-driven manner. It does this by allowing you to submit() functions with arguments to be computed in the background, while the main process continues to do other work. Once this computation has completed, you can react with various callbacks, most likely to submit more computations.
Sounds like asyncio?
If you're familiar with Python's async/await syntax, then this description might sound similar. The Scheduler is powered by an asynchronous event loop but hides this complexity in its API. We do have an asynchronous API which we will discuss later.
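The pattern the Scheduler wraps can be made concrete with a short stdlib-only sketch (plain asyncio and concurrent.futures, no amltk involved): an event loop offloads a function to an executor and fires a callback when the result arrives, much like an @on_future_result callback would.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def compute(x: int) -> int:
    return x * 2

results = []

def on_result(future: asyncio.Future) -> None:
    # React to completed compute, similar in spirit to a
    # scheduler's future-result callback.
    results.append(future.result())

async def main() -> None:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as executor:
        # Offload the function to the executor without blocking the loop
        future = loop.run_in_executor(executor, compute, 21)
        future.add_done_callback(on_result)
        await future

asyncio.run(main())
print(results)  # [42]
```

This is only an illustration of the event-loop-plus-executor pattern; amltk hides all of this plumbing behind its event API.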
Backend#
The first thing to do is define where this computation should happen. A Scheduler builds upon an Executor, an interface provided by Python's concurrent.futures module. This interface abstracts away the details of how the computation is actually performed, allowing us to easily switch between different backends, such as threads, processes, clusters, cloud resources, or even custom backends.
Available Executors
You can find a list of these in our executor reference.
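For reference, the Executor interface itself is small: submit() a callable with arguments and get back a Future. Here is a stdlib-only sketch (no amltk involved), using a thread pool so it runs anywhere:

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor

def square(x: int) -> int:
    return x * x

# Any Executor implementation can be swapped in here: ThreadPoolExecutor,
# ProcessPoolExecutor, or a cluster/cloud backend exposing the same interface.
executor: Executor = ThreadPoolExecutor(max_workers=2)

# submit() returns immediately with a Future; the work runs in the background
future: Future[int] = executor.submit(square, 4)
print(future.result())  # 16 (blocks until the computation completes)

executor.shutdown(wait=True)
```

Because the Scheduler only relies on this interface, switching backends means constructing a different Executor and nothing else in your workflow changes.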
The simplest one is a ProcessPoolExecutor, which will create a pool of processes to run the compute in parallel. We provide a convenience function for this as Scheduler.with_processes().
from concurrent.futures import ProcessPoolExecutor
from amltk.scheduling import Scheduler
scheduler = Scheduler.with_processes(2)
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 2} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_start
@on_finishing
@on_finished
@on_stop
@on_timeout
@on_empty
@on_future_submitted
@on_future_done
@on_future_cancelled
@on_future_exception
@on_future_result
Running the Scheduler#
You may have noticed from the above example that there are many events the scheduler will emit, such as @start or @future-done. One particularly important one is @start, an event to signal that the scheduler has started and is ready to accept tasks.
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

@scheduler.on_start
def print_hello() -> None:
    print("hello")
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_start
└── def print_hello() -> None
@on_finishing
@on_finished
@on_stop
@on_timeout
@on_empty
@on_future_submitted
@on_future_done
@on_future_cancelled
@on_future_exception
@on_future_result
From the output, we can see that the print_hello() function was registered to the event @start, but it was never called and no "hello" was printed. For this to happen, we actually have to run() the scheduler.
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

@scheduler.on_start
def print_hello() -> None:
    print("hello")

scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
@on_start 1
└── def print_hello() -> None (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted
@on_future_done
@on_future_cancelled
@on_future_exception
@on_future_result
Now the output will show a little yellow number next to the @start event and the print_hello() callback, indicating that the event was triggered and the callback was called.
You can subscribe multiple callbacks to the same event and they will each be called in the order they were registered.
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

@scheduler.on_start
def print_hello_1() -> None:
    print("hello 1")

def print_hello_2() -> None:
    print("hello 2")

scheduler.on_start(print_hello_2)  # You can also register without a decorator

scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
@on_start 1
├── def print_hello_1() -> None (1)
└── def print_hello_2() -> None (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted
@on_future_done
@on_future_cancelled
@on_future_exception
@on_future_result
Determinism
It's worth noting that even though we are using an event-based system, we are still guaranteed deterministic execution of the callbacks for any given event. The only source of indeterminism is the order in which events are emitted; this is determined entirely by your compute functions themselves.
Submitting Compute#
The Scheduler exposes a simple submit() method which allows you to submit compute to be performed while the scheduler is running.
While we will later visit the Task class for defining these units of compute, it is beneficial to see how the Scheduler operates directly with submit(), without abstractions.
In the below example, we use the @future-result event to submit more compute once the previous computation has returned a result.
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(2)

def expensive_function(x: int) -> int:
    return 2 ** x

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function, 2)  # Submit compute

# Called when the submitted function is done
@scheduler.on_future_result
def print_result(_, result: int) -> None:
    print(result)
    if result < 10:
        scheduler.submit(expensive_function, result)

scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 2} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
@on_start 1
└── def submit_calculations() -> None (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted 2
@on_future_done 2
@on_future_cancelled
@on_future_exception
@on_future_result 2
└── def print_result(_, result: int) -> None (2)
What's a Future?
A Future is a special object which represents the result of an asynchronous computation. It can be queried for the result or exception of a computation which may not have completed yet.
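A quick stdlib illustration of this life-cycle (independent of amltk): a Future starts pending and can later be queried once it holds a result or an exception. Constructing one manually is normally an Executor's job, but it makes the states easy to see.

```python
from concurrent.futures import Future

# A freshly created Future is pending: no result yet
future: Future[int] = Future()
print(future.done())  # False

# Once a result is set (normally by the executor), it can be queried
future.set_result(42)
print(future.done())    # True
print(future.result())  # 42

# A future may instead hold an exception, queried with .exception()
failed: Future[int] = Future()
failed.set_exception(ValueError("boom"))
print(type(failed.exception()).__name__)  # ValueError
```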
Scheduler Events#
Here are some of the possible @events a Scheduler can emit, but please visit the scheduler reference for a complete list.
amltk.scheduling.Scheduler.on_start
on_start: Subscriber[[], Any] = subscriber(STARTED)
A Subscriber which is called when the scheduler starts. This is the first event emitted by the scheduler and one of the only ways to submit the initial compute to the scheduler.

amltk.scheduling.Scheduler.on_future_result
on_future_result: Subscriber[[Future, Any], Any] = subscriber(FUTURE_RESULT)
A Subscriber which is called when a future returned with a result and no exception was raised.

amltk.scheduling.Scheduler.on_future_exception
on_future_exception: Subscriber[[Future, BaseException], Any] = subscriber(FUTURE_EXCEPTION)
A Subscriber which is called when some compute raised an uncaught exception.

amltk.scheduling.Scheduler.on_future_submitted
on_future_submitted: Subscriber[[Future], Any] = subscriber(FUTURE_SUBMITTED)
A Subscriber which is called when some compute is submitted.

amltk.scheduling.Scheduler.on_future_done
on_future_done: Subscriber[[Future], Any] = subscriber(FUTURE_DONE)
A Subscriber which is called when some compute is done, regardless of whether it was successful or not.

amltk.scheduling.Scheduler.on_future_cancelled
on_future_cancelled: Subscriber[[Future], Any] = subscriber(FUTURE_CANCELLED)
A Subscriber which is called when a future is cancelled. This usually occurs due to the underlying Scheduler, and is not something we do directly, other than when shutting down the scheduler.

amltk.scheduling.Scheduler.on_timeout
on_timeout: Subscriber[[], Any] = subscriber(TIMEOUT)
A Subscriber which is called when the scheduler reaches the timeout.

amltk.scheduling.Scheduler.on_stop
on_stop: Subscriber[[str, BaseException | None], Any] = subscriber(STOP)
A Subscriber which is called when the scheduler has been stopped due to the stop() method being called.

amltk.scheduling.Scheduler.on_finishing
on_finishing: Subscriber[[], Any] = subscriber(FINISHING)
A Subscriber which is called when the scheduler is finishing up. This occurs right before the scheduler shuts down the executor.

amltk.scheduling.Scheduler.on_finished
on_finished: Subscriber[[], Any] = subscriber(FINISHED)
A Subscriber which is called when the scheduler is finished, has shutdown the executor, and has possibly terminated any remaining compute.

amltk.scheduling.Scheduler.on_empty
on_empty: Subscriber[[], Any] = subscriber(EMPTY)
A Subscriber which is called when the queue is empty. This can be useful to re-fill the queue and prevent the scheduler from exiting.
We can access the counts of all events through the scheduler.event_counts property. This is a dict which has the events as keys and the number of times each was emitted as the values.
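To illustrate the shape of this property (a hypothetical sketch using collections.Counter with string keys; amltk's real keys are its event objects, not strings), counting emissions looks like:

```python
from collections import Counter

# Hypothetical illustration only: event_counts behaves like this mapping,
# keyed by amltk's event objects rather than the strings used here.
emitted = ["start", "future-submitted", "future-done", "future-result",
           "future-submitted", "future-done", "future-result",
           "finishing", "finished"]
event_counts = Counter(emitted)

print(event_counts["future-result"])  # 2
print(event_counts["stop"])           # 0 (never emitted)
```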
Controlling Callbacks#
There are a few parameters you can pass to any event subscriber, such as @start or @future-result. These control what happens when the event is fired and can be used to control the flow of your system. They are covered more extensively in our events reference.
Repeat the callback a certain number of times, every time the event is emitted.
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

# Print "hello" 3 times when the scheduler starts
@scheduler.on_start(repeat=3)
def print_hello() -> None:
    print("hello")

scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
@on_start 1
└── def print_hello() -> None (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted
@on_future_done
@on_future_cancelled
@on_future_exception
@on_future_result
Limit the number of times a callback can be called, after which the callback will be ignored.
from asyncio import Future

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(2)

def expensive_function(x: int) -> int:
    return x ** 2

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function, 2)

@scheduler.on_future_result(max_calls=3)
def print_result(future, result) -> None:
    scheduler.submit(expensive_function, 2)

scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 2} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
@on_start 1
└── def submit_calculations() -> None (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted 4
@on_future_done 4
@on_future_cancelled
@on_future_exception
@on_future_result 4
└── def print_result(future, result) -> None (3)
A callable which takes no arguments and returns a bool. The callback will only be called when the when callable returns True. Below is a rather contrived example, but it shows how we can use the when parameter to control when the callback is called.
import random

from amltk.scheduling import Scheduler

LOCALE = random.choice(["English", "German"])

scheduler = Scheduler.with_processes(1)

@scheduler.on_start(when=lambda: LOCALE == "English")
def print_hello() -> None:
    print("hello")

@scheduler.on_start(when=lambda: LOCALE == "German")
def print_guten_tag() -> None:
    print("guten tag")

scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
@on_start 1
├── def print_hello() -> None (1)
└── def print_guten_tag() -> None
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted
@on_future_done
@on_future_cancelled
@on_future_exception
@on_future_result
Only call the callback once for every every= times the event is emitted. This includes the first time it's called.
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

# Print "hello" only every 2 times the scheduler starts.
@scheduler.on_start(every=2)
def print_hello() -> None:
    print("hello")

# Run the scheduler 5 times
scheduler.run()
scheduler.run()
scheduler.run()
scheduler.run()
scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 5
@on_start 5
└── def print_hello() -> None (2)
@on_finishing 5
@on_finished 5
@on_stop
@on_timeout
@on_future_submitted
@on_future_done
@on_future_cancelled
@on_future_exception
@on_future_result
Stopping the Scheduler#
There are a few ways the Scheduler will stop. The one we have implicitly been using this whole time is when the Scheduler has run out of events to process and has no compute left to perform. This is the default behavior, but it can be controlled with run(end_on_empty=False).
However, there are more explicit methods.
You can explicitly call stop() from anywhere on the Scheduler to stop it. By default this will wait for any currently running compute to finish, but you can inform the scheduler to stop immediately with run(wait=False).
You'll notice this in the event count of the Scheduler, where the event @future-cancelled was fired.
import time

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def expensive_function(sleep_for: int) -> None:
    time.sleep(sleep_for)

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function, sleep_for=10)

@scheduler.on_future_submitted
def stop_the_scheduler(_) -> None:
    scheduler.stop()

scheduler.run(wait=False)
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty
@on_start 1
└── def submit_calculations() -> None (1)
@on_finishing 1
@on_finished 1
@on_stop 1
@on_timeout
@on_future_submitted 1
└── def stop_the_scheduler(_) -> None (1)
@on_future_done
@on_future_cancelled 1
@on_future_exception
@on_future_result
You can also tell the Scheduler to stop after a certain amount of time with the timeout= argument to run(). This will also trigger the @timeout event, as seen in the Scheduler output.
import time
from asyncio import Future

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def expensive_function() -> int:
    time.sleep(0.1)
    return 42

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function)

# This will endlessly loop the scheduler
@scheduler.on_future_done
def submit_again(future: Future) -> None:
    if scheduler.running():
        scheduler.submit(expensive_function)

scheduler.run(timeout=1)  # End after 1 second
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty
@on_start 1
└── def submit_calculations() -> None (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout 1
@on_future_submitted 10
@on_future_done 10
└── def submit_again(future: _asyncio.Future) -> None (10)
@on_future_cancelled
@on_future_exception
@on_future_result 10
Exceptions#
Dealing with exceptions is an important part of any AutoML system. It is important to clarify that there are two kinds of exceptions that can occur within the Scheduler.
The first kind can happen within some function submitted with submit(). When this happens, the @future-exception event will be emitted, passing the exception to the callback.
By default, the Scheduler will then raise the exception that occurred up to your program and end its computations. This is done by setting run(on_exception="raise"), the default, but it also accepts three other possibilities:
- "continue" - Just emit the exception and keep running.
- "end" - Emit the exception and then stop the scheduler, but don't raise it.
- {MyException: "continue", OtherException: "raise"} - Decide what to do for each exception type. Note that this is checked in order using isinstance(...).
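The dict form deserves a note: because handlers are matched with isinstance(...) in order, more specific exception types should come before their parents. Here is a stdlib-only sketch of that matching rule (an illustration, not amltk's internal code; the decide() helper and its default are hypothetical):

```python
def decide(exception: BaseException, policy: dict[type, str],
           default: str = "raise") -> str:
    # First isinstance match wins, so order entries from most
    # specific to most general (dicts preserve insertion order).
    for exc_type, action in policy.items():
        if isinstance(exception, exc_type):
            return action
    return default

policy = {KeyError: "continue", LookupError: "end"}

print(decide(KeyError("k"), policy))    # "continue" (KeyError matched first)
print(decide(IndexError("i"), policy))  # "end" (IndexError is a LookupError)
print(decide(ValueError("v"), policy))  # "raise" (no match, fall back)
```

Had LookupError been listed before KeyError, a KeyError would have matched the broader entry first, which is exactly why ordering matters.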
One example is to just stop()
the scheduler when some exception occurs.
from asyncio import Future

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def failing_compute_function(err_msg: str) -> None:
    raise ValueError(err_msg)

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(failing_compute_function, "Failed!")

@scheduler.on_future_exception
def stop_the_scheduler(future: Future, exception: Exception) -> None:
    print(f"Got exception {exception}")
    scheduler.stop()  # You can optionally pass `exception=` for logging purposes.

scheduler.run(on_exception="continue")  # Scheduler will not stop because of the error
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
@on_start 1
└── def submit_calculations() -> None (1)
@on_finishing 1
@on_finished 1
@on_stop 1
@on_timeout
@on_future_submitted 1
@on_future_done 1
@on_future_cancelled
@on_future_exception 1
└── def stop_the_scheduler(future: _asyncio.Future, exception: Exception) ->
None (1)
@on_future_result
The second kind of exception is one that happens in the main process. For example, this could happen in one of your callbacks or in the Scheduler itself (please raise an issue if this occurs!). By default, run() sets on_exception="raise" and will raise the exception that occurred, with its traceback, to help you debug your program.
You may also use run(on_exception="end"), which will just end the Scheduler and raise no exception, or run(on_exception="continue"), in which case the Scheduler will continue on with whatever events are next to process.
Tasks#
Now that we have seen how the Scheduler works, we can look at the Task, a wrapper around a function that you'll want to submit to the Scheduler. The preferred way to create one of these Tasks is to use scheduler.task(function).
Running a task#
In the following example, we will create a task for the scheduler and attempt to call it. This task will be run by the backend specified.
from amltk import Scheduler

# Some function to offload to compute
def collatz(n: int) -> int:
    is_even = (n % 2 == 0)
    return int(n / 2) if is_even else int(3 * n + 1)

scheduler = Scheduler.with_processes(1)

# Creating a "task"
collatz_task = scheduler.task(collatz)

try:
    collatz_task.submit(5)
except Exception as e:
    print(f"{type(e)}: {e}")
As you can see, we cannot submit tasks before the scheduler is running. This is because the backend that it's running on usually has to be set up and torn down when scheduler.run() is called.
The proper approach would be to do the following:
from amltk import Scheduler

# Some function to offload to compute
def collatz(n: int) -> int:
    is_even = (n % 2 == 0)
    return int(n / 2) if is_even else int(3 * n + 1)

# Setup the scheduler and create a "task"
scheduler = Scheduler.with_processes(1)
collatz_task = scheduler.task(collatz)

@scheduler.on_start
def launch_initial_task() -> None:
    collatz_task.submit(5)

scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
┃ @on_start 1
┃ └── def launch_initial_task() -> None (1)
┃ @on_finishing 1
┃ @on_finished 1
┃ @on_stop
┃ @on_timeout
┃ @on_future_submitted 1
┃ @on_future_done 1
┃ @on_future_cancelled
┃ @on_future_exception
┃ @on_future_result 1
┗━━ ╭─ Task collatz(n: int) -> int ────────────────────────────────────────────╮
╰─────────────────────── Ref: Task-collatz-5yu2a82u ───────────────────────╯
Task Specific Events#
As you may have noticed, we can see the Task itself in the Scheduler output, as well as the events it defines. This allows us to react to specific tasks themselves, and not generally everything that may pass through the Scheduler.
In the below example, we'll do two things. First, we'll create a Task and react to its events, but we'll also use the Scheduler directly with submit(). Then we'll see how the callbacks reacted to different events.
from amltk import Scheduler

def echo(msg: str) -> str:
    return msg

scheduler = Scheduler.with_processes(1)
echo_task = scheduler.task(echo)

# Launch the task and do a raw `submit()` with the Scheduler
@scheduler.on_start
def launch_initial_task() -> None:
    echo_task.submit("hello")
    scheduler.submit(echo, "hi")

# Callback for anything resulting from the scheduler
@scheduler.on_future_result
def from_scheduler(_, msg: str) -> None:
    print(f"result_from_scheduler {msg}")

# Callback for specifically results from the `echo_task`
@echo_task.on_result
def from_task(_, msg: str) -> None:
    print(f"result_from_task {msg}")

scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
┃ @on_start 1
┃ └── def launch_initial_task() -> None (1)
┃ @on_finishing 1
┃ @on_finished 1
┃ @on_stop
┃ @on_timeout
┃ @on_future_submitted 2
┃ @on_future_done 2
┃ @on_future_cancelled
┃ @on_future_exception
┃ @on_future_result 2
┃ └── def from_scheduler(_, msg: str) -> None (2)
┗━━ ╭─ Task echo(msg: str) -> str ─────────────────────────────────────────────╮
╰──────────────────────── Ref: Task-echo-FP5NRPb4 ─────────────────────────╯
We can see in the output of the above code that @scheduler.on_future_result was called twice, meaning our callback def from_scheduler() was called twice: once for the result of echo_task.submit("hello") and once for scheduler.submit(echo, "hi"). On the other hand, the event @task.on_result was only called once, meaning our callback def from_task() was only called once.
In practice, you will likely need to define a variety of tasks for your AutoML System and having dedicated code to respond to individual tasks is of critical importance. This can even allow you to chain the results of one task into another, and define more complex workflows.
The below example shows how you can define two tasks with the scheduler and have certain callbacks for different tasks, or even share callbacks between them!
from amltk import Scheduler

def expensive_thing_1(x: int) -> int:
    return x * 2

def expensive_thing_2(x: int) -> int:
    return x ** 2

# Create a scheduler and 2 tasks
scheduler = Scheduler.with_processes(1)
task_1 = scheduler.task(expensive_thing_1)
task_2 = scheduler.task(expensive_thing_2)

# A list of things we want to compute
items = iter([1, 2, 3])

@scheduler.on_start
def submit_initial() -> None:
    next_item = next(items)
    task_1.submit(next_item)

@task_1.on_result
def submit_task_2_with_results_of_task_1(_, result: int) -> None:
    """When task_1 returns, send the result to task_2"""
    task_2.submit(result)

@task_1.on_result
def submit_task_1_with_next_item(_, result: int) -> None:
    """When task_1 returns, launch it again with the next item"""
    next_item = next(items, None)
    if next_item is not None:
        task_1.submit(next_item)
        return
    print("Done!")

# You may share callbacks for the two tasks
@task_1.on_exception
@task_2.on_exception
def handle_task_exception(_, exception: BaseException) -> None:
    print(f"A task errored! {exception}")

scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
┃ @on_start 1
┃ └── def submit_initial() -> None (1)
┃ @on_finishing 1
┃ @on_finished 1
┃ @on_stop
┃ @on_timeout
┃ @on_future_submitted 6
┃ @on_future_done 6
┃ @on_future_cancelled
┃ @on_future_exception
┃ @on_future_result 6
┣━━ ╭─ Task expensive_thing_1(x: int) -> int ──────────────────────────────────╮
┃ ╰────────────────── Ref: Task-expensive_thing_1-5e3V0LFE ──────────────────╯
┗━━ ╭─ Task expensive_thing_2(x: int) -> int ──────────────────────────────────╮
╰────────────────── Ref: Task-expensive_thing_2-hDPx098A ──────────────────╯
Task Plugins#
Another benefit of Task objects is that we can attach a Plugin to them. Plugins can automate control over the behaviour of tasks, whether by preventing their execution, modifying the function and its arguments, or even attaching plugin-specific events!
For a complete reference, please see the plugin reference page.
Call Limiter#
Perhaps one of the more useful plugins, at least when designing an AutoML system, is the Limiter plugin. This can help you control both a task's concurrency and the absolute limit of how many times it can be successfully submitted.
In the following contrived example, we will set up a Scheduler with 2 workers and attempt to submit a Task 4 times in rapid succession. However, we have the constraint that we only ever want 2 of these tasks running at a given time. Let's see how we could achieve that.
from amltk.scheduling import Scheduler, Limiter

def my_func(x: int) -> int:
    return x

scheduler = Scheduler.with_processes(2)

# Specify a concurrency limit of 2
task = scheduler.task(my_func, plugins=Limiter(max_concurrent=2))

# A list of 10 things we want to compute
items = iter(range(10))

results = []

@scheduler.on_start(repeat=4)  # Repeat callback 4 times
def submit() -> None:
    next_item = next(items)
    task.submit(next_item)

@task.on_result
def record_result(_, result: int) -> None:
    results.append(result)

@task.on_result
def launch_another(_, result: int) -> None:
    next_item = next(items, None)
    if next_item is not None:
        task.submit(next_item)

scheduler.run()
print(results)
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 2} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
┃ @on_start 1
┃ └── def submit() -> None (1)
┃ @on_finishing 1
┃ @on_finished 1
┃ @on_stop
┃ @on_timeout
┃ @on_future_submitted 8
┃ @on_future_done 8
┃ @on_future_cancelled
┃ @on_future_exception
┃ @on_future_result 8
┗━━ ╭─ Task my_func(x: int) -> int ────────────────────────────────────────────╮
│ ╭─────────────────────────── Plugin limiter ───────────────────────────╮ │
│ │ Concurrent 0/2 │ │
│ ╰──────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────── Ref: Task-my_func-9zUq9QHP ───────────────────────╯
You can notice that this limiting worked, given that the numbers 2 and 3 were skipped and not printed. As expected, we successfully launched the task with both 0 and 1, but as these tasks were not done processing, the Limiter kicked in and prevented the other two.
A natural question to ask is then, "how do we requeue these?". Well, let's take a look at the above output. The plugin has added three new events to Task, namely @call-limit-reached, @concurrent-limit-reached and @disabled-due-to-running-task.
To subscribe to these extra events (or any others, for that matter), we can use the task.on() method. Below is the same example, except here we respond to @concurrent-limit-reached and requeue the submissions that failed.
from amltk.scheduling import Scheduler, Limiter, Task
from amltk.types import Requeue

def my_func(x: int) -> int:
    return x

scheduler = Scheduler.with_processes(2)
task = scheduler.task(my_func, plugins=Limiter(max_concurrent=2))

# A list of 10 things we want to compute
items = Requeue(range(10))  # A convenience type that you can requeue/append to

results = []

@scheduler.on_start(repeat=4)  # Repeat callback 4 times
def submit() -> None:
    next_item = next(items)
    task.submit(next_item)

@task.on("concurrent-limit-reached")
def add_back_to_queue(task: Task, x: int) -> None:
    items.requeue(x)  # Put x back at the start of the queue

@task.on_result
def record_result(_, result: int) -> None:
    results.append(result)

@task.on_result
def launch_another(_, result: int) -> None:
    next_item = next(items, None)
    if next_item is not None:
        task.submit(next_item)

scheduler.run()
print(results)
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 2} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
┃ @on_start 1
┃ └── def submit() -> None (1)
┃ @on_finishing 1
┃ @on_finished 1
┃ @on_stop
┃ @on_timeout
┃ @on_future_submitted 10
┃ @on_future_done 10
┃ @on_future_cancelled
┃ @on_future_exception
┃ @on_future_result 10
┗━━ ╭─ Task my_func(x: int) -> int ────────────────────────────────────────────╮
│ ╭─────────────────────────── Plugin limiter ───────────────────────────╮ │
│ │ Concurrent 0/2 │ │
│ ╰──────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────── Ref: Task-my_func-BWdi3t6r ───────────────────────╯
Under Construction#
Please see the following reference pages in the meantime:
- scheduler reference - A slightly more condensed version of how to use the Scheduler.
- task reference - A more comprehensive explanation of Tasks and their @events.
- plugin reference - An intro to plugins and how to create your own.
- executors reference - A list of executors and how to use them.
- events reference - A more comprehensive look at the event system in AutoML-Toolkit and how to work with it or extend it.