Plugins#
Plugins are a way to modify a Task, either to add new functionality
or to change the behaviour of the function that is dispatched to the
Scheduler.
Some plugins also add new @events to a task, which you can use to respond to
something that has occurred with your task.
You can add a plugin to a Task like so:
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins import Limiter
def some_function(x: int) -> int:
    return x * 2
scheduler = Scheduler.with_processes(1)
# When creating a task with the scheduler
task = scheduler.task(some_function, plugins=[Limiter(max_calls=10)])
# or directly to a Task
task = Task(some_function, scheduler=scheduler, plugins=[Limiter(max_calls=10)])
╭─ Task some_function(x: int) -> int ──────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Calls 0/10                                                               │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰────────────────────── Ref: Task-some_function-ZCzxQpJY ──────────────────────╯
Limiter#
The Limiter can limit the number of
times a function is called, how many concurrent instances of it can be running,
and whether it can run while another task is running.
The functionality of the Limiter could also be implemented without a plugin, but
the plugin provides it as a convenient utility.
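To make the idea of a call limit concrete, the following is a minimal standalone sketch that uses only the standard library. The CallLimited class and its on_limit_reached method are hypothetical names invented for this illustration; they are not part of amltk and do not reflect how the Limiter is implemented. The sketch only mirrors the behaviour described above: once the limit is reached, the function is no longer called and subscribers are notified with the wrapper followed by the arguments of the rejected call.

```python
from typing import Any, Callable


class CallLimited:
    """Hypothetical wrapper (not part of amltk) that caps how often fn is called."""

    def __init__(self, fn: Callable[..., Any], max_calls: int) -> None:
        self.fn = fn
        self.max_calls = max_calls
        self.n_calls = 0
        self.subscribers = []

    def on_limit_reached(self, callback: Callable[..., Any]) -> Callable[..., Any]:
        # Register a callback; returning it lets this be used as a decorator
        self.subscribers.append(callback)
        return callback

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        if self.n_calls >= self.max_calls:
            # Limit reached: notify subscribers instead of calling fn
            for callback in self.subscribers:
                callback(self, *args, **kwargs)
            return None
        self.n_calls += 1
        return self.fn(*args, **kwargs)


def fn(x: int) -> int:
    return x + 1


limited = CallLimited(fn, max_calls=2)
rejected = []


@limited.on_limit_reached
def callback(task: CallLimited, *args: Any, **kwargs: Any) -> None:
    rejected.append(args)


outputs = [limited(i) for i in range(4)]
print(outputs)   # [1, 2, None, None]
print(rejected)  # [(2,), (3,)]
```

Writing this by hand for every task quickly becomes repetitive, which is the convenience the plugin is meant to provide.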
Usage
from amltk.scheduling import Scheduler, Task
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
    return x + 1
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=[Limiter(max_calls=2)])
@task.on("call-limit-reached")
def callback(task: Task, *args, **kwargs):
    pass
╭─ Task fn(x: int) -> int ─────────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Calls 0/2                                                                │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-dFE6Yw2e ────────────────────────────╯
@events
    The event emitted when the task has reached its call limit.
Will call any subscribers with the task as the first argument, followed by the arguments and keyword arguments that were passed to the task.
from amltk.scheduling import Scheduler, Task
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
    return x + 1
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=[Limiter(max_calls=2)])
@task.on("call-limit-reached")
def callback(task: Task, *args, **kwargs):
    pass
╭─ Task fn(x: int) -> int ─────────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Calls 0/2                                                                │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-l6391QGS ────────────────────────────╯
    The event emitted when the task has reached its concurrent call limit.
Will call any subscribers with the task as the first argument, followed by the arguments and keyword arguments that were passed to the task.
from amltk.scheduling import Scheduler, Task
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
    return x + 1
scheduler = Scheduler.with_processes(2)
task = scheduler.task(fn, plugins=[Limiter(max_concurrent=2)])
@task.on("concurrent-limit-reached")
def callback(task: Task, *args, **kwargs):
    pass
╭─ Task fn(x: int) -> int ─────────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Concurrent 0/2                                                           │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-7d0TnlVN ────────────────────────────╯
The event emitted when the task was not submitted because some other task was running.
Will call any subscribers with the task as the first argument, followed by the arguments and keyword arguments that were passed to the task.
from amltk.scheduling import Scheduler, Task
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
    return x + 1
scheduler = Scheduler.with_processes(2)
other_task = scheduler.task(fn)
task = scheduler.task(fn, plugins=[Limiter(not_while_running=other_task)])
@task.on("disabled-due-to-running-task")
def callback(task: Task, *args, **kwargs):
    pass
╭─ Task fn(x: int) -> int ─────────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Not While def fn(...) Ref: Task-fn-uxHmfub3                              │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-gS2x55te ────────────────────────────╯
Pynisher#
The PynisherPlugin
uses pynisher to place memory, wall time
and CPU time constraints on processes, crashing them if these limits are reached.
The default units are bytes ("B") and seconds ("s"), but you can also use other
units; please see the relevant API doc.
It is best used with
Scheduler.with_processes() so that work is
performed in processes.
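To illustrate what a wall time limit on a process means in practice, here is a small sketch that uses only the standard library subprocess module. It is not how pynisher or the PynisherPlugin is implemented, and the helper name run_with_walltime is an assumption made up for this example. It shows the general idea: the work runs in a separate process, and that process is killed once the limit is exceeded.

```python
import subprocess
import sys


def run_with_walltime(code: str, limit_s: float) -> str:
    """Run `code` in a fresh Python process, killing it after `limit_s` seconds."""
    try:
        subprocess.run(
            [sys.executable, "-c", code],
            timeout=limit_s,      # the child is killed when this expires
            capture_output=True,  # keep the child's output out of our stdout
            check=True,
        )
    except subprocess.TimeoutExpired:
        return "timeout"
    return "ok"


# Finishes well within the limit
print(run_with_walltime("pass", limit_s=10.0))
# Sleeps for longer than the limit allows, so the process is killed
print(run_with_walltime("import time; time.sleep(5)", limit_s=0.5))
```

Because the limit applies to a whole process, this style of constraint only makes sense when the work actually runs in its own process, which is why the plugin pairs naturally with Scheduler.with_processes().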
Requirements
This requires pynisher, which can be installed with:
Usage
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time
def f(x: int) -> int:
    time.sleep(x)
    return 42
scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))
@task.on("pynisher-timeout")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
│ ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ │
│ │  Memory  Wall Time  CPU Time                                             │ │
│ │  None    (1, 's')   None                                                 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────── Ref: Task-f-wPjLi9hQ ────────────────────────────╯
@events
TIMEOUT: Event[[TimeoutException], Any] = Event(
    "pynisher-timeout"
)
A Task timed out, either due to the wall time or cpu time limit.
Will call any subscribers with the exception as the argument.
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time
def f(x: int) -> int:
    time.sleep(x)
    return 42
scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))
@task.on("pynisher-timeout")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
│ ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ │
│ │  Memory  Wall Time  CPU Time                                             │ │
│ │  None    (1, 's')   None                                                 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────── Ref: Task-f-hZ03FLTJ ────────────────────────────╯
A Task was submitted but reached its memory limit.
Will call any subscribers with the exception as the argument.
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time
import numpy as np
def f(x: int) -> int:
    # Allocate a large array so the task exceeds its memory limit
    data = np.arange(100000000)
    time.sleep(x)
    return 42
scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(memory_limit=(1, "KB")))
@task.on("pynisher-memory-limit")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
│ ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ │
│ │  Memory     Wall Time  CPU Time                                          │ │
│ │  (1, 'KB')  None       None                                              │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────── Ref: Task-f-WI0zTjUy ────────────────────────────╯
A Task was submitted but reached its CPU time limit.
Will call any subscribers with the exception as the argument.
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time
def f(x: int) -> int:
    i = 0
    while True:
        # Keep busy computing the answer to everything
        i += 1
    return 42
scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(cputime_limit=(1, "s")))
@task.on("pynisher-cputime-limit")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
│ ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ │
│ │  Memory  Wall Time  CPU Time                                             │ │
│ │  None    None       (1, 's')                                             │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────── Ref: Task-f-mbWR8gX9 ────────────────────────────╯
A Task was submitted but reached its wall time limit.
Will call any subscribers with the exception as the argument.
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time
def f(x: int) -> int:
    time.sleep(x)
    return 42
scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))
@task.on("pynisher-walltime-limit")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
│ ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ │
│ │  Memory  Wall Time  CPU Time                                             │ │
│ │  None    (1, 's')   None                                                 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────── Ref: Task-f-i8OWPrE1 ────────────────────────────╯
Scheduler Executor
This will place process limits on the task as soon as it starts
running, wherever it may be running. If you are using
Scheduler.with_sequential()
then this will place limits on the main process, which is likely not what you
want. This also does not work with a
ThreadPoolExecutor.
If you use this with something like dask-jobqueue,
then this will place limits on the workers it spawns. In that case it would be better
to place limits directly through dask-jobqueue.
Platform Limitations (Mac, Windows)
Pynisher has some limitations with memory on Mac and Windows: automl/pynisher#features
You can check this with PynisherPlugin.supports("memory"),
PynisherPlugin.supports("cpu_time") and
PynisherPlugin.supports("wall_time").
See PynisherPlugin.supports()
Comm#
The Comm.Plugin enables
two-way communication with a running Task.
The Comm provides an easy interface to
communicate, while the Comm.Msg encapsulates
messages between the main process and the Task.
Usage
To set up a Task to work with a Comm, the Task must accept a comm as
a keyword argument. This prevents it from conflicting with any args passed
through during the call to submit().
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def powers_of_two(start: int, n: int, *, comm: Comm) -> None:
    with comm.open():
        for i in range(n):
            comm.send(start ** (i+1))
scheduler = Scheduler.with_processes(1)
task = scheduler.task(powers_of_two, plugins=Comm.Plugin())
results = []
@scheduler.on_start
def on_start():
    task.submit(2, 5)
@task.on("comm-open")
def on_open(msg: Comm.Msg):
    print(f"Task has opened | {msg}")
@task.on("comm-message")
def on_message(msg: Comm.Msg):
    results.append(msg.data)
scheduler.run()
print(results)
You can also block a worker while it waits for a response from the main process, which allows the
worker to request() data from the main process.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def my_worker(comm: Comm, n_tasks: int) -> None:
    with comm.open():
        for task_number in range(n_tasks):
            task = comm.request(task_number)
            comm.send(f"Task received {task} for {task_number}")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(my_worker, plugins=Comm.Plugin())
items = ["A", "B", "C"]
results = []
@scheduler.on_start
def on_start():
    task.submit(n_tasks=3)
@task.on("comm-request")
def on_request(msg: Comm.Msg):
    task_number = msg.data
    msg.respond(items[task_number])
@task.on("comm-message")
def on_message(msg: Comm.Msg):
    results.append(msg.data)
scheduler.run()
print(results)
@events
    A Task has sent a message to the main process.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def fn(x: int, comm: Comm | None = None) -> int:
    assert comm is not None
    with comm.open():
        comm.send(x + 1)
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())
@task.on("comm-message")
def callback(msg: Comm.Msg):
    print(msg.data)
╭─ Task fn(x: int, comm: amltk.scheduling.plugins.comm.Comm | None = None) -> ─╮
│ ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ │
│ │ Open Connections: 0                                                      │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-dRCPpRTk ────────────────────────────╯
    A Task has sent a request.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def greeter(greeting: str, comm: Comm | None = None) -> None:
    assert comm is not None
    with comm.open():
        name = comm.request()
        comm.send(f"{greeting} {name}!")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(greeter, plugins=Comm.Plugin())
@scheduler.on_start
def on_start():
    task.submit("Hello")
@task.on("comm-request")
def on_request(msg: Comm.Msg):
    msg.respond("Alice")
@task.on("comm-message")
def on_msg(msg: Comm.Msg):
    print(msg.data)
scheduler.run()
╭─ Task greeter(greeting: str, comm: amltk.scheduling.plugins.comm.Comm | None─╮
│ ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ │
│ │ Open Connections: 0                                                      │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰───────────────────────── Ref: Task-greeter-QEUC6xdm ─────────────────────────╯
The task has signalled that it is open.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def fn(comm: Comm) -> None:
    with comm.open():
        pass
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())
@scheduler.on_start
def on_start():
    task.submit()
@task.on("comm-open")
def callback(msg: Comm.Msg):
    print("Comm has just used comm.open()")
scheduler.run()
The task has signalled that it has closed.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def fn(comm: Comm) -> None:
    with comm.open():
        pass
        # Will send a close signal to the main process as it exits this block
    print("Done")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())
@scheduler.on_start
def on_start():
    task.submit()
@task.on("comm-close")
def on_close(msg: Comm.Msg):
    print(f"Worker close with {msg}")
scheduler.run()
╭─ Task fn(comm: amltk.scheduling.plugins.comm.Comm) -> None ──────────────────╮
│ ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ │
│ │ Open Connections: 0                                                      │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-rV5lGSkk ────────────────────────────╯
Supported Backends
The current implementation relies on Pipe which only
works between processes on the same system/cluster. There is also limited support
with dask backends.
This could be extended to allow for web sockets or other forms of connections, but that requires time. Please let us know in the GitHub issues if this is something you are interested in!
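For readers unfamiliar with Pipe, the sketch below shows the kind of request and response exchange described in this section, using multiprocessing.Pipe from the standard library. It is an illustration only and not the Comm implementation: the ("request", ...), ("message", ...) and ("close", ...) tuples are a made-up message format, and a thread stands in for the worker process purely to keep the example self-contained.

```python
import threading
from multiprocessing import Pipe


def worker(conn, n_tasks: int) -> None:
    # Worker side: ask for an item, block until it arrives, then report back
    for task_number in range(n_tasks):
        conn.send(("request", task_number))
        item = conn.recv()  # blocks until the main side responds
        conn.send(("message", f"received {item} for {task_number}"))
    conn.send(("close", None))
    conn.close()


main_conn, worker_conn = Pipe()  # duplex: both ends can send and receive
items = ["A", "B", "C"]
results = []

thread = threading.Thread(target=worker, args=(worker_conn, len(items)))
thread.start()

# Main side: answer requests and collect messages until the worker closes
while True:
    kind, data = main_conn.recv()
    if kind == "request":
        main_conn.send(items[data])
    elif kind == "message":
        results.append(data)
    else:
        break

thread.join()
print(results)
```

Both ends of a Pipe live on the same machine, which is the reason for the backend limitation described above.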
ThreadPoolCTL#
The
ThreadPoolCTLPlugin
is useful for parallel training of models. Without limits set through
threadpoolctl, the number of threads used by a given model may
oversubscribe resources and cause significant slowdowns.
This is the mechanism employed by scikit-learn to limit the number of threads used by a given model.
See the threadpoolctl documentation.
Requirements
This requires threadpoolctl which can be installed with:
Usage
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins.threadpoolctl import ThreadPoolCTLPlugin
scheduler = Scheduler.with_processes(1)
def f() -> None:
    # ... some task that respects the limits set by threadpoolctl
    pass
task = scheduler.task(f, plugins=ThreadPoolCTLPlugin(max_threads=1))
╭─ Task f() -> None ───────────────────────────────────────────────────────────╮
│ ╭────────────────────── Plugin threadpoolctl-plugin ───────────────────────╮ │
│ │  Max Threads  User-API                                                   │ │
│ │  1            None                                                       │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────── Ref: Task-f-9WVsJgNn ────────────────────────────╯
Warning Filter#
The
WarningFilter
is used to automatically filter out warnings from a Task
as it runs.
This wraps your function in the context manager
warnings.catch_warnings()
and applies your arguments to warnings.filterwarnings(),
as you would normally filter warnings in Python.
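The wrapping described above can be sketched with the standard library alone. The helper with_warning_filter is a hypothetical name used only for this illustration and is not part of amltk; it shows the pattern of entering warnings.catch_warnings() and forwarding the given arguments to warnings.filterwarnings() before calling the function.

```python
import warnings
from functools import wraps


def with_warning_filter(fn, *filter_args, **filter_kwargs):
    """Hypothetical helper: run fn with a temporary warnings filter applied."""

    @wraps(fn)
    def wrapped(*args, **kwargs):
        # Filters changed inside this block are restored when it exits
        with warnings.catch_warnings():
            warnings.filterwarnings(*filter_args, **filter_kwargs)
            return fn(*args, **kwargs)

    return wrapped


def noisy() -> int:
    warnings.warn("This is a warning")
    return 42


quiet = with_warning_filter(noisy, "ignore")

# Record any warnings that escape the wrapped call
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = quiet()

print(result, len(caught))
```

Because catch_warnings() restores the previous filters on exit, the filter only applies while the wrapped function runs.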
Usage
import warnings
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import WarningFilter
def f() -> None:
    warnings.warn("This is a warning")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(f, plugins=WarningFilter("ignore"))
╭─ Task f() -> None ───────────────────────────────────────────────────────────╮
│ ╭───────────────────────── Plugin warning-filter ──────────────────────────╮ │
│ │  Args         Kwargs                                                     │ │
│ │  ('ignore',)  {}                                                         │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────── Ref: Task-f-4ZryZEh6 ────────────────────────────╯
Creating Your Own Plugin#
    A plugin that can be attached to a Task.
By inheriting from a Plugin, you can hook into a
Task. A plugin can affect, modify and extend its
behaviours. Please see the documentation of the methods for more information.
Creating a plugin is only necessary if you need to modify the actual behaviour of
the task. For simply hooking into the lifecycle of a task, you can use the @events
that a Task emits.
Creating a Plugin
For a full example of a simple plugin, see the
Limiter plugin, which prevents
the task from being submitted if, for example, it has already been submitted
too many times.
The below example shows how to create a plugin that prints the task name before submitting it. It also emits an event when the task is submitted.
from __future__ import annotations
from typing import Callable
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Plugin
from amltk.scheduling.events import Event
# A simple plugin that prints the task name before submitting
class Printer(Plugin):
    name = "my-plugin"
    # Define an event the plugin will emit
    # Event[str] indicates the callback for the event will be called with a message string
    PRINTED: Event[str] = Event("printer-msg")
    def __init__(self, greeting: str):
        self.greeting = greeting
        self.n_greetings = 0
    def attach_task(self, task) -> None:
        self.task = task
        # Register an event with the task, this lets the task know valid events
        # people can subscribe to and helps it show up in visuals
        task.add_event(self.PRINTED)
        task.on_submitted(self._print_submitted, hidden=True)  # You can hide this callback from visuals
    def pre_submit(self, fn, *args, **kwargs) -> tuple[Callable, tuple, dict]:
        print(f"{self.greeting} for {self.task} {args} {kwargs}")
        self.n_greetings += 1
        return fn, args, kwargs
    def _print_submitted(self, future, *args, **kwargs) -> None:
        msg = f"Task was submitted {self.task} {args} {kwargs}"
        self.task.emit(self.PRINTED, msg)  # Emit the event with a msg
    def __rich__(self):
        # Customize how the plugin is displayed in rich (optional)
        # rich is an optional dependency of amltk so we move the imports into here
        from rich.panel import Panel
        return Panel(
            f"Greeting: {self.greeting} ({self.n_greetings})",
            title=f"Plugin {self.name}"
        )
def fn(x: int) -> int:
    return x + 1
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=[Printer("Hello")])
@scheduler.on_start
def on_start():
    task.submit(15)
@task.on("printer-msg")
def callback(msg: str):
    print(f"\n{msg}")
scheduler.run()
╭─ Task fn(x: 'int') -> 'int' ─────────────────────────────────────────────────╮
│ ╭──────────────────────────── Plugin my-plugin ────────────────────────────╮ │
│ │ Greeting: Hello (1)                                                      │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-TyEI9BtB ────────────────────────────╯
All methods are optional, and you can choose to implement only the ones
you need. Most plugins will likely need to implement the
attach_task() method, which is called
when the plugin is attached to a task. In this method, you can for
example subscribe to events on the task, create new subscribers for people
to use or even store a reference to the task for later use.
Plugins are also encouraged to utilize the events of a
Task to further hook into the lifecycle of the task.
For example, by saving a reference to the task in the attach_task() method, you
can use the emit() method of the task to emit
your own specialized events.
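To show how optional hooks such as attach_task() and pre_submit() can fit together, here is a small self-contained sketch. HypotheticalPlugin and MiniTask are stand-ins written for this illustration; they are not amltk's Plugin or Task, and the real classes do considerably more. The only behaviour borrowed from the example above is that pre_submit() receives the function with its arguments and returns a (fn, args, kwargs) tuple.

```python
class HypotheticalPlugin:
    """Stand-in base class for this sketch only; every hook has a no-op default."""

    def attach_task(self, task) -> None:
        pass

    def pre_submit(self, fn, *args, **kwargs):
        return fn, args, kwargs


class Doubler(HypotheticalPlugin):
    def attach_task(self, task) -> None:
        # Keep a reference to the task for later use
        self.task = task

    def pre_submit(self, fn, *args, **kwargs):
        # Modify the positional arguments before the function is called
        return fn, tuple(a * 2 for a in args), kwargs


class MiniTask:
    """Stand-in task that passes each call through its plugins in order."""

    def __init__(self, fn, plugins=()):
        self.fn = fn
        self.plugins = list(plugins)
        for plugin in self.plugins:
            plugin.attach_task(self)

    def submit(self, *args, **kwargs):
        fn = self.fn
        for plugin in self.plugins:
            fn, args, kwargs = plugin.pre_submit(fn, *args, **kwargs)
        return fn(*args, **kwargs)


def fn(x: int) -> int:
    return x + 1


plain = MiniTask(fn, plugins=[HypotheticalPlugin()])
doubled = MiniTask(fn, plugins=[Doubler()])
print(plain.submit(15))    # 16
print(doubled.submit(15))  # 31
```

A plugin that only overrides the hooks it needs, as Doubler does here, stays small while the defaults cover everything else.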