Skip to content

Plugins

Plugins#

Plugins are a way to modify a Task, to add new functionality or change the behaviour of what goes on in the function that is dispatched to the Scheduler.

Some plugins will also add new @events to a task, which can be used to respond accordingly to something that may have occured with your task.

You can add a plugin to a Task as so:

from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins import Limiter

def some_function(x: int) -> int:
    return x * 2

scheduler = Scheduler.with_processes(1)

# When creating a task with the scheduler
task = scheduler.task(some_function, plugins=[Limiter(max_calls=10)])


# or directly to a Task
task = Task(some_function, scheduler=scheduler, plugins=[Limiter(max_calls=10)])
╭─ Task some_function(x: int) -> int ──────────────────────────────────────────╮
 ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ 
 │ Calls 0/10                                                               │ 
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰────────────────────── Ref: Task-some_function-ZCzxQpJY ──────────────────────╯

Limiter#

amltk.scheduling.plugins.limiter #

The Limiter can limit the number of times a function is called, how many concurrent instances of it can be running, or whether it can run while another task is running.

The functionality of the Limiter could also be implemented without a plugin but it gives some nice utility.

Usage
from amltk.scheduling import Scheduler, Task
from amltk.scheduling.plugins import Limiter

def fn(x: int) -> int:
    return x + 1

scheduler = Scheduler.with_processes(1)

task = scheduler.task(fn, plugins=[Limiter(max_calls=2)])

@task.on("call-limit-reached")
def callback(task: Task, *args, **kwargs):
    pass
╭─ Task fn(x: int) -> int ─────────────────────────────────────────────────────╮
 ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ 
 │ Calls 0/2                                                                │ 
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰─────────────────────────── Ref: Task-fn-dFE6Yw2e ────────────────────────────╯

@events

amltk.scheduling.plugins.Limiter.CALL_LIMIT_REACHED class-attribute instance-attribute #

CALL_LIMIT_REACHED: Event[..., Any] = Event(
    "call-limit-reached"
)

The event emitted when the task has reached its call limit.

Will call any subscribers with the task as the first argument, followed by the arguments and keyword arguments that were passed to the task.

from amltk.scheduling import Scheduler, Task
from amltk.scheduling.plugins import Limiter

def fn(x: int) -> int:
    return x + 1

scheduler = Scheduler.with_processes(1)

task = scheduler.task(fn, plugins=[Limiter(max_calls=2)])

@task.on("call-limit-reached")
def callback(task: Task, *args, **kwargs):
    pass
╭─ Task fn(x: int) -> int ─────────────────────────────────────────────────────╮
 ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ 
 │ Calls 0/2                                                                │ 
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰─────────────────────────── Ref: Task-fn-l6391QGS ────────────────────────────╯

amltk.scheduling.plugins.Limiter.CONCURRENT_LIMIT_REACHED class-attribute instance-attribute #

CONCURRENT_LIMIT_REACHED: Event[..., Any] = Event(
    "concurrent-limit-reached"
)

The event emitted when the task has reached its concurrent call limit.

Will call any subscribers with the task as the first argument, followed by the arguments and keyword arguments that were passed to the task.

from amltk.scheduling import Scheduler, Task
from amltk.scheduling.plugins import Limiter

def fn(x: int) -> int:
    return x + 1

scheduler = Scheduler.with_processes(2)

task = scheduler.task(fn, plugins=[Limiter(max_concurrent=2)])

@task.on("concurrent-limit-reached")
def callback(task: Task, *args, **kwargs):
    pass
╭─ Task fn(x: int) -> int ─────────────────────────────────────────────────────╮
 ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ 
 │ Concurrent 0/2                                                           │ 
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰─────────────────────────── Ref: Task-fn-7d0TnlVN ────────────────────────────╯

amltk.scheduling.plugins.Limiter.DISABLED_DUE_TO_RUNNING_TASK class-attribute instance-attribute #

DISABLED_DUE_TO_RUNNING_TASK: Event[..., Any] = Event(
    "disabled-due-to-running-task"
)

The event emitter when the task was not submitted due to some other running task.

