Plugins
Plugins are a way to modify a Task, to add new functionality or to change the behaviour of what goes on in the function that is dispatched to the Scheduler.
Some plugins will also add new @events to a task, which can be used to respond to something that may have occurred with your task.
You can add a plugin to a Task like so:
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins import Limiter
def some_function(x: int) -> int:
    return x * 2
scheduler = Scheduler.with_processes(1)
# When creating a task with the scheduler
task = scheduler.task(some_function, plugins=[Limiter(max_calls=10)])
# or directly to a Task
task = Task(some_function, scheduler=scheduler, plugins=[Limiter(max_calls=10)])
╭─ Task some_function(x: int) -> int ──────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Calls 0/10 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰────────────────────── Ref: Task-some_function-ZCzxQpJY ──────────────────────╯
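The sketch below uses only the standard library to make the idea of a plugin concrete. It shows a pre-submit hook that rewrites the arguments before the function is dispatched. MiniTask, MiniPlugin and DoubleArgs are hypothetical names for illustration and are not part of amltk. The real Plugin base class is described under Creating Your Own Plugin.

```python
from typing import Any, Callable


class MiniPlugin:
    """Hypothetical base class: a hook that may rewrite what gets dispatched."""

    def pre_submit(
        self, fn: Callable[..., Any], *args: Any, **kwargs: Any
    ) -> tuple[Callable[..., Any], tuple, dict]:
        # By default, pass everything through unchanged
        return fn, args, kwargs


class DoubleArgs(MiniPlugin):
    """Hypothetical plugin that doubles every positional argument."""

    def pre_submit(self, fn, *args, **kwargs):
        return fn, tuple(2 * a for a in args), kwargs


class MiniTask:
    """Hypothetical stand-in for a Task: runs plugin hooks, then calls the function."""

    def __init__(self, fn: Callable[..., Any], plugins: list[MiniPlugin]) -> None:
        self.fn = fn
        self.plugins = plugins

    def submit(self, *args: Any, **kwargs: Any) -> Any:
        fn = self.fn
        # Each plugin sees (and may replace) the function and its arguments
        for plugin in self.plugins:
            fn, args, kwargs = plugin.pre_submit(fn, *args, **kwargs)
        # A real scheduler would dispatch to an executor; here we call directly
        return fn(*args, **kwargs)


def some_function(x: int) -> int:
    return x * 2


task = MiniTask(some_function, plugins=[DoubleArgs()])
print(task.submit(3))  # 12: the plugin turns 3 into 6, then 6 * 2 = 12
```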
Limiter
amltk.scheduling.plugins.limiter
The Limiter can limit the number of times a function is called, how many concurrent instances of it can be running, or whether it can run while another task is running.
The functionality of the Limiter could also be implemented without a plugin, but the plugin provides some convenient utilities.
Usage
from amltk.scheduling import Scheduler, Task
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
    return x + 1
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=[Limiter(max_calls=2)])
@task.on("call-limit-reached")
def callback(task: Task, *args, **kwargs):
    pass
╭─ Task fn(x: int) -> int ─────────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Calls 0/2 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-dFE6Yw2e ────────────────────────────╯
@events
amltk.scheduling.plugins.Limiter.CALL_LIMIT_REACHED
class-attribute
instance-attribute
The event emitted when the task has reached its call limit.
Will call any subscribers with the task as the first argument, followed by the arguments and keyword arguments that were passed to the task.
from amltk.scheduling import Scheduler, Task
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
    return x + 1
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=[Limiter(max_calls=2)])
@task.on("call-limit-reached")
def callback(task: Task, *args, **kwargs):
    pass
╭─ Task fn(x: int) -> int ─────────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Calls 0/2 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-l6391QGS ────────────────────────────╯
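The call-limit behaviour can be sketched in plain Python. CallLimiter and try_submit are hypothetical names and do not reflect amltk's Limiter implementation. The sketch only shows the counting logic and how subscribers could receive the task followed by the submitted arguments.

```python
from typing import Any, Callable


class CallLimiter:
    """Hypothetical sketch of a call limit; not amltk's Limiter implementation."""

    def __init__(self, max_calls: int) -> None:
        self.max_calls = max_calls
        self.calls = 0
        # Subscribers receive (task, *args, **kwargs), mirroring the event above
        self.on_limit_reached: list[Callable[..., Any]] = []

    def try_submit(self, task: Any, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.calls >= self.max_calls:
            for callback in self.on_limit_reached:
                callback(task, *args, **kwargs)
            return None  # The call is refused and nothing is run
        self.calls += 1
        return fn(*args, **kwargs)


def fn(x: int) -> int:
    return x + 1


refused = []
limiter = CallLimiter(max_calls=2)
limiter.on_limit_reached.append(lambda task, *args, **kwargs: refused.append(args))

results = [limiter.try_submit("task-fn", fn, i) for i in range(4)]
print(results)  # [1, 2, None, None]
print(refused)  # [(2,), (3,)]
```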
amltk.scheduling.plugins.Limiter.CONCURRENT_LIMIT_REACHED
class-attribute
instance-attribute
The event emitted when the task has reached its concurrent call limit.
Will call any subscribers with the task as the first argument, followed by the arguments and keyword arguments that were passed to the task.
from amltk.scheduling import Scheduler, Task
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
    return x + 1
scheduler = Scheduler.with_processes(2)
task = scheduler.task(fn, plugins=[Limiter(max_concurrent=2)])
@task.on("concurrent-limit-reached")
def callback(task: Task, *args, **kwargs):
    pass
╭─ Task fn(x: int) -> int ─────────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Concurrent 0/2 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-7d0TnlVN ────────────────────────────╯
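A concurrency limit needs to know when submitted work finishes. The sketch below assumes a concurrent.futures.ThreadPoolExecutor and uses Future.add_done_callback to release a slot. ConcurrentLimiter is a hypothetical name, and this is not necessarily how amltk implements max_concurrent.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Optional


class ConcurrentLimiter:
    """Hypothetical sketch: refuse submissions while max_concurrent are in flight."""

    def __init__(self, executor: ThreadPoolExecutor, max_concurrent: int) -> None:
        self.executor = executor
        self.max_concurrent = max_concurrent
        self.running = 0
        self.lock = threading.Lock()

    def _done(self, _future: Future) -> None:
        with self.lock:
            self.running -= 1

    def submit(self, fn: Callable[..., Any], *args: Any) -> Optional[Future]:
        with self.lock:
            if self.running >= self.max_concurrent:
                return None  # A "concurrent-limit-reached" style event could fire here
            self.running += 1
        future = self.executor.submit(fn, *args)
        # Decrement the in-flight count once the work finishes
        future.add_done_callback(self._done)
        return future


release = threading.Event()


def slow(x: int) -> int:
    release.wait()  # Block until the main thread lets the work finish
    return x + 1


with ThreadPoolExecutor(max_workers=2) as pool:
    limiter = ConcurrentLimiter(pool, max_concurrent=2)
    futures = [limiter.submit(slow, i) for i in range(3)]
    release.set()
    values = [f.result() for f in futures if f is not None]

print(values)  # [1, 2]
print(futures[2])  # None: the third submission was refused
```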
amltk.scheduling.plugins.Limiter.DISABLED_DUE_TO_RUNNING_TASK
class-attribute
instance-attribute
The event emitted when the task was not submitted due to some other running task.
Will call any subscribers with the task as the first argument, followed by the arguments and keyword arguments that were passed to the task.
from amltk.scheduling import Scheduler, Task
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
    return x + 1
scheduler = Scheduler.with_processes(2)
other_task = scheduler.task(fn)
task = scheduler.task(fn, plugins=[Limiter(not_while_running=other_task)])
@task.on("disabled-due-to-running-task")
def callback(task: Task, *args, **kwargs):
    pass
╭─ Task fn(x: int) -> int ─────────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Not While def fn(...) Ref: Task-fn-uxHmfub3 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-gS2x55te ────────────────────────────╯
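The not_while_running check comes down to asking whether another task currently has work in flight. TrackedTask and submit_unless_running are hypothetical names in this simplified, single-threaded sketch.

```python
from typing import Any, Callable, Optional


class TrackedTask:
    """Hypothetical task that counts how many of its calls are currently running."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn
        self.running = 0


def submit_unless_running(task: TrackedTask, other: TrackedTask, *args: Any) -> Optional[Any]:
    """Hypothetical guard: refuse to run task while other has calls in flight."""
    if other.running > 0:
        return None  # A "disabled-due-to-running-task" style event could fire here
    task.running += 1
    try:
        return task.fn(*args)
    finally:
        task.running -= 1


def fn(x: int) -> int:
    return x + 1


other_task = TrackedTask(fn)
task = TrackedTask(fn)

first = submit_unless_running(task, other_task, 1)  # other_task is idle
other_task.running = 1  # Pretend other_task has a call in flight
second = submit_unless_running(task, other_task, 1)  # Refused
print(first, second)  # 2 None
```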
Pynisher
amltk.scheduling.plugins.pynisher
The PynisherPlugin uses pynisher to place memory, wall time and CPU time constraints on processes, crashing them if these limits are reached.
The default units are bytes ("B") and seconds ("s"), but you can also use other units; please see the relevant API doc.
It is best used with Scheduler.with_processes() to have work performed in processes.
Requirements
This requires pynisher, which can be installed with:
pip install pynisher
Usage
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time
def f(x: int) -> int:
    time.sleep(x)
    return 42
scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))
@task.on("pynisher-timeout")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
│ ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ │
│ │ Memory Wall Time CPU Time │ │
│ │ None (1, 's') None │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────── Ref: Task-f-wPjLi9hQ ────────────────────────────╯
@events
amltk.scheduling.plugins.pynisher.PynisherPlugin.TIMEOUT
class-attribute
instance-attribute
TIMEOUT: Event[[TimeoutException], Any] = Event(
    "pynisher-timeout"
)
A Task timed out, either due to the wall time or cpu time limit.
Will call any subscribers with the exception as the argument.
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time
def f(x: int) -> int:
    time.sleep(x)
    return 42
scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))
@task.on("pynisher-timeout")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
│ ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ │
│ │ Memory Wall Time CPU Time │ │
│ │ None (1, 's') None │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────── Ref: Task-f-hZ03FLTJ ────────────────────────────╯
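The difference between the two time limits matters for the examples in this section. Sleeping uses wall time but very little CPU time, while a busy loop uses both. The sketch below measures this with the standard library's time.perf_counter() and time.process_time(). It only illustrates the two clocks and is not how pynisher enforces its limits.

```python
import time


def measure(work) -> tuple[float, float]:
    """Return (wall seconds, CPU seconds) spent running work()."""
    wall_start, cpu_start = time.perf_counter(), time.process_time()
    work()
    return time.perf_counter() - wall_start, time.process_time() - cpu_start


def sleeper() -> None:
    time.sleep(0.3)  # Waiting: wall time passes, almost no CPU is used


def busy() -> None:
    end = time.process_time() + 0.3
    while time.process_time() < end:  # Spinning: CPU time accumulates
        pass


sleep_wall, sleep_cpu = measure(sleeper)
busy_wall, busy_cpu = measure(busy)
print(f"sleep: wall={sleep_wall:.2f}s cpu={sleep_cpu:.2f}s")
print(f"busy:  wall={busy_wall:.2f}s cpu={busy_cpu:.2f}s")
```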
amltk.scheduling.plugins.pynisher.PynisherPlugin.MEMORY_LIMIT_REACHED
class-attribute
instance-attribute
A Task was submitted but reached its memory limit.
Will call any subscribers with the exception as the argument.
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import numpy as np
def f(x: int) -> int:
    # Allocating this array far exceeds the 1 KB limit set below
    _ = np.arange(100000000)
    return 42
scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(memory_limit=(1, "KB")))
@task.on("pynisher-memory-limit")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
│ ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ │
│ │ Memory Wall Time CPU Time │ │
│ │ (1, 'KB') None None │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────── Ref: Task-f-WI0zTjUy ────────────────────────────╯
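To see why this example trips the limit, it helps to convert the (value, unit) tuple into bytes and compare it with the size of the array. The unit table and its 1024-based multipliers are assumptions for illustration, as is the 8-byte integer dtype. Check the pynisher and NumPy documentation for the exact conventions.

```python
# Hypothetical multipliers: the unit names and the 1024-based factors are
# assumptions for illustration; check pynisher's docs for its own convention.
MEMORY_UNITS = {"B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3}


def to_bytes(limit: tuple[int, str]) -> int:
    """Convert a (value, unit) memory limit into a plain byte count."""
    value, unit = limit
    return value * MEMORY_UNITS[unit]


limit_bytes = to_bytes((1, "KB"))

# np.arange(100000000) holds 100 million integers; assuming an 8-byte
# integer dtype (common on 64-bit Linux/macOS), the buffer alone needs
# 800 million bytes.
array_bytes = 100_000_000 * 8

print(limit_bytes)  # 1024
print(array_bytes // limit_bytes)  # 781250 times over the limit
```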
amltk.scheduling.plugins.pynisher.PynisherPlugin.CPU_TIME_LIMIT_REACHED
class-attribute
instance-attribute
A Task was submitted but reached its CPU time limit.
Will call any subscribers with the exception as the argument.
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
def f(x: int) -> int:
    i = 0
    while True:
        # Keep busy computing the answer to everything
        i += 1
    return 42
scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(cputime_limit=(1, "s")))
@task.on("pynisher-cputime-limit")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
│ ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ │
│ │ Memory Wall Time CPU Time │ │
│ │ None None (1, 's') │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────── Ref: Task-f-mbWR8gX9 ────────────────────────────╯
amltk.scheduling.plugins.pynisher.PynisherPlugin.WALL_TIME_LIMIT_REACHED
class-attribute
instance-attribute
A Task was submitted but reached its wall time limit.
Will call any subscribers with the exception as the argument.
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time
def f(x: int) -> int:
    time.sleep(x)
    return 42
scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))
@task.on("pynisher-walltime-limit")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
│ ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ │
│ │ Memory Wall Time CPU Time │ │
│ │ None (1, 's') None │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────── Ref: Task-f-i8OWPrE1 ────────────────────────────╯
Scheduler Executor
This will place process limits on the task as soon as it starts running, wherever it may be running. If you are using Scheduler.with_sequential(), then this will place limits on the main process, which is likely not what you want. This also does not work with a ThreadPoolExecutor.
If you use this with something like dask-jobqueue, then this will place limits on the workers it spawns. In that case, it would be better to place limits directly through dask-jobqueue.
Platform Limitations (Mac, Windows)
Pynisher has some limitations with memory on Mac and Windows: automl/pynisher#features
You can check this with PynisherPlugin.supports("memory"), PynisherPlugin.supports("cpu_time") and PynisherPlugin.supports("wall_time").
See PynisherPlugin.supports().
Comm
amltk.scheduling.plugins.comm
The Comm.Plugin enables two-way communication with a running Task.
The Comm provides an easy interface to communicate, while the Comm.Msg encapsulates messages between the main process and the Task.
Usage
To set up a Task to work with a Comm, the Task must accept a comm as a keyword argument. This prevents it from conflicting with any args passed through during the call to submit().
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def powers_of_two(start: int, n: int, *, comm: Comm) -> None:
    with comm.open():
        for i in range(n):
            comm.send(start ** (i+1))
scheduler = Scheduler.with_processes(1)
task = scheduler.task(powers_of_two, plugins=Comm.Plugin())
results = []
@scheduler.on_start
def on_start():
    task.submit(2, 5)
@task.on("comm-open")
def on_open(msg: Comm.Msg):
    print(f"Task has opened | {msg}")
@task.on("comm-message")
def on_message(msg: Comm.Msg):
    results.append(msg.data)
scheduler.run()
print(results)
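The Comm relies on a Pipe (see Supported Backends), so the message flow above can be approximated with the standard library. To stay self-contained, this sketch runs the worker in a thread rather than a separate process. The tuples of a message kind and its data form a hypothetical protocol and do not reflect amltk's actual message format.

```python
import threading
from multiprocessing import Pipe
from multiprocessing.connection import Connection

# Hypothetical message kinds, loosely mirroring "comm-open", "comm-message"
# and "comm-close"; amltk's real wire format is not shown here.
OPEN, MESSAGE, CLOSE = "open", "message", "close"


def powers_of_two(start: int, n: int, conn: Connection) -> None:
    conn.send((OPEN, None))
    for i in range(n):
        conn.send((MESSAGE, start ** (i + 1)))
    conn.send((CLOSE, None))


main_end, worker_end = Pipe()
# A thread keeps the sketch self-contained; a real worker would be a process
worker = threading.Thread(target=powers_of_two, args=(2, 5, worker_end))
worker.start()

results = []
while True:
    kind, data = main_end.recv()  # Blocks until the worker sends something
    if kind == MESSAGE:
        results.append(data)
    elif kind == CLOSE:
        break
worker.join()
print(results)  # [2, 4, 8, 16, 32]
```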
You can also block a worker while it waits for a response from the main process, allowing the worker to request() data from the main process.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def my_worker(n_tasks: int, *, comm: Comm) -> None:
    with comm.open():
        for task_number in range(n_tasks):
            item = comm.request(task_number)
            comm.send(f"Task received {item} for {task_number}")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(my_worker, plugins=Comm.Plugin())
items = ["A", "B", "C"]
results = []
@scheduler.on_start
def on_start():
    task.submit(n_tasks=3)
@task.on("comm-request")
def on_request(msg: Comm.Msg):
    task_number = msg.data
    msg.respond(items[task_number])
@task.on("comm-message")
def on_message(msg: Comm.Msg):
    results.append(msg.data)
scheduler.run()
print(results)
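The request/response pattern can be sketched with a standard library Pipe. The worker sends a request and then blocks on recv() until the main side answers. The thread-based worker and the message tuples are hypothetical stand-ins for what the Comm plugin manages for you.

```python
import threading
from multiprocessing import Pipe
from multiprocessing.connection import Connection

# Hypothetical message kinds; not amltk's actual protocol
REQUEST, MESSAGE, CLOSE = "request", "message", "close"


def my_worker(n_tasks: int, conn: Connection) -> None:
    for task_number in range(n_tasks):
        conn.send((REQUEST, task_number))
        item = conn.recv()  # Block until the main side responds
        conn.send((MESSAGE, f"Task received {item} for {task_number}"))
    conn.send((CLOSE, None))


items = ["A", "B", "C"]
main_end, worker_end = Pipe()
worker = threading.Thread(target=my_worker, args=(3, worker_end))
worker.start()

results = []
while True:
    kind, data = main_end.recv()
    if kind == REQUEST:
        main_end.send(items[data])  # Respond to the blocked worker
    elif kind == MESSAGE:
        results.append(data)
    elif kind == CLOSE:
        break
worker.join()
print(results)
```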
@events
amltk.scheduling.plugins.comm.Comm.MESSAGE
class-attribute
instance-attribute
A Task has sent a message to the main process.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def fn(x: int, comm: Comm | None = None) -> None:
    assert comm is not None
    with comm.open():
        comm.send(x + 1)
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())
@task.on("comm-message")
def callback(msg: Comm.Msg):
    print(msg.data)
╭─ Task fn(x: int, comm: amltk.scheduling.plugins.comm.Comm | None = None) -> ─╮
│ ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ │
│ │ Open Connections: 0 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-dRCPpRTk ────────────────────────────╯
amltk.scheduling.plugins.comm.Comm.REQUEST
class-attribute
instance-attribute
A Task has sent a request.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def greeter(greeting: str, comm: Comm | None = None) -> None:
    assert comm is not None
    with comm.open():
        name = comm.request()
        comm.send(f"{greeting} {name}!")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(greeter, plugins=Comm.Plugin())
@scheduler.on_start
def on_start():
    task.submit("Hello")
@task.on("comm-request")
def on_request(msg: Comm.Msg):
    msg.respond("Alice")
@task.on("comm-message")
def on_msg(msg: Comm.Msg):
    print(msg.data)
scheduler.run()
╭─ Task greeter(greeting: str, comm: amltk.scheduling.plugins.comm.Comm | None─╮
│ ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ │
│ │ Open Connections: 0 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰───────────────────────── Ref: Task-greeter-QEUC6xdm ─────────────────────────╯
amltk.scheduling.plugins.comm.Comm.OPEN
class-attribute
instance-attribute
The task has signalled that it is open.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def fn(comm: Comm) -> None:
    with comm.open():
        pass
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())
@scheduler.on_start
def on_start():
    task.submit()
@task.on("comm-open")
def callback(msg: Comm.Msg):
    print("Comm has just used comm.open()")
scheduler.run()
amltk.scheduling.plugins.comm.Comm.CLOSE
class-attribute
instance-attribute
The task has signalled that it has closed.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def fn(comm: Comm) -> None:
    with comm.open():
        pass
    # A close signal is sent to the main process on exiting the block above
    print("Done")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())
@scheduler.on_start
def on_start():
    task.submit()
@task.on("comm-close")
def on_close(msg: Comm.Msg):
    print(f"Worker closed with {msg}")
scheduler.run()
╭─ Task fn(comm: amltk.scheduling.plugins.comm.Comm) -> None ──────────────────╮
│ ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ │
│ │ Open Connections: 0 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-rV5lGSkk ────────────────────────────╯
Supported Backends
The current implementation relies on Pipe, which only works between processes on the same system/cluster. There is also limited support for dask backends.
This could be extended to allow for web sockets or other forms of connections, but that requires time. Please let us know in the GitHub issues if this is something you are interested in!
ThreadPoolCTL
amltk.scheduling.plugins.threadpoolctl
The ThreadPoolCTLPlugin is useful for parallel training of models. Without limits set through threadpoolctl, the threads used by each model can oversubscribe the available resources and cause significant slowdowns.
This is the mechanism employed by scikit-learn to limit the number of threads used by a given model.
See the threadpoolctl documentation.
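The arithmetic below gives a rough illustration of oversubscription by comparing the total thread count with the number of cores. threads_per_worker is a hypothetical helper. The assumption that each library starts one thread per core by default is typical of many BLAS/OpenMP builds but is not guaranteed. A value computed this way could then be passed as max_threads.

```python
import os
from typing import Optional


def threads_per_worker(n_workers: int, n_cores: Optional[int] = None) -> int:
    """Hypothetical helper: split the available cores evenly across workers."""
    cores = n_cores if n_cores is not None else (os.cpu_count() or 1)
    # Never go below one thread per worker
    return max(1, cores // n_workers)


# Without limits, 4 workers whose libraries each start 8 threads on an
# 8-core machine run 32 threads competing for 8 cores.
print(4 * 8)  # 32
print(threads_per_worker(4, n_cores=8))  # 2 threads each keeps the total at 8
print(threads_per_worker(16, n_cores=8))  # 1
```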
Requirements
This requires threadpoolctl, which can be installed with:
pip install threadpoolctl
Usage
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins.threadpoolctl import ThreadPoolCTLPlugin
scheduler = Scheduler.with_processes(1)
def f() -> None:
    # ... some task that respects the limits set by threadpoolctl
    pass
task = scheduler.task(f, plugins=ThreadPoolCTLPlugin(max_threads=1))
╭─ Task f() -> None ───────────────────────────────────────────────────────────╮
│ ╭────────────────────── Plugin threadpoolctl-plugin ───────────────────────╮ │
│ │ Max Threads User-API │ │
│ │ 1 None │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────── Ref: Task-f-9WVsJgNn ────────────────────────────╯
Warning Filter
amltk.scheduling.plugins.warning_filter
The WarningFilter is used to automatically filter out warnings from a Task as it runs.
This wraps your function in the context manager warnings.catch_warnings() and applies your arguments to warnings.filterwarnings(), as you would normally filter warnings in Python.
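The mechanism described above can be sketched as a plain decorator. with_warning_filter is a hypothetical name and is not the WarningFilter plugin itself. Note that warnings.catch_warnings() modifies global interpreter state and is documented as not thread-safe.

```python
import functools
import warnings
from typing import Any, Callable


def with_warning_filter(*filter_args: Any, **filter_kwargs: Any) -> Callable:
    """Hypothetical decorator mimicking the mechanism described above."""

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Filters added inside catch_warnings() are undone when it exits
            with warnings.catch_warnings():
                warnings.filterwarnings(*filter_args, **filter_kwargs)
                return fn(*args, **kwargs)

        return wrapper

    return decorator


def f() -> None:
    warnings.warn("This is a warning")


quiet_f = with_warning_filter("ignore")(f)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # Make sure nothing is deduplicated
    quiet_f()  # Suppressed by the "ignore" filter
    f()  # Not wrapped, so this one is recorded
print(len(caught))  # 1
```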
Usage
import warnings
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import WarningFilter
def f() -> None:
    warnings.warn("This is a warning")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(f, plugins=WarningFilter("ignore"))
╭─ Task f() -> None ───────────────────────────────────────────────────────────╮
│ ╭───────────────────────── Plugin warning-filter ──────────────────────────╮ │
│ │ Args Kwargs │ │
│ │ ('ignore',) {} │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────── Ref: Task-f-4ZryZEh6 ────────────────────────────╯
Creating Your Own Plugin
amltk.scheduling.plugins.plugin
A plugin that can be attached to a Task.
By inheriting from a Plugin, you can hook into a Task. A plugin can affect, modify and extend its behaviours. Please see the documentation of the methods for more information.
Creating a plugin is only necessary if you need to modify the actual behaviour of the task. For simply hooking into the lifecycle of a task, you can use the @events that a Task emits.
Creating a Plugin
For a full example of a simple plugin, see the Limiter plugin, which prevents the task from being submitted if, for example, it has already been submitted too many times.
The below example shows how to create a plugin that prints the task name before submitting it. It also emits an event when the task is submitted.
from __future__ import annotations
from typing import Callable
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Plugin
from amltk.scheduling.events import Event
# A simple plugin that prints the task name before submitting
class Printer(Plugin):
    name = "my-plugin"
    # Define an event the plugin will emit
    # Event[str] indicates the callback for the event will be called with a str
    PRINTED: Event[str] = Event("printer-msg")
    def __init__(self, greeting: str):
        self.greeting = greeting
        self.n_greetings = 0
    def attach_task(self, task) -> None:
        self.task = task
        # Register an event with the task, this lets the task know valid events
        # people can subscribe to and helps it show up in visuals
        task.add_event(self.PRINTED)
        task.on_submitted(self._print_submitted, hidden=True)  # You can hide this callback from visuals
    def pre_submit(self, fn, *args, **kwargs) -> tuple[Callable, tuple, dict]:
        print(f"{self.greeting} for {self.task} {args} {kwargs}")
        self.n_greetings += 1
        return fn, args, kwargs
    def _print_submitted(self, future, *args, **kwargs) -> None:
        msg = f"Task was submitted {self.task} {args} {kwargs}"
        self.task.emit(self.PRINTED, msg)  # Emit the event with a msg
    def __rich__(self):
        # Customize how the plugin is displayed in rich (optional)
        # rich is an optional dependency of amltk so we move the imports into here
        from rich.panel import Panel
        return Panel(
            f"Greeting: {self.greeting} ({self.n_greetings})",
            title=f"Plugin {self.name}"
        )
def fn(x: int) -> int:
    return x + 1
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=[Printer("Hello")])
@scheduler.on_start
def on_start():
    task.submit(15)
@task.on("printer-msg")
def callback(msg: str):
    print(f"\n{msg}")
scheduler.run()
╭─ Task fn(x: 'int') -> 'int' ─────────────────────────────────────────────────╮
│ ╭──────────────────────────── Plugin my-plugin ────────────────────────────╮ │
│ │ Greeting: Hello (1) │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-TyEI9BtB ────────────────────────────╯
All methods are optional, and you can choose to implement only the ones you need. Most plugins will likely need to implement the attach_task() method, which is called when the plugin is attached to a task. In this method, you can, for example, subscribe to events on the task, create new subscribers for people to use or even store a reference to the task for later use.
Plugins are also encouraged to utilize the events of a Task to further hook into the lifecycle of the task. For example, by saving a reference to the task in the attach_task() method, you can use the emit() method of the task to emit your own specialized events.
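The attach_task() and emit() pattern can be sketched without amltk. MiniEmitter and MiniPrinter are hypothetical stand-ins for a Task and a Plugin. The add_event(), on() and emit() methods of amltk's real Task offer more than this minimal registry.

```python
from collections import defaultdict
from typing import Any, Callable


class MiniEmitter:
    """Hypothetical event registry standing in for a Task's events."""

    def __init__(self) -> None:
        self.events: set[str] = set()
        self.handlers: defaultdict[str, list[Callable[..., Any]]] = defaultdict(list)

    def add_event(self, name: str) -> None:
        self.events.add(name)

    def on(self, name: str, callback: Callable[..., Any]) -> None:
        # Only allow subscribing to events that were declared first
        if name not in self.events:
            raise ValueError(f"Unknown event {name!r}")
        self.handlers[name].append(callback)

    def emit(self, name: str, *args: Any, **kwargs: Any) -> None:
        for callback in self.handlers[name]:
            callback(*args, **kwargs)


class MiniPrinter:
    """Hypothetical plugin: keeps a reference to its task and emits its own event."""

    PRINTED = "printer-msg"

    def attach_task(self, task: MiniEmitter) -> None:
        self.task = task
        task.add_event(self.PRINTED)  # Declare the event before anyone subscribes

    def notify(self, msg: str) -> None:
        self.task.emit(self.PRINTED, msg)


task = MiniEmitter()
plugin = MiniPrinter()
plugin.attach_task(task)

received = []
task.on("printer-msg", received.append)
plugin.notify("Task was submitted")
print(received)  # ['Task was submitted']
```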