Scheduler

The Scheduler uses an Executor, a builtin Python abstraction with a submit(f, *args, **kwargs) method, to submit compute to be run elsewhere, whether locally or remotely. The Scheduler is primarily used to dispatch compute to an Executor and emit @events, which can trigger user callbacks.

Typically you should not use the Scheduler directly for dispatching and responding to computed functions, but rather use a Task.
Running in a Jupyter Notebook/Colab
If you are using a Jupyter Notebook, you likely need to use the following at the top of your notebook:
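A minimal sketch of the usual workaround; nest_asyncio is a third-party package and an assumption here, not part of amltk itself:

import nest_asyncio  # third-party; allows re-entering the notebook's already-running event loop
nest_asyncio.apply()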
This is because a notebook already runs in an async context. If you do not wish to use the above snippet, you can instead use:
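For example, awaiting the async variant of run() directly from a notebook cell (see async_run() further below):

await scheduler.async_run()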
Basic Usage
In this example, we create a scheduler that uses local processes as workers. We then define a function fn and submit it to the scheduler once it starts. Lastly, a callback is registered to @on_future_result to print the result when the compute is done.
from amltk.scheduling import Scheduler

def fn(x: int) -> int:
    return x + 1

scheduler = Scheduler.with_processes(1)

@scheduler.on_start
def launch_the_compute():
    scheduler.submit(fn, 1)

@scheduler.on_future_result
def callback(future, result):
    print(f"Result: {result}")

scheduler.run()
Result: 2
The last line in the previous example, scheduler.run(), is what starts the scheduler running, at which point it first emits the @on_start event. This triggered the callback launch_the_compute(), which submitted the function fn with the argument 1.

The scheduler then ran the compute and waited for it to complete, emitting the @on_future_result event when it finished successfully. This triggered the callback callback(), which printed the result.

At this point, there is no more compute happening and no more events to respond to, so the scheduler halts.
@events
When the scheduler enters some important state, it will emit an event to let you know.
@on_start: A Subscriber which is called when the scheduler starts. This is the first event emitted by the scheduler and one of the only ways to submit the initial compute to the scheduler.

@on_finishing: A Subscriber which is called when the scheduler is finishing up. This occurs right before the scheduler shuts down the executor.

@on_finished: A Subscriber which is called when the scheduler is finished, has shut down the executor and possibly terminated any remaining compute.

@on_stop: A Subscriber which is called when the scheduler has been stopped due to the stop() method being called.

@on_timeout: A Subscriber which is called when the scheduler reaches the timeout.

@on_empty: A Subscriber which is called when the queue is empty. This can be useful to re-fill the queue and prevent the scheduler from exiting.
When any compute goes through the Scheduler, it will emit an event to let you know. You should however prefer to use a Task, as it will emit events specific to the task at hand rather than for all compute.
@on_future_submitted: A Subscriber which is called when some compute is submitted.

@on_future_result: A Subscriber which is called when a future returned with a result and no exception was raised.

@on_future_exception: A Subscriber which is called when some compute raised an uncaught exception.

@on_future_done: A Subscriber which is called when some compute is done, regardless of whether it was successful or not.

@on_future_cancelled: A Subscriber which is called when a future is cancelled. This usually occurs due to the underlying Scheduler, and is not something we do directly, other than when shutting down the scheduler.
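For instance, a small sketch wiring a couple of these events to callbacks on the scheduler from the earlier example:

@scheduler.on_future_exception
def report_exception(future, exception) -> None:
    # Called with the future and the uncaught exception it raised
    print(f"Compute failed: {exception}")

@scheduler.on_empty
def report_empty() -> None:
    # Called with no arguments when nothing is left in the queue
    print("Queue is empty, nothing left to run")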
Common usages of run()

There are various ways to run() the scheduler, notably how long it should run with timeout= and also how it should react to any exception that may occur within the Scheduler itself or your callbacks.

Please see the run() API doc for more details and features; here we show two common use cases of the timeout= parameter.
You can render a live display using run(display=...). This requires rich to be installed, which you can do with pip install rich or pip install amltk[rich].
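For example (assuming rich is installed):

scheduler.run(timeout=1, display=True)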
You can tell the Scheduler to stop after a certain amount of time with the timeout= argument to run(). This will also trigger the @on_timeout event, as seen in the Scheduler output.
import time
from asyncio import Future

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def expensive_function() -> int:
    time.sleep(0.1)
    return 42

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function)

# This will endlessly resubmit work, keeping the scheduler looping
@scheduler.on_future_done
def submit_again(future: Future) -> None:
    if scheduler.running():
        scheduler.submit(expensive_function)

scheduler.run(timeout=1)  # End after 1 second
By specifying that the Scheduler should not wait for ongoing tasks to finish, the Scheduler will attempt to cancel and possibly terminate any running tasks.
import time
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(1)

def expensive_function() -> None:
    time.sleep(10)

@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function)

scheduler.run(timeout=1, wait=False)  # End after 1 second
Forcibly Terminating Workers

As an Executor does not provide an interface to forcibly terminate workers, we provide Scheduler(terminate=...) as a custom strategy for cleaning up a provided executor. It is not possible to terminate running thread-based workers; for example, when using a ThreadPoolExecutor, any Executor using threads to spawn tasks will have to wait until all running tasks are finished before Python can close.

It's likely terminate will trigger the EXCEPTION event for any tasks that are running during the shutdown, not a cancelled event. This is because we use a Future under the hood and these cannot be cancelled once running. However, there is no guarantee of this; it is up to how the Executor handles it.
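A sketch of what a custom strategy could look like; that terminate= accepts a callable receiving the executor is an assumption based on the description above:

from concurrent.futures import ProcessPoolExecutor
from amltk.scheduling import Scheduler

def hard_shutdown(executor: ProcessPoolExecutor) -> None:
    # Hypothetical cleanup: shut down without waiting and cancel anything still pending
    executor.shutdown(wait=False, cancel_futures=True)

scheduler = Scheduler(ProcessPoolExecutor(max_workers=2), terminate=hard_shutdown)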
Scheduling something to be run later

You can schedule some function to be run later using the scheduler.call_later() method.

Note
This does not run the function in the background, it just schedules some function to be called later, where you could then use submit() to schedule a Task to run the function in the background.
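A small sketch using the pieces from the earlier examples:

@scheduler.on_start
def schedule_later() -> None:
    # After a 2 second delay, submit fn(1) to the scheduler (fn from the Basic Usage example)
    scheduler.call_later(2, scheduler.submit, fn, 1)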
class Scheduler(executor, *, terminate=True)
Bases: RichRenderable
A scheduler for submitting tasks to an Executor.
PARAMETER | DESCRIPTION
---|---
executor | The dispatcher to use for submitting tasks.
terminate | Whether to call shutdown on the executor when …
executor: Executor (attr)
The executor to use to run tasks.

emitter: Emitter (attr)
The emitter to use for events.

queue: dict[Future, tuple[Callable, tuple, dict]] (attr)
The queue of tasks running.
on_start: Subscriber[[]] (attr)
A Subscriber which is called when the scheduler starts. This is the first event emitted by the scheduler and one of the only ways to submit the initial compute to the scheduler.

on_finishing: Subscriber[[]] (attr)
A Subscriber which is called when the scheduler is finishing up. This occurs right before the scheduler shuts down the executor.

on_finished: Subscriber[[]] (attr)
A Subscriber which is called when the scheduler is finished, has shut down the executor and possibly terminated any remaining compute.

on_stop: Subscriber[str, BaseException | None] (attr)
A Subscriber which is called when the scheduler has been stopped due to the stop() method being called.

on_timeout: Subscriber[[]] (attr)
A Subscriber which is called when the scheduler reaches the timeout.

on_empty: Subscriber[[]] (attr)
A Subscriber which is called when the queue is empty. This can be useful to re-fill the queue and prevent the scheduler from exiting.

on_future_submitted: Subscriber[Future] (attr)
A Subscriber which is called when some compute is submitted.

on_future_done: Subscriber[Future] (attr)
A Subscriber which is called when some compute is done, regardless of whether it was successful or not.

on_future_cancelled: Subscriber[Future] (attr)
A Subscriber which is called when a future is cancelled. This usually occurs due to the underlying Scheduler, and is not something we do directly, other than when shutting down the scheduler.

on_future_exception: Subscriber[Future, BaseException] (attr)
A Subscriber which is called when some compute raised an uncaught exception.

on_future_result: Subscriber[Future, Any] (attr)
A Subscriber which is called when a future returned with a result and no exception was raised.
def with_processes(max_workers=None, mp_context=None, initializer=None, initargs=()) (classmethod)
Create a scheduler with a ProcessPoolExecutor. See ProcessPoolExecutor for more details.
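For instance, a sketch pinning the multiprocessing start method; that these arguments are forwarded to ProcessPoolExecutor is assumed from the description above:

import multiprocessing
from amltk.scheduling import Scheduler

scheduler = Scheduler.with_processes(
    max_workers=2,
    mp_context=multiprocessing.get_context("spawn"),  # assumption: passed through to ProcessPoolExecutor
)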
def with_loky(max_workers=None, context=None, timeout=10, kill_workers=False, reuse='auto', job_reducers=None, result_reducers=None, initializer=None, initargs=(), env=None) (classmethod)
Create a scheduler with a loky.get_reusable_executor. See the loky documentation (loky.readthedocs.io/en/stable/API.html) for more details.
def with_sequential() (classmethod)
Create a Scheduler that runs sequentially. This is useful for debugging and testing. Uses a SequentialExecutor.
def with_slurm(*, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs) (classmethod)
Create a Scheduler that runs on a SLURM cluster. Uses dask_jobqueue.SLURMCluster.

PARAMETER | DESCRIPTION
---|---
n_workers | The number of workers to start.
adaptive | Whether to use the adaptive scaling of the cluster or fixed allocation of all workers. This will specifically use the dask_jobqueue.SLURMCluster.adapt method to dynamically scale the cluster to the number of workers specified.
submit_command | Overwrite the command to submit a worker if necessary.
cancel_command | Overwrite the command to cancel a worker if necessary.
kwargs | Any additional keyword arguments to pass to the …

RETURNS | DESCRIPTION
---|---
Self | A scheduler that will run on a SLURM cluster.
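A hedged sketch; the extra keyword arguments (queue, cores, memory, walltime) are assumptions about what dask_jobqueue.SLURMCluster accepts and should be adapted to your cluster:

from amltk.scheduling import Scheduler

scheduler = Scheduler.with_slurm(
    n_workers=4,
    adaptive=True,                 # scale workers up and down as needed
    queue="partition-name",        # hypothetical SLURM partition
    cores=1,
    memory="4GB",
    walltime="00:30:00",
)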
def with_pbs(*, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs) (classmethod)
Create a Scheduler that runs on a PBS cluster. Uses dask_jobqueue.PBSCluster.

PARAMETER | DESCRIPTION
---|---
n_workers | The number of workers to start.
adaptive | Whether to use the adaptive scaling of the cluster or fixed allocation of all workers. This will specifically use the dask_jobqueue.SLURMCluster.adapt method to dynamically scale the cluster to the number of workers specified.
submit_command | Overwrite the command to submit a worker if necessary.
cancel_command | Overwrite the command to cancel a worker if necessary.
kwargs | Any additional keyword arguments to pass to the …

RETURNS | DESCRIPTION
---|---
Self | A scheduler that will run on a PBS cluster.
def with_sge(*, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs) (classmethod)
Create a Scheduler that runs on an SGE cluster. Uses dask_jobqueue.SGECluster.

PARAMETER | DESCRIPTION
---|---
n_workers | The number of workers to start.
adaptive | Whether to use the adaptive scaling of the cluster or fixed allocation of all workers. This will specifically use the dask_jobqueue.SLURMCluster.adapt method to dynamically scale the cluster to the number of workers specified.
submit_command | Overwrite the command to submit a worker if necessary.
cancel_command | Overwrite the command to cancel a worker if necessary.
kwargs | Any additional keyword arguments to pass to the …

RETURNS | DESCRIPTION
---|---
Self | A scheduler that will run on an SGE cluster.
def with_oar(*, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs) (classmethod)
Create a Scheduler that runs on an OAR cluster. Uses dask_jobqueue.OARCluster.

PARAMETER | DESCRIPTION
---|---
n_workers | The number of workers to start.
adaptive | Whether to use the adaptive scaling of the cluster or fixed allocation of all workers. This will specifically use the dask_jobqueue.SLURMCluster.adapt method to dynamically scale the cluster to the number of workers specified.
submit_command | Overwrite the command to submit a worker if necessary.
cancel_command | Overwrite the command to cancel a worker if necessary.
kwargs | Any additional keyword arguments to pass to the …

RETURNS | DESCRIPTION
---|---
Self | A scheduler that will run on an OAR cluster.
def with_moab(*, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs) (classmethod)
Create a Scheduler that runs on a Moab cluster. Uses dask_jobqueue.MoabCluster.

PARAMETER | DESCRIPTION
---|---
n_workers | The number of workers to start.
adaptive | Whether to use the adaptive scaling of the cluster or fixed allocation of all workers. This will specifically use the dask_jobqueue.SLURMCluster.adapt method to dynamically scale the cluster to the number of workers specified.
submit_command | Overwrite the command to submit a worker if necessary.
cancel_command | Overwrite the command to cancel a worker if necessary.
kwargs | Any additional keyword arguments to pass to the …

RETURNS | DESCRIPTION
---|---
Self | A scheduler that will run on a Moab cluster.
def with_lsf(*, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs) (classmethod)
Create a Scheduler that runs on an LSF cluster. Uses dask_jobqueue.LSFCluster.

PARAMETER | DESCRIPTION
---|---
n_workers | The number of workers to start.
adaptive | Whether to use the adaptive scaling of the cluster or fixed allocation of all workers. This will specifically use the dask_jobqueue.SLURMCluster.adapt method to dynamically scale the cluster to the number of workers specified.
submit_command | Overwrite the command to submit a worker if necessary.
cancel_command | Overwrite the command to cancel a worker if necessary.
kwargs | Any additional keyword arguments to pass to the …

RETURNS | DESCRIPTION
---|---
Self | A scheduler that will run on an LSF cluster.
def with_htcondor(*, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs) (classmethod)
Create a Scheduler that runs on an HTCondor cluster. Uses dask_jobqueue.HTCondorCluster.

PARAMETER | DESCRIPTION
---|---
n_workers | The number of workers to start.
adaptive | Whether to use the adaptive scaling of the cluster or fixed allocation of all workers. This will specifically use the dask_jobqueue.SLURMCluster.adapt method to dynamically scale the cluster to the number of workers specified.
submit_command | Overwrite the command to submit a worker if necessary.
cancel_command | Overwrite the command to cancel a worker if necessary.
kwargs | Any additional keyword arguments to pass to the …

RETURNS | DESCRIPTION
---|---
Self | A scheduler that will run on an HTCondor cluster.
def with_dask_jobqueue(name, *, n_workers, adaptive=False, submit_command=None, cancel_command=None, **kwargs) (classmethod)
Create a Scheduler using dask-jobqueue. See dask_jobqueue for more details.

PARAMETER | DESCRIPTION
---|---
name | The name of the jobqueue to use. This is the name of the class in …
adaptive | Whether to use the adaptive scaling of the cluster or fixed allocation of all workers. This will specifically use the dask_jobqueue.SLURMCluster.adapt method to dynamically scale the cluster to the number of workers specified.
n_workers | The number of workers to start.
submit_command | Overwrite the command to submit a worker if necessary.
cancel_command | Overwrite the command to cancel a worker if necessary.
kwargs | Any additional keyword arguments to pass to the …

RAISES | DESCRIPTION
---|---
ImportError | If dask-jobqueue is not installed.

RETURNS | DESCRIPTION
---|---
Self | A new scheduler with a …
def empty()
Check if the scheduler is empty.

RETURNS | DESCRIPTION
---|---
bool | True if there are no tasks in the queue.

def running()
Whether the scheduler is running and accepting tasks to dispatch.

RETURNS | DESCRIPTION
---|---
bool | True if the scheduler is running and accepting tasks.
def submit(fn, /, *args, **kwargs)
Submits a callable to be executed with the given arguments.

PARAMETER | DESCRIPTION
---|---
fn | The callable to be executed as fn(*args, **kwargs) that returns a Future instance representing the execution of the callable.
args | Positional arguments to pass to the function.
kwargs | Keyword arguments to pass to the function.

RAISES | DESCRIPTION
---|---
SchedulerNotRunningError | If the scheduler is not running. You can protect against this using scheduler.running().

RETURNS | DESCRIPTION
---|---
Future[R] | A Future representing the given call.
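A sketch of guarding a submission and keeping the returned future, mirroring the pattern from the timeout example earlier:

@scheduler.on_start
def submit_work() -> None:
    if scheduler.running():  # guard against SchedulerNotRunningError
        future = scheduler.submit(fn, 2)  # fn from the Basic Usage example; returns a Future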
def task(function, *, plugins=(), init_plugins=True)
Create a new task.

PARAMETER | DESCRIPTION
---|---
function | The function to run using the scheduler.
plugins | The plugins to attach to the task.
init_plugins | Whether to initialize the plugins.

RETURNS | DESCRIPTION
---|---
Task[P, R] | A new task.
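A minimal sketch; that a Task exposes submit() and an on_result event is an assumption here, see the Task documentation for the actual interface:

task = scheduler.task(fn)  # fn from the Basic Usage example

@task.on_result  # assumed task-level analogue of the scheduler's @on_future_result
def handle_result(future, result) -> None:
    print(f"Task result: {result}")

@scheduler.on_start
def start() -> None:
    task.submit(1)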
def run(*, timeout=None, end_on_empty=True, wait=True, on_exception='raise', asyncio_debug_mode=False, display=False)
Run the scheduler.

PARAMETER | DESCRIPTION
---|---
timeout | The maximum time to run the scheduler for in seconds. Defaults to None.
end_on_empty | Whether to end the scheduler when the queue becomes empty.
wait | Whether to wait for currently running compute to finish once the …
on_exception | What to do when an exception occurs in the scheduler or callbacks (does not apply to submitted compute!).
asyncio_debug_mode | Whether to run the async loop in debug mode. Defaults to False.
display | Whether to display the scheduler live in the console.

RETURNS | DESCRIPTION
---|---
ExitState | The reason for the scheduler ending.

RAISES | DESCRIPTION
---|---
RuntimeError | If the scheduler is already running.
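For example, inspecting the returned ExitState (its reason and exception attributes are documented below):

exit_state = scheduler.run(timeout=60)
print(exit_state.reason)     # e.g. a Code such as TIMEOUT or EXHAUSTED
print(exit_state.exception)  # the exception that caused the exit, if any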
def async_run(*, timeout=None, end_on_empty=True, wait=True, on_exception='raise', display=False) (async)
Async version of run. This can be useful if you are already running in an async context, such as in a web server or Jupyter notebook. Please see run() for more details.
def stop(*args, stop_msg=None, exception=None, **kwargs)
Stop the scheduler.

The scheduler will stop, finishing currently running tasks depending on the wait= parameter to Scheduler.run. The call signature is kept open with *args, **kwargs to make it easier to include in any callback.

PARAMETER | DESCRIPTION
---|---
*args | Logged in a debug message.
**kwargs | Logged in a debug message.
stop_msg | The message to log when stopping the scheduler.
exception | The exception which incited …
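A sketch of stopping from a callback; the threshold check is purely illustrative:

@scheduler.on_future_result
def stop_when_good_enough(future, result) -> None:
    if result >= 42:  # hypothetical stopping criterion
        scheduler.stop(stop_msg="target value reached")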
def call_later(delay, fn, *args, **kwargs)
Schedule a function to be run after a delay.

PARAMETER | DESCRIPTION
---|---
delay | The delay in seconds.
fn | The function to run.
args | The positional arguments to pass to the function.
kwargs | The keyword arguments to pass to the function.

RETURNS | DESCRIPTION
---|---
TimerHandle | A timer handle that can be used to cancel the function.
def add_renderable(renderable)
Add a renderable object to the scheduler. This will be displayed whenever the scheduler is displayed.
class ExitState (dataclass)
The exit state of a scheduler.

ATTRIBUTE | DESCRIPTION
---|---
reason | The reason for the exit.
exception | The exception that caused the exit, if any.
class Code
Bases: Enum
The reason the scheduler ended.

STOPPED (classvar attr)
The scheduler was stopped forcefully with Scheduler.stop.

TIMEOUT (classvar attr)
The scheduler finished because of a timeout.

EXHAUSTED (classvar attr)
The scheduler finished because it exhausted its queue.

CANCELLED (classvar attr)
The scheduler was cancelled.

UNKNOWN (classvar attr)
The scheduler finished for an unknown reason.

EXCEPTION (classvar attr)
The scheduler finished because of an exception.