Will call any subscribers with the task as first argument, followed by the arguments and keyword arguments that were passed to the task.

from amltk.scheduling import Scheduler, Task
from amltk.scheduling.plugins import Limiter

def fn(x: int) -> int:
    return x + 1

scheduler = Scheduler.with_processes(2)

other_task = scheduler.task(fn)
task = scheduler.task(fn, plugins=[Limiter(not_while_running=other_task)])

@task.on("disabled-due-to-running-task")
def callback(task: Task, *args, **kwargs):
    pass
╭─ Task fn(x: int) -> int ─────────────────────────────────────────────────────╮
 ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ 
 │ Not While def fn(...) Ref: Task-fn-uxHmfub3
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰─────────────────────────── Ref: Task-fn-gS2x55te ────────────────────────────╯

Pynisher#

amltk.scheduling.plugins.pynisher #

The PynisherPlugin uses pynisher to place memory, walltime and cputime constraints on processes, crashing them if these limits are reached. These default units are bytes ("B") and seconds ("s") but you can also use other units, please see the relevant API doc.

It's best use is when used with Scheduler.with_processes() to have work performed in processes.

Requirements

This required pynisher which can be installed with:

pip install amltk[pynisher]

# Or directly
pip install pynisher
Usage
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time

def f(x: int) -> int:
    time.sleep(x)
    return 42

scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))

@task.on("pynisher-timeout")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
 ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ 
 Memory  Wall Time  CPU Time 
None    (1, 's')   None
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰──────────────────────────── Ref: Task-f-wPjLi9hQ ────────────────────────────╯

@events

amltk.scheduling.plugins.pynisher.PynisherPlugin.TIMEOUT class-attribute instance-attribute #

TIMEOUT: Event[[TimeoutException], Any] = Event(
    "pynisher-timeout"
)

A Task timed out, either due to the wall time or cpu time limit.

Will call any subscribers with the exception as the argument.

from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time

def f(x: int) -> int:
    time.sleep(x)
    return 42

scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))

@task.on("pynisher-timeout")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
 ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ 
 Memory  Wall Time  CPU Time 
None    (1, 's')   None
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰──────────────────────────── Ref: Task-f-hZ03FLTJ ────────────────────────────╯

amltk.scheduling.plugins.pynisher.PynisherPlugin.MEMORY_LIMIT_REACHED class-attribute instance-attribute #

MEMORY_LIMIT_REACHED: Event[[MemoryLimitException], Any] = (
    Event("pynisher-memory-limit")
)

A Task was submitted but reached it's memory limit.

Will call any subscribers with the exception as the argument.

from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import numpy as np

def f(x: int) -> int:
    x = np.arange(100000000)
    time.sleep(x)
    return 42

scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(memory_limit=(1, "KB")))

@task.on("pynisher-memory-limit")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
 ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ 
 Memory     Wall Time  CPU Time 
(1, 'KB')  None       None
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰──────────────────────────── Ref: Task-f-WI0zTjUy ────────────────────────────╯

amltk.scheduling.plugins.pynisher.PynisherPlugin.CPU_TIME_LIMIT_REACHED class-attribute instance-attribute #

CPU_TIME_LIMIT_REACHED: Event[
    [CpuTimeoutException], Any
] = Event("pynisher-cputime-limit")

A Task was submitted but reached it's cpu time limit.

Will call any subscribers with the exception as the argument.

from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time

def f(x: int) -> int:
    i = 0
    while True:
        # Keep busying computing the answer to everything
        i += 1

    return 42

scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(cputime_limit=(1, "s")))

@task.on("pynisher-cputime-limit")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
 ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ 
 Memory  Wall Time  CPU Time 
None    None       (1, 's')
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰──────────────────────────── Ref: Task-f-mbWR8gX9 ────────────────────────────╯

amltk.scheduling.plugins.pynisher.PynisherPlugin.WALL_TIME_LIMIT_REACHED class-attribute instance-attribute #

WALL_TIME_LIMIT_REACHED: Event[
    [WallTimeoutException], Any
] = Event("pynisher-walltime-limit")

A Task was submitted but reached it's wall time limit.

