Events
One of the primary ways to respond to @events
emitted
with by a Task
the Scheduler
is through use of a callback.
The reason for this is to enable an easier time for API's to utilize
multiprocessing and remote compute from the Scheduler
, without having
to burden users with knowing the details of how to use multiprocessing.
A callback subscribes to some event using a decorator but can also be done in
a functional style if preferred. The below example is based on the
event @scheduler.on_start
but
the same applies to all events.
from amltk.scheduling import Scheduler
scheduler = Scheduler.with_processes(1)
@scheduler.on_start
def print_hello() -> None:
print("hello")
scheduler.run()
hello
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
@on_start 1
└── def print_hello() -> 'None' (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted
@on_future_done
@on_future_cancelled
@on_future_exception
@on_future_result
from amltk.scheduling import Scheduler
scheduler = Scheduler.with_processes(1)
def print_hello() -> None:
print("hello")
scheduler.on_start(print_hello)
scheduler.run()
hello
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
@on_start 1
└── def print_hello() -> 'None' (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted
@on_future_done
@on_future_cancelled
@on_future_exception
@on_future_result
There are a number of ways to customize the behaviour of these callbacks, notably to control how often they get called and when they get called.
Callback customization
This will cause the callback to be called repeat
times successively.
This is most useful in combination with
@scheduler.on_start
to launch
a number of tasks at the start of the scheduler.
from amltk import Scheduler
N_WORKERS = 2
def f(x: int) -> int:
return x * 2
scheduler = Scheduler.with_processes(N_WORKERS)
task = scheduler.task(f)
@scheduler.on_start(repeat=N_WORKERS)
def on_start():
task.submit(1)
scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 2} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
┃ @on_start 1
┃ └── def on_start() (2)
┃ @on_finishing 1
┃ @on_finished 1
┃ @on_stop
┃ @on_timeout
┃ @on_future_submitted 2
┃ @on_future_done 2
┃ @on_future_cancelled
┃ @on_future_exception
┃ @on_future_result 2
┗━━ ╭─ Task f(x: 'int') -> 'int' ──────────────────────────────────────────────╮
│ @on_submitted 2 │
│ @on_done 2 │
│ @on_result 2 │
│ @on_exception │
│ @on_cancelled │
╰───────────────────────────── Ref: D1PfzVQ9 ──────────────────────────────╯
Limit the number of times a callback can be called, after which, the callback will be ignored.
from asyncio import Future
from amltk.scheduling import Scheduler
scheduler = Scheduler.with_processes(2)
def expensive_function(x: int) -> int:
return x ** 2
@scheduler.on_start
def submit_calculations() -> None:
scheduler.submit(expensive_function, 2)
@scheduler.on_future_result(limit=3)
def print_result(future, result) -> None:
scheduler.submit(expensive_function, 2)
scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 2} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
@on_start 1
└── def submit_calculations() -> 'None' (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted 4
@on_future_done 4
@on_future_cancelled
@on_future_exception
@on_future_result 4
└── def print_result(future, result) -> 'None' (3)
A callable which takes no arguments and returns a bool
. The callback
will only be called when the when
callable returns True
.
Below is a rather contrived example, but it shows how we can use the
when
parameter to control when the callback is called.
import random
from amltk.scheduling import Scheduler
LOCALE = random.choice(["English", "German"])
scheduler = Scheduler.with_processes(1)
@scheduler.on_start(when=lambda: LOCALE == "English")
def print_hello() -> None:
print("hello")
@scheduler.on_start(when=lambda: LOCALE == "German")
def print_guten_tag() -> None:
print("guten tag")
scheduler.run()
hello
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
@on_start 1
├── def print_hello() -> 'None' (1)
└── def print_guten_tag() -> 'None'
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted
@on_future_done
@on_future_cancelled
@on_future_exception
@on_future_result
Only call the callback every every
times the event is emitted. This
includes the first time it's called.
from amltk.scheduling import Scheduler
scheduler = Scheduler.with_processes(1)
# Print "hello" only every 2 times the scheduler starts.
@scheduler.on_start(every=2)
def print_hello() -> None:
print("hello")
# Run the scheduler 5 times
scheduler.run()
scheduler.run()
scheduler.run()
scheduler.run()
scheduler.run()
hello hello
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 5
@on_start 5
└── def print_hello() -> 'None' (2)
@on_finishing 5
@on_finished 5
@on_stop
@on_timeout
@on_future_submitted
@on_future_done
@on_future_cancelled
@on_future_exception
@on_future_result
Emitter, Subscribers and Events#
This part of the documentation is not necessary to understand or use for AMLTK. People wishing to build tools upon AMLTK may still find this a useful component to add to their arsenal.
The core of making this functionality work is the Emitter
.
Its purpose is to have @events
that can be emitted and subscribed to. Classes like the
Scheduler
and Task
carry
around with them an Emitter
to enable all of this functionality.
Creating an Emitter
is rather straight-forward, but we must also create
Events
that people can subscribe to.
from amltk.scheduling import Emitter, Event
emitter = Emitter("my-emitter")
event: Event[int] = Event("my-event") # (1)!
@emitter.on(event)
def my_callback(x: int) -> None:
print(f"Got {x}!")
emitter.emit(event, 42) # (2)!
- The typing
Event[int]
is used to indicate that the event will be emitting an integer. This is not necessary, but it is useful for type-checking and documentation. - The
emitter.emit(event, 42)
is used to emit the event. This will call all the callbacks registered for the event, i.e.my_callback()
.
Independent Events
Given a single Emitter
and a single instance of an Event
, there is no way to
have different @events
for callbacks. There are two options, both used extensively
in AMLTK.
The first is to have different Events
quite naturally, i.e. you distinguish
between different things that can happen. However, you often want to have different
objects emit the same Event
but have different callbacks for each object.
This makes most sense in the context of a Task
the Event
instances are shared as
class variables in the Task
class, however a user likely want's to subscribe to
the Event
for a specific instance of the Task
.
This is where the second option comes in, in which each object carries around its
own Emitter
instance. This is how a user can subscribe to the same kind of Event
but individually for each Task
.
However, to shield users from this and to create named access points for users to
subscribe to, we can use the Subscriber
class,
conveniently created by the Emitter.subscriber()
method.
from amltk.scheduling import Emitter, Event
emitter = Emitter("my-emitter")
class GPT:
event: Event[str] = Event("my-event")
def __init__(self) -> None:
self.on_answer: Subscriber[str] = emitter.subscriber(self.event)
def ask(self, question: str) -> None:
emitter.emit(self.event, "hello world!")
gpt = GPT()
@gpt.on_answer
def print_answer(answer: str) -> None:
print(answer)
gpt.ask("What is the conical way for an AI to greet someone?")
Typically these event based systems make little sense in a synchronous context, however
with the Scheduler
and Task
classes, they are used to enable a simple way to use multiprocessing and remote compute.
class RegisteredTimeCallOrderStrategy
dataclass
#
A calling strategy that calls callbacks in the order they were registered.
def execute(events)
classmethod
#
Call all events in the scheduler.
Source code in src/amltk/scheduling/events.py
class Event
dataclass
#
class Subscriber
dataclass
#
Bases: Generic[P]
An object that can be used to easily subscribe to a certain event.
from amltk.scheduling.events import Event, Subscriber
test_event: Event[[int, str]] = Event("test")
emitter = Emitter("hello world")
subscribe = emitter.subscriber(test_event)
# Use it as a decorator
@subscribe
def callback(a: int, b: str) -> None:
print(f"Got {a} and {b}!")
# ... or just pass a function
subscribe(callback)
# Will emit `test_event` with the arguments 1 and "hello"
# and call the callback with those same arguments.
emitter.emit(test_event, 1, "hello")
ATTRIBUTE | DESCRIPTION |
---|---|
manager |
The event manager to use.
|
event |
The event to subscribe to.
TYPE:
|
event_counts: int
prop
#
The number of times this event has been emitted.
def __call__(callback=None, *, when=None, limit=None, repeat=1, every=1, hidden=False)
#
Subscribe to the event associated with this object.
PARAMETER | DESCRIPTION |
---|---|
callback |
The callback to register. |
when |
A predicate that must be satisfied for the callback to be called. |
every |
The callback will be called every
TYPE:
|
repeat |
The callback will be called
TYPE:
|
limit |
The maximum number of times the callback can be called.
TYPE:
|
hidden |
Whether to hide the callback in visual output. This is mainly used to facilitate Plugins who act upon events but don't want to be seen, primarily as they are just book-keeping callbacks.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Callable[P, Any] | partial[Callable[P, Any]]
|
The callback if it was provided, otherwise it acts as a decorator. |
Source code in src/amltk/scheduling/events.py
class Handler
dataclass
#
Bases: Generic[P]
A handler for an event.
This is a simple class that holds a callback and any predicate that must be satisfied for it to be triggered.
def __call__(*args, **kwargs)
#
Call the callback if the predicate is satisfied.
If the predicate is not satisfied, then None
is returned.
Source code in src/amltk/scheduling/events.py
class Emitter(name=None)
#
Bases: Mapping[Event, list[Handler]]
An event emitter.
This class is used to emit events and register callbacks for those events.
It also provides a convenience function
subscriber()
such
that objects using an Emitter
can easily create access points for users
to directly subscribe to their Events
.
PARAMETER | DESCRIPTION |
---|---|
name |
The name of the emitter. If not provided, then a UUID will be used.
TYPE:
|
Source code in src/amltk/scheduling/events.py
name: str | None
attr
#
The name of the emitter.
handlers: dict[Event, list[Handler]]
attr
#
A mapping of events to their handlers.
event_counts: Counter[Event]
attr
#
A count of all events emitted by this emitter.
events: list[Event]
prop
#
Return a list of the events.
def emit(event, *args, **kwargs)
#
Emit an event.
This will call all the handlers for the event.
PARAMETER | DESCRIPTION |
---|---|
event |
The event to emit. If passing a list, then the handlers for all events will be called, regardless of the order
TYPE:
|
*args |
The positional arguments to pass to the handlers.
TYPE:
|
**kwargs |
The keyword arguments to pass to the handlers.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
None
|
A list of the results from the handlers. |
Source code in src/amltk/scheduling/events.py
def emit_many(events)
#
Emit multiple events.
This is useful for cases where you don't want to favour one callback over another, and so uses the time a callback was registered to call the callback instead.
PARAMETER | DESCRIPTION |
---|---|
events |
A mapping of event keys to a tuple of positional arguments and keyword arguments to pass to the handlers.
TYPE:
|
Source code in src/amltk/scheduling/events.py
def subscriber(event, *, when=None, every=1, repeat=1, limit=None)
#
Create a subscriber for an event.
PARAMETER | DESCRIPTION |
---|---|
event |
The event to register the callback for.
TYPE:
|
when |
A predicate that must be satisfied for the callback to be called. |
every |
The callback will be called every
TYPE:
|
repeat |
The callback will be called
TYPE:
|
limit |
The maximum number of times the callback can be called.
TYPE:
|
Source code in src/amltk/scheduling/events.py
def on(event, callback, *, when=None, every=1, repeat=1, limit=None, hidden=False)
#
Register a callback for an event.
PARAMETER | DESCRIPTION |
---|---|
event |
The event to register the callback for.
TYPE:
|
callback |
The callback to register.
TYPE:
|
when |
A predicate that must be satisfied for the callback to be called. |
every |
The callback will be called every
TYPE:
|
repeat |
The callback will be called
TYPE:
|
limit |
The maximum number of times the callback can be called.
TYPE:
|
hidden |
Whether to hide the callback in visual output. This is mainly used to facilitate Plugins who act upon events but don't want to be seen, primarily as they are just book-keeping callbacks.
TYPE:
|