Scheduler
The Scheduler uses an Executor, Python's built-in interface from concurrent.futures with a submit(f, *args, **kwargs) method, to submit compute to be run elsewhere, whether locally or remotely.
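Because the Executor interface comes from Python's standard library, its submit() contract can be shown without AMLTK at all. The sketch below uses concurrent.futures directly. The helper submit_and_wait is hypothetical and only illustrates that submit() returns a Future immediately while the call runs elsewhere.

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor


def fn(x: int) -> int:
    return x + 1


def submit_and_wait(executor: Executor, x: int) -> int:
    # Every concurrent.futures.Executor exposes submit(f, *args, **kwargs),
    # which returns a Future immediately while the call runs elsewhere.
    future: Future = executor.submit(fn, x)
    # result() blocks until the worker has finished and returns its value.
    return future.result()


# A thread pool keeps the sketch portable; ProcessPoolExecutor has the same
# submit() interface but requires picklable, module-level functions.
with ThreadPoolExecutor(max_workers=1) as executor:
    answer = submit_and_wait(executor, 1)

print(answer)  # 2
```

A ProcessPoolExecutor (which the process-based examples on this page rely on) exposes the same submit() method, so the pattern carries over unchanged.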
The Scheduler is primarily used to dispatch compute to an Executor and emit @events, which can trigger user callbacks.
Typically you should not use the Scheduler directly for dispatching and responding to computed functions, but rather use a Task.
Running in a Jupyter Notebook/Colab
If you are using a Jupyter Notebook, you likely need to use the following at the top of your notebook:
This is because a notebook already runs in an async context. If you do not wish to use the above snippet, you can instead use:
Basic Usage
In this example, we create a scheduler that uses local processes as workers. We then register a callback on @start that submits a function fn to the scheduler. Lastly, a callback is registered to @future-result to print the result when the compute is done.
from amltk.scheduling import Scheduler

def fn(x: int) -> int:
    return x + 1

scheduler = Scheduler.with_processes(1)

@scheduler.on_start
def launch_the_compute():
    # Called once when the scheduler starts: submit the initial compute here
    scheduler.submit(fn, 1)

@scheduler.on_future_result
def callback(future, result):
    # Called whenever a submitted future returns a result
    print(f"Result: {result}")

scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
@on_start 1
└── def launch_the_compute() (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted 1
@on_future_done 1
@on_future_cancelled
@on_future_exception
@on_future_result 1
└── def callback(future, result) (1)
The last line in the previous example, scheduler.run(), is what starts the scheduler running, at which point it first emits the @start event. This triggered the callback launch_the_compute(), which submitted the function fn with the argument 1.
The scheduler then ran the compute and waited for it to complete, emitting the @future-result event when it finished successfully. This triggered the callback callback(), which printed the result.
At this point, there is no more compute happening and no more events to respond to, so the scheduler halts.
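The flow just described (emit @start, submit from a callback, emit @future-result, halt once nothing is left) can be mimicked in a few lines of standard-library Python. This is a toy and not how AMLTK implements its Scheduler: the ToyScheduler class, its on()/emit() methods and the event names are hypothetical, and a thread pool stands in for the process pool.

```python
from __future__ import annotations

from collections import defaultdict
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Any, Callable


class ToyScheduler:
    """Hypothetical stand-in showing the start -> submit -> result -> halt flow."""

    def __init__(self, executor: ThreadPoolExecutor) -> None:
        self.executor = executor
        self.pending: set[Future] = set()
        self.callbacks: dict[str, list[Callable[..., Any]]] = defaultdict(list)

    def on(self, event: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        # Decorator that registers a callback for a named event.
        def register(fn: Callable[..., Any]) -> Callable[..., Any]:
            self.callbacks[event].append(fn)
            return fn

        return register

    def emit(self, event: str, *args: Any) -> None:
        for fn in self.callbacks[event]:
            fn(*args)

    def submit(self, fn: Callable[..., Any], *args: Any) -> Future:
        future = self.executor.submit(fn, *args)
        self.pending.add(future)
        return future

    def run(self) -> None:
        # The start event is where the initial compute gets submitted.
        self.emit("start")
        while self.pending:
            done, not_done = wait(self.pending, return_when=FIRST_COMPLETED)
            self.pending = set(not_done)
            for future in done:
                if future.exception() is None:
                    self.emit("future_result", future, future.result())
        # No compute in flight and no events left to respond to: halt.


def fn(x: int) -> int:
    return x + 1


results: list[int] = []

with ThreadPoolExecutor(max_workers=1) as pool:
    scheduler = ToyScheduler(pool)

    @scheduler.on("start")
    def launch_the_compute() -> None:
        scheduler.submit(fn, 1)

    @scheduler.on("future_result")
    def callback(future: Future, result: int) -> None:
        results.append(result)
        print(f"Result: {result}")

    scheduler.run()  # prints "Result: 2", then returns
```

Callbacks that submit more work simply add to the pending set, which is what keeps the loop alive; once it is empty, run() returns.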
@events
When the scheduler enters some important state, it will emit an event to let you know.
on_start: A Subscriber which is called when the scheduler starts. This is the first event emitted by the scheduler and one of the only ways to submit the initial compute to the scheduler.
on_finishing: A Subscriber which is called when the scheduler is finishing up. This occurs right before the scheduler shuts down the executor.
on_finished: A Subscriber which is called when the scheduler is finished, has shut down the executor and possibly terminated any remaining compute.
on_stop: A Subscriber which is called when the scheduler has been stopped due to the stop() method being called.
on_timeout: A Subscriber which is called when the scheduler reaches the timeout.
on_empty: A Subscriber which is called when the queue is empty. This can be useful to re-fill the queue and prevent the scheduler from exiting.
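The idea of refilling the queue from an empty-queue subscriber can be sketched with a plain loop over concurrent.futures. The drain() loop, the refill() subscriber and the refills_remaining budget are all hypothetical; they only illustrate how returning new work keeps the loop alive while returning nothing lets it exit, and they do not use the Scheduler API.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Callable, List


def square(x: int) -> int:
    return x * x


def drain(pending: List[Future], on_empty: Callable[[], List[Future]]) -> List[int]:
    """Hypothetical loop: whenever all work is finished, ask on_empty for more."""
    results: List[int] = []
    while pending:
        done, _ = wait(pending)  # default return_when is ALL_COMPLETED
        results.extend(future.result() for future in done)
        # The queue is now empty; the subscriber may refill it to keep going.
        pending = on_empty()
    return results


refills_remaining = 2  # hypothetical budget: refill the queue twice, then stop

with ThreadPoolExecutor(max_workers=2) as pool:

    def refill() -> List[Future]:
        global refills_remaining
        if refills_remaining == 0:
            return []  # nothing new submitted, so the loop exits
        refills_remaining -= 1
        return [pool.submit(square, n) for n in range(3)]

    results = drain([pool.submit(square, 10)], refill)

print(sorted(results))  # [0, 0, 1, 1, 4, 4, 100]
```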
When any compute goes through the Scheduler, it will emit an event to let you know. You should, however, prefer to use a Task, as it will emit specific events for the task at hand, and not for all compute.
on_future_submitted: A Subscriber which is called when some compute is submitted.
on_future_result: A Subscriber which is called when a future returns a result without raising an exception.
on_future_exception: A Subscriber which is called when some compute raises an uncaught exception.
on_future_done: A Subscriber which is called when some compute is done, regardless of whether it was successful or not.
on_future_cancelled: A Subscriber which is called when a future is cancelled. This usually occurs due to the underlying Scheduler, and is not something we do directly, other than when shutting down the scheduler.
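These future events correspond roughly to the outcomes a concurrent.futures.Future can have. As a standard-library sketch (the on_done callback and the event labels are illustrative, not AMLTK's), add_done_callback can classify each future as a result, an exception or a cancellation, and record "done" in every case.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List, Tuple

events: List[Tuple[str, object]] = []
lock = threading.Lock()  # callbacks may run on worker threads


def on_done(future: Future) -> None:
    # add_done_callback fires once the future is done, however it ended.
    with lock:
        if future.cancelled():
            events.append(("cancelled", None))
        elif future.exception() is not None:
            events.append(("exception", type(future.exception()).__name__))
        else:
            events.append(("result", future.result()))
        events.append(("done", None))  # recorded regardless of the outcome


def ok() -> int:
    return 1


def boom() -> int:
    raise ValueError("bad input")


with ThreadPoolExecutor(max_workers=1) as pool:
    for task in (ok, boom):
        pool.submit(task).add_done_callback(on_done)

# A future that has not started yet can still be cancelled.
pending: Future = Future()
pending.add_done_callback(on_done)
pending.cancel()

print(events)
# [('result', 1), ('done', None), ('exception', 'ValueError'), ('done', None),
#  ('cancelled', None), ('done', None)]
```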
Common usages of run()
There are various ways to run() the scheduler, notably how long it should run with timeout= and how it should react to any exception that may have occurred within the Scheduler itself or your callbacks.
Please see the run() API doc for more details and features; here we show two common uses of the timeout= parameter.
You can render a live display using run(display=...). This requires rich to be installed. You can install it with pip install rich or pip install amltk[rich].
You can tell the Scheduler to stop after a certain amount of time with the timeout= argument to run(). This will also trigger the @timeout event, as seen in the Scheduler output.
import time
from asyncio import Future

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def expensive_function() -> int:
    time.sleep(0.1)
    return 42

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function)

# This will endlessly loop the scheduler
@scheduler.on_future_done
def submit_again(future: Future) -> None:
    # Only resubmit while the scheduler is still running
    if scheduler.running():
        scheduler.submit(expensive_function)

scheduler.run(timeout=1)  # End after 1 second
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty
@on_start 1
└── def submit_calculations() -> None (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout 1
@on_future_submitted 10
@on_future_done 10
└── def submit_again(future: _asyncio.Future) -> None (10)
@on_future_cancelled
@on_future_exception
@on_future_result 10
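The resubmit-until-timeout pattern in the previous example can be approximated without AMLTK. In this hypothetical sketch, run_until() resubmits from a done callback only while a deadline has not passed, standing in for scheduler.running() and run(timeout=...); the sleep is shortened so the sketch finishes quickly.

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor


def expensive_function() -> int:
    time.sleep(0.01)
    return 42


def run_until(timeout: float) -> int:
    """Hypothetical loop: resubmit on completion until `timeout` seconds pass.

    Returns how many times expensive_function was submitted.
    """
    deadline = time.monotonic() + timeout
    finished = threading.Event()
    submitted = 0

    with ThreadPoolExecutor(max_workers=1) as pool:

        def submit_again(_: Future) -> None:
            nonlocal submitted
            if time.monotonic() < deadline:
                submitted += 1
                pool.submit(expensive_function).add_done_callback(submit_again)
            else:
                finished.set()  # the "timeout": stop feeding new work

        submitted += 1
        pool.submit(expensive_function).add_done_callback(submit_again)
        finished.wait()
    return submitted


count = run_until(timeout=0.2)
print(f"submitted {count} times before the timeout")
```

The exact count depends on timing, so no fixed output is shown.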
By specifying that the Scheduler should not wait for ongoing tasks to finish (wait=False), the Scheduler will attempt to cancel and possibly terminate any running tasks.
import time

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def expensive_function() -> None:
    time.sleep(10)

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function)

scheduler.run(timeout=1, wait=False)  # End after 1 second
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty
@on_start 1
└── def submit_calculations() -> None (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout 1
@on_future_submitted 1
@on_future_done
@on_future_cancelled 1
@on_future_exception
@on_future_result
Forcibly Terminating Workers
As an Executor does not provide an interface to forcibly terminate workers, we provide Scheduler(terminate=...) as a custom strategy for cleaning up a provided executor. It is not possible to terminate running thread-based workers, for example those of a ThreadPoolExecutor, and any Executor using threads to spawn tasks will have to wait until all running tasks are finished before Python can close.
It's likely that terminate will trigger the EXCEPTION event for any tasks that are running during the shutdown, not a cancelled event. This is because we use a Future under the hood, and these cannot be cancelled once running. However, there is no guarantee of this; it depends on how the Executor handles it.
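The limits described here can be observed with the standard library alone. This sketch is not Scheduler(terminate=...): it shuts a ThreadPoolExecutor down with wait=False and cancel_futures=True (available since Python 3.9). Queued futures are cancelled, the running one cannot be, and its thread has to be allowed to finish before the interpreter can exit.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def long_task() -> str:
    started.set()
    release.wait()  # stands in for work that takes a long time
    return "finished"


pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(long_task)
started.wait()  # make sure the first task is actually executing
queued = [pool.submit(long_task) for _ in range(3)]  # no free worker: these wait

# Return immediately and cancel whatever is still queued (Python 3.9+).
pool.shutdown(wait=False, cancel_futures=True)

print([f.cancelled() for f in queued])  # [True, True, True]
print(running.cancel())  # False: a running future cannot be cancelled

# The worker thread cannot be killed, so let it finish before Python exits.
release.set()
print(running.result())  # finished
```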
Scheduling something to be run later
You can schedule some function to be run later using the scheduler.call_later() method.
Note
This does not run the function in the background; it just schedules some function to be called later, at which point you could use submit to schedule a Task to run the function in the background.
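The signature of scheduler.call_later() is not given here. Assuming it behaves like asyncio's loop.call_later(delay, callback, *args), the standard-library sketch below shows the point of the note: the callback is simply invoked on the event loop once the delay has passed, rather than being run as background compute. The remind() callback is hypothetical.

```python
import asyncio
from typing import List

calls: List[str] = []


def remind(message: str) -> None:
    # A plain callback run on the event loop once its delay has passed;
    # nothing here runs in a background worker.
    calls.append(message)


async def main() -> None:
    loop = asyncio.get_running_loop()
    loop.call_later(0.05, remind, "after 0.05s")
    loop.call_later(0.01, remind, "after 0.01s")
    calls.append("scheduled")  # call_later returns immediately
    await asyncio.sleep(0.1)  # keep the loop alive until both callbacks fire


asyncio.run(main())
print(calls)  # ['scheduled', 'after 0.01s', 'after 0.05s']
```

Callbacks fire in order of their deadlines, not the order in which they were scheduled.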