Will call any subscribers with the exception as the argument.

from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time

def f(x: int) -> int:
    time.sleep(x)
    return 42

scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))

@task.on("pynisher-walltime-limit")
def callback(exception):
    pass
╭─ Task f(x: int) -> int ──────────────────────────────────────────────────────╮
 ╭───────────────────────── Plugin pynisher-plugin ─────────────────────────╮ 
 Memory  Wall Time  CPU Time 
None    (1, 's')   None
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰──────────────────────────── Ref: Task-f-i8OWPrE1 ────────────────────────────╯

Scheduler Executor

This will place process limits on the task as soon as it starts running, whever it may be running. If you are using Scheduler.with_sequential() then this will place limits on the main process, likely not what you want. This also does not work with a ThreadPoolExecutor.

If using this with something like [dask-jobqueue], then this will place limits on the workers it spawns. It would be better to place limits directly through dask job-queue then.

Platform Limitations (Mac, Windows)

Pynisher has some limitations with memory on Mac and Windows: automl/pynisher#features

You can check this with PynisherPlugin.supports("memory"), PynisherPlugin.supports("cpu_time") and PynisherPlugin.supports("wall_time"). See PynisherPlugin.supports()

Comm#

amltk.scheduling.plugins.comm #

The Comm.Plugin enables two way-communication with running Task.

The Comm provides an easy interface to communicate while the Comm.Msg encapsulates messages between the main process and the Task.

Usage

To setup a Task to work with a Comm, the Task must accept a comm as a keyword argument. This is to prevent it conflicting with any args passed through during the call to submit().

from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm

def powers_of_two(start: int, n: int, *, comm: Comm) -> None:
    with comm.open():
        for i in range(n):
            comm.send(start ** (i+1))

scheduler = Scheduler.with_processes(1)
task = scheduler.task(powers_of_two, plugins=Comm.Plugin())
results = []

@scheduler.on_start
def on_start():
    task.submit(2, 5)

@task.on("comm-open")
def on_open(msg: Comm.Msg):
    print(f"Task has opened | {msg}")

@task.on("comm-message")
def on_message(msg: Comm.Msg):
    results.append(msg.data)

scheduler.run()
print(results)
Task has opened | Comm.Msg(kind=<Kind.OPEN: 'open'>, data=None)
[2, 4, 8, 16, 32]

You can also block a worker, waiting for a response from the main process, allowing for the worker to request() data from the main process.

from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm

def my_worker(comm: Comm, n_tasks: int) -> None:
    with comm.open():
        for task_number in range(n_tasks):
            task = comm.request(task_number)
            comm.send(f"Task recieved {task} for {task_number}")

scheduler = Scheduler.with_processes(1)
task = scheduler.task(my_worker, plugins=Comm.Plugin())

items = ["A", "B", "C"]
results = []

@scheduler.on_start
def on_start():
    task.submit(n_tasks=3)

@task.on("comm-request")
def on_request(msg: Comm.Msg):
    task_number = msg.data
    msg.respond(items[task_number])

@task.on("comm-message")
def on_message(msg: Comm.Msg):
    results.append(msg.data)

scheduler.run()
print(results)
['Task recieved A for 0', 'Task recieved B for 1', 'Task recieved C for 2']
@events

amltk.scheduling.plugins.comm.Comm.MESSAGE class-attribute instance-attribute #

MESSAGE: Event[[Msg], Any] = Event('comm-message')

A Task has sent a message to the main process.

from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm

def fn(x: int, comm: Comm | None = None) -> int:
    assert comm is not None
    with comm.open():
        comm.send(x + 1)

scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())

@task.on("comm-message")
def callback(msg: Comm.Msg):
    print(msg.data)
╭─ Task fn(x: int, comm: amltk.scheduling.plugins.comm.Comm | None = None) -> ─╮
 ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ 
 │ Open Connections: 0
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰─────────────────────────── Ref: Task-fn-dRCPpRTk ────────────────────────────╯

amltk.scheduling.plugins.comm.Comm.REQUEST class-attribute instance-attribute #

