Events#
One of the primary ways to respond to @events emitted by a Task or the Scheduler is through a callback. The reason for this design is that it lets APIs utilize multiprocessing and remote compute from the Scheduler, without burdening users with knowing the details of how to use multiprocessing.
A callback subscribes to some event, usually with a decorator, but this can also be done in a functional style if preferred. The example below is based on the event @scheduler.on_start, but the same applies to all events.
from amltk.scheduling import Scheduler
scheduler = Scheduler.with_processes(1)
@scheduler.on_start
def print_hello() -> None:
print("hello")
scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
    @on_start 1
        └── def print_hello() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout
    @on_future_submitted
    @on_future_done
    @on_future_cancelled
    @on_future_exception
    @on_future_result
from amltk.scheduling import Scheduler
scheduler = Scheduler.with_processes(1)
def print_hello() -> None:
print("hello")
scheduler.on_start(print_hello)
scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
    @on_start 1
        └── def print_hello() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout
    @on_future_submitted
    @on_future_done
    @on_future_cancelled
    @on_future_exception
    @on_future_result
There are a number of ways to customize the behaviour of these callbacks, notably to control how often they get called and when they get called.
Callback customization
repeat=
This will cause the callback to be called repeat times successively. It is most useful in combination with @scheduler.on_start to launch a number of tasks at the start of the scheduler.
from amltk import Scheduler
N_WORKERS = 2
def f(x: int) -> int:
    return x * 2
scheduler = Scheduler.with_processes(N_WORKERS)
task = scheduler.task(f)
@scheduler.on_start(repeat=N_WORKERS)
def on_start():
    task.submit(1)
scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 2} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
┃   @on_start 1
┃       └── def on_start() (1)
┃   @on_finishing 1
┃   @on_finished 1
┃   @on_stop
┃   @on_timeout
┃   @on_future_submitted 2
┃   @on_future_done 2
┃   @on_future_cancelled
┃   @on_future_exception
┃   @on_future_result 2
┗━━ ╭─ Task f(x: int) -> int ──────────────────────────────────────────────────╮
╰────────────────────────── Ref: Task-f-Bzy5HHkJ ──────────────────────────╯
max_calls=
Limit the number of times a callback can be called, after which the callback will be ignored.
from asyncio import Future
from amltk.scheduling import Scheduler
scheduler = Scheduler.with_processes(2)
def expensive_function(x: int) -> int:
    return x ** 2
@scheduler.on_start
def submit_calculations() -> None:
    scheduler.submit(expensive_function, 2)
@scheduler.on_future_result(max_calls=3)
def print_result(future, result) -> None:
    scheduler.submit(expensive_function, 2)
scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 2} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
    @on_start 1
        └── def submit_calculations() -> None (1)
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout
    @on_future_submitted 4
    @on_future_done 4
    @on_future_cancelled
    @on_future_exception
    @on_future_result 4
        └── def print_result(future, result) -> None (3)
when=
A callable which takes no arguments and returns a bool. The callback will only be called when this when callable returns True. Below is a rather contrived example, but it shows how we can use the when parameter to control when the callback is called.
import random
from amltk.scheduling import Scheduler
LOCALE = random.choice(["English", "German"])
scheduler = Scheduler.with_processes(1)
@scheduler.on_start(when=lambda: LOCALE == "English")
def print_hello() -> None:
print("hello")
@scheduler.on_start(when=lambda: LOCALE == "German")
def print_guten_tag() -> None:
print("guten tag")
scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 1
    @on_start 1
        ├── def print_hello() -> None (1)
        └── def print_guten_tag() -> None
    @on_finishing 1
    @on_finished 1
    @on_stop
    @on_timeout
    @on_future_submitted
    @on_future_done
    @on_future_cancelled
    @on_future_exception
    @on_future_result
every=
Only call the callback once every every times the event is emitted. The count includes the first time the event is emitted.
from amltk.scheduling import Scheduler
scheduler = Scheduler.with_processes(1)
# Print "hello" only every 2 times the scheduler starts.
@scheduler.on_start(every=2)
def print_hello() -> None:
print("hello")
# Run the scheduler 5 times
scheduler.run()
scheduler.run()
scheduler.run()
scheduler.run()
scheduler.run()
╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
│ Executor Queue: (0) │
│ ╭─ ProcessPoolExecutor─╮ │
│ │ {'max_workers': 1} │ │
│ ╰──────────────────────╯ │
│ │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯
┗━━ @on_empty 5
    @on_start 5
        └── def print_hello() -> None (2)
    @on_finishing 5
    @on_finished 5
    @on_stop
    @on_timeout
    @on_future_submitted
    @on_future_done
    @on_future_cancelled
    @on_future_exception
    @on_future_result
Emitter, Subscribers and Events#
This part of the documentation is not necessary for understanding or using AMLTK. However, people wishing to build tools on top of AMLTK may still find it a useful component to add to their arsenal.
The core of making this functionality work is the Emitter. Its purpose is to have @events that can be emitted and subscribed to. Classes like the Scheduler and Task carry around with them an Emitter to enable all of this functionality.
Creating an Emitter is rather straightforward, but we must also create Events that people can subscribe to.
from amltk.scheduling import Emitter, Event
emitter = Emitter("my-emitter")
event: Event[int] = Event("my-event") # (1)!
@emitter.on(event)
def my_callback(x: int) -> None:
print(f"Got {x}!")
emitter.emit(event, 42) # (2)!
1. The typing Event[int] is used to indicate that the event will be emitting an integer. This is not necessary, but it is useful for type-checking and documentation.
2. The emitter.emit(event, 42) call is used to emit the event. This will call all the callbacks registered for the event, i.e. my_callback().
Independent Events
Given a single Emitter and a single instance of an Event, there is no way to have different @events for different callbacks. There are two ways around this, both used extensively in AMLTK.
The first is to have different Events quite naturally, i.e. you distinguish between different things that can happen. However, you often want different objects to emit the same Event but have different callbacks for each object. This makes most sense in the context of a Task: the Event instances are shared as class variables of the Task class, yet a user likely wants to subscribe to the Event of a specific instance of the Task.
This is where the second option comes in, in which each object carries around its own Emitter instance. This is how a user can subscribe to the same kind of Event but individually for each Task, as sketched below.
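As a rough illustration of this second option, here is a minimal sketch that uses only the Emitter and Event pieces shown above. The Counter class and its names are hypothetical stand-ins for objects like Task that share an Event as a class variable but each carry their own Emitter.
from amltk.scheduling import Emitter, Event

class Counter:
    # The Event instance is shared at the class level...
    event: Event[int] = Event("count")

    def __init__(self, name: str) -> None:
        # ...but each instance carries its own Emitter, so
        # subscriptions are independent per object.
        self.emitter = Emitter(f"counter-{name}")

    def count(self, x: int) -> None:
        self.emitter.emit(self.event, x)

a = Counter("a")
b = Counter("b")

@a.emitter.on(Counter.event)
def on_a(x: int) -> None:
    print(f"a counted {x}")

@b.emitter.on(Counter.event)
def on_b(x: int) -> None:
    print(f"b counted {x}")

a.count(1)  # Only on_a() is called
b.count(2)  # Only on_b() is called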
However, to shield users from this and to create named access points for users to subscribe to, we can use the Subscriber class, conveniently created by the Emitter.subscriber() method.
from amltk.scheduling import Emitter, Event, Subscriber

emitter = Emitter("my-emitter")

class GPT:
    event: Event[str] = Event("my-event")

    def __init__(self) -> None:
        self.on_answer: Subscriber[str] = emitter.subscriber(self.event)

    def ask(self, question: str) -> None:
        emitter.emit(self.event, "hello world!")

gpt = GPT()

@gpt.on_answer
def print_answer(answer: str) -> None:
    print(answer)

gpt.ask("What is the canonical way for an AI to greet someone?")
Typically, such event-based systems make little sense in a synchronous context. With the Scheduler and Task classes, however, they are used to enable a simple way to use multiprocessing and remote compute.