REQUEST: Event[[Msg], Any] = Event('comm-request')

A Task has sent a request.

from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm

def greeter(greeting: str, comm: Comm | None = None) -> None:
    assert comm is not None
    with comm.open():
        name = comm.request()
        comm.send(f"{greeting} {name}!")

scheduler = Scheduler.with_processes(1)
task = scheduler.task(greeter, plugins=Comm.Plugin())

@scheduler.on_start
def on_start():
    task.submit("Hello")

@task.on("comm-request")
def on_request(msg: Comm.Msg):
    msg.respond("Alice")

@task.on("comm-message")
def on_msg(msg: Comm.Msg):
    print(msg.data)

scheduler.run()
Hello Alice!
╭─ Task greeter(greeting: str, comm: amltk.scheduling.plugins.comm.Comm | None─╮
 ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ 
 │ Open Connections: 0
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰───────────────────────── Ref: Task-greeter-QEUC6xdm ─────────────────────────╯

amltk.scheduling.plugins.comm.Comm.OPEN class-attribute instance-attribute #

OPEN: Event[[Msg], Any] = Event('comm-open')

The task has signalled it's open.

```python exec="true" source="material-block" html="true" hl_lines="5 15-17" from amltk.scheduling import Scheduler from amltk.scheduling.plugins import Comm

def fn(comm: Comm) -> None: with comm.open(): pass from amltk._doc import make_picklable; make_picklable(fn) # markdown-exec: hide

scheduler = Scheduler.with_processes(1) task = scheduler.task(fn, plugins=Comm.Plugin())

@scheduler.on_start def on_start(): task.submit()

@task.on("comm-open") def callback(msg: Comm.Msg): print("Comm has just used comm.open()")

scheduler.run() from amltk._doc import doc_print; doc_print(print, task) # markdown-exec: hide ```

amltk.scheduling.plugins.comm.Comm.CLOSE class-attribute instance-attribute #

CLOSE: Event[[Msg], Any] = Event('comm-close')

The task has signalled it's close.

from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm

def fn(comm: Comm) -> None:
    with comm.open():
        pass
        # Will send a close signal to the main process as it exists this block

    print("Done")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())

@scheduler.on_start
def on_start():
    task.submit()

@task.on("comm-close")
def on_close(msg: Comm.Msg):
    print(f"Worker close with {msg}")

scheduler.run()
Worker close with Comm.Msg(kind=, data=None)
╭─ Task fn(comm: amltk.scheduling.plugins.comm.Comm) -> None ──────────────────╮
 ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ 
 │ Open Connections: 0
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰─────────────────────────── Ref: Task-fn-rV5lGSkk ────────────────────────────╯

Supported Backends

The current implementation relies on Pipe which only works between processes on the same system/cluster. There is also limited support with dask backends.

This could be extended to allow for web sockets or other forms of connections but requires time. Please let us know in the Github issues if this is something you are interested in!

ThreadPoolCTL#

amltk.scheduling.plugins.threadpoolctl #

The ThreadPoolCTLPlugin if useful for parallel training of models. Without limiting with threadpoolctl, the number of threads used by a given model may oversubscribe to resources and cause significant slowdowns.

This is the mechanism employed by scikit-learn to limit the number of threads used by a given model.

See threadpoolctl documentation.

Requirements

This requires threadpoolctl which can be installed with:

pip install amltk[threadpoolctl]

# Or directly
pip install threadpoolctl
Usage
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins.threadpoolctl import ThreadPoolCTLPlugin

scheduler = Scheduler.with_processes(1)

def f() -> None:
    # ... some task that respects the limits set by threadpoolctl
    pass

task = scheduler.task(f, plugins=ThreadPoolCTLPlugin(max_threads=1))
╭─ Task f() -> None ───────────────────────────────────────────────────────────╮
 ╭────────────────────── Plugin threadpoolctl-plugin ───────────────────────╮ 
 Max Threads  User-API 
1            None
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰──────────────────────────── Ref: Task-f-9WVsJgNn ────────────────────────────╯

Warning Filter#

amltk.scheduling.plugins.warning_filter #

The WarningFilter if used to automatically filter out warnings from a Task as it runs.

This wraps your function in context manager warnings.catch_warnings() and applies your arguments to warnings.filterwarnings(), as you would normally filter warnings in Python.

Usage
import warnings
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import WarningFilter

def f() -> None:
    warnings.warn("This is a warning")

scheduler = Scheduler.with_processes(1)
task = scheduler.task(f, plugins=WarningFilter("ignore"))
╭─ Task f() -> None ───────────────────────────────────────────────────────────╮
 ╭───────────────────────── Plugin warning-filter ──────────────────────────╮ 
 Args         Kwargs 
('ignore',)  {}
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰──────────────────────────── Ref: Task-f-4ZryZEh6 ────────────────────────────╯

Creating Your Own Plugin#

amltk.scheduling.plugins.plugin #

A plugin that can be attached to a Task.

By inheriting from a Plugin, you can hook into a Task. A plugin can affect, modify and extend its behaviours. Please see the documentation of the methods for more information. Creating a plugin is only necesary if you need to modify actual behaviour of the task. For siply hooking into the lifecycle of a task, you can use the @events that a Task emits.

Creating a Plugin

For a full example of a simple plugin, see the Limiter plugin which prevents the task being submitted if for example, it has already been submitted too many times.

The below example shows how to create a plugin that prints the task name before submitting it. It also emits an event when the task is submitted.

from __future__ import annotations
from typing import Callable

from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Plugin
from amltk.scheduling.events import Event

# A simple plugin that prints the task name before submitting
class Printer(Plugin):
    name = "my-plugin"

    # Define an event the plugin will emit
    # Event[Task] indicates the callback for the event will be called with the task
    PRINTED: Event[str] = Event("printer-msg")

    def __init__(self, greeting: str):
        self.greeting = greeting
        self.n_greetings = 0

    def attach_task(self, task) -> None:
        self.task = task
        # Register an event with the task, this lets the task know valid events
        # people can subscribe to and helps it show up in visuals
        task.add_event(self.PRINTED)
        task.on_submitted(self._print_submitted, hidden=True)  # You can hide this callback from visuals

    def pre_submit(self, fn, *args, **kwargs) -> tuple[Callable, tuple, dict]:
        print(f"{self.greeting} for {self.task} {args} {kwargs}")
        self.n_greetings += 1
        return fn, args, kwargs

    def _print_submitted(self, future, *args, **kwargs) -> None:
        msg = f"Task was submitted {self.task} {args} {kwargs}"
        self.task.emit(self.PRINTED, msg)  # Emit the event with a msg

    def __rich__(self):
        # Custome how the plugin is displayed in rich (Optional)
        # rich is an optional dependancy of amltk so we move the imports into here
        from rich.panel import Panel

        return Panel(
            f"Greeting: {self.greeting} ({self.n_greetings})",
            title=f"Plugin {self.name}"
        )

def fn(x: int) -> int:
    return x + 1

scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=[Printer("Hello")])

@scheduler.on_start
def on_start():
    task.submit(15)

@task.on("printer-msg")
def callback(msg: str):
    print("\nmsg")

scheduler.run()
Hello for Task(unique_ref=Task-fn-TyEI9BtB, plugins=[<_code_block_n152_.Printer object at 0x7efd593eaa40>]) (15,) {} msg
╭─ Task fn(x: 'int') -> 'int' ─────────────────────────────────────────────────╮
 ╭──────────────────────────── Plugin my-plugin ────────────────────────────╮ 
 │ Greeting: Hello (1)                                                      │ 
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰─────────────────────────── Ref: Task-fn-TyEI9BtB ────────────────────────────╯

All methods are optional, and you can choose to implement only the ones you need. Most plugins will likely need to implement the attach_task() method, which is called when the plugin is attached to a task. In this method, you can for example subscribe to events on the task, create new subscribers for people to use or even store a reference to the task for later use.

Plugins are also encouraged to utilize the events of a Task to further hook into the lifecycle of the task. For exampe, by saving a reference to the task in the attach_task() method, you can use the emit() method of the task to emit your own specialized events.