Comm
amltk.scheduling.plugins.comm
#
The Comm.Plugin
enables
two way-communication with running Task
.
The Comm
provides an easy interface to
communicate while the Comm.Msg
encapsulates
messages between the main process and the Task
.
Usage
To setup a Task
to work with a Comm
, the Task
must accept a comm
as
a keyword argument. This is to prevent it conflicting with any args passed
through during the call to submit()
.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def powers_of_two(start: int, n: int, *, comm: Comm) -> None:
with comm.open():
for i in range(n):
comm.send(start ** (i+1))
scheduler = Scheduler.with_processes(1)
task = scheduler.task(powers_of_two, plugins=Comm.Plugin())
results = []
@scheduler.on_start
def on_start():
task.submit(2, 5)
@task.on("comm-open")
def on_open(msg: Comm.Msg):
print(f"Task has opened | {msg}")
@task.on("comm-message")
def on_message(msg: Comm.Msg):
results.append(msg.data)
scheduler.run()
print(results)
You can also block a worker, waiting for a response from the main process, allowing for the
worker to request()
data from the main process.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def my_worker(comm: Comm, n_tasks: int) -> None:
with comm.open():
for task_number in range(n_tasks):
task = comm.request(task_number)
comm.send(f"Task recieved {task} for {task_number}")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(my_worker, plugins=Comm.Plugin())
items = ["A", "B", "C"]
results = []
@scheduler.on_start
def on_start():
task.submit(n_tasks=3)
@task.on("comm-request")
def on_request(msg: Comm.Msg):
task_number = msg.data
msg.respond(items[task_number])
@task.on("comm-message")
def on_message(msg: Comm.Msg):
results.append(msg.data)
scheduler.run()
print(results)
@events
amltk.scheduling.plugins.comm.Comm.MESSAGE
class-attribute
instance-attribute
#
A Task has sent a message to the main process.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def fn(x: int, comm: Comm | None = None) -> int:
assert comm is not None
with comm.open():
comm.send(x + 1)
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())
@task.on("comm-message")
def callback(msg: Comm.Msg):
print(msg.data)
╭─ Task fn(x: int, comm: amltk.scheduling.plugins.comm.Comm | None = None) -> ─╮
│ ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ │
│ │ Open Connections: 0 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-ZVHs2JO2 ────────────────────────────╯
amltk.scheduling.plugins.comm.Comm.REQUEST
class-attribute
instance-attribute
#
A Task has sent a request.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def greeter(greeting: str, comm: Comm | None = None) -> None:
assert comm is not None
with comm.open():
name = comm.request()
comm.send(f"{greeting} {name}!")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(greeter, plugins=Comm.Plugin())
@scheduler.on_start
def on_start():
task.submit("Hello")
@task.on("comm-request")
def on_request(msg: Comm.Msg):
msg.respond("Alice")
@task.on("comm-message")
def on_msg(msg: Comm.Msg):
print(msg.data)
scheduler.run()
╭─ Task greeter(greeting: str, comm: amltk.scheduling.plugins.comm.Comm | None─╮
│ ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ │
│ │ Open Connections: 0 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰───────────────────────── Ref: Task-greeter-4pr2HmvR ─────────────────────────╯
amltk.scheduling.plugins.comm.Comm.OPEN
class-attribute
instance-attribute
#
The task has signalled it's open.
```python exec="true" source="material-block" html="true" hl_lines="5 15-17" from amltk.scheduling import Scheduler from amltk.scheduling.plugins import Comm
def fn(comm: Comm) -> None: with comm.open(): pass from amltk._doc import make_picklable; make_picklable(fn) # markdown-exec: hide
scheduler = Scheduler.with_processes(1) task = scheduler.task(fn, plugins=Comm.Plugin())
@scheduler.on_start def on_start(): task.submit()
@task.on("comm-open") def callback(msg: Comm.Msg): print("Comm has just used comm.open()")
scheduler.run() from amltk._doc import doc_print; doc_print(print, task) # markdown-exec: hide ```
amltk.scheduling.plugins.comm.Comm.CLOSE
class-attribute
instance-attribute
#
The task has signalled it's close.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def fn(comm: Comm) -> None:
with comm.open():
pass
# Will send a close signal to the main process as it exists this block
print("Done")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())
@scheduler.on_start
def on_start():
task.submit()
@task.on("comm-close")
def on_close(msg: Comm.Msg):
print(f"Worker close with {msg}")
scheduler.run()
╭─ Task fn(comm: amltk.scheduling.plugins.comm.Comm) -> None ──────────────────╮
│ ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ │
│ │ Open Connections: 0 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-WpucW405 ────────────────────────────╯
Supported Backends
The current implementation relies on Pipe
which only
works between processes on the same system/cluster. There is also limited support
with dask
backends.
This could be extended to allow for web sockets or other forms of connections but requires time. Please let us know in the Github issues if this is something you are interested in!
AsyncComm
dataclass
#
AsyncComm(comm: Comm)
A async wrapper of a Comm.
request
async
#
Recieve a message.
PARAMETER | DESCRIPTION |
---|---|
timeout |
The timeout in seconds to wait for a message, raises
a
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Any
|
The message from the worker or the default value. |
Source code in src/amltk/scheduling/plugins/comm.py
Comm
#
Comm(connection: Connection)
A communication channel between a worker and scheduler.
For duplex connections, such as returned by python's builtin
Pipe
, use the
create(duplex=...)
class method.
Adds three new events to the task:
ATTRIBUTE | DESCRIPTION |
---|---|
connection |
The underlying Connection
|
id |
The id of the comm.
TYPE:
|
PARAMETER | DESCRIPTION |
---|---|
connection |
The underlying Connection
TYPE:
|
Source code in src/amltk/scheduling/plugins/comm.py
CLOSE
class-attribute
instance-attribute
#
The task has signalled it's close.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def fn(comm: Comm) -> None:
with comm.open():
pass
# Will send a close signal to the main process as it exists this block
print("Done")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())
@scheduler.on_start
def on_start():
task.submit()
@task.on("comm-close")
def on_close(msg: Comm.Msg):
print(f"Worker close with {msg}")
scheduler.run()
╭─ Task fn(comm: amltk.scheduling.plugins.comm.Comm) -> None ──────────────────╮
│ ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ │
│ │ Open Connections: 0 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-lFoKfnT7 ────────────────────────────╯
MESSAGE
class-attribute
instance-attribute
#
A Task has sent a message to the main process.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def fn(x: int, comm: Comm | None = None) -> int:
assert comm is not None
with comm.open():
comm.send(x + 1)
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())
@task.on("comm-message")
def callback(msg: Comm.Msg):
print(msg.data)
╭─ Task fn(x: int, comm: amltk.scheduling.plugins.comm.Comm | None = None) -> ─╮
│ ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ │
│ │ Open Connections: 0 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰─────────────────────────── Ref: Task-fn-ULAboBss ────────────────────────────╯
OPEN
class-attribute
instance-attribute
#
The task has signalled it's open.
```python exec="true" source="material-block" html="true" hl_lines="5 15-17" from amltk.scheduling import Scheduler from amltk.scheduling.plugins import Comm
def fn(comm: Comm) -> None: with comm.open(): pass from amltk._doc import make_picklable; make_picklable(fn) # markdown-exec: hide
scheduler = Scheduler.with_processes(1) task = scheduler.task(fn, plugins=Comm.Plugin())
@scheduler.on_start def on_start(): task.submit()
@task.on("comm-open") def callback(msg: Comm.Msg): print("Comm has just used comm.open()")
scheduler.run() from amltk._doc import doc_print; doc_print(print, task) # markdown-exec: hide ```
REQUEST
class-attribute
instance-attribute
#
A Task has sent a request.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm
def greeter(greeting: str, comm: Comm | None = None) -> None:
assert comm is not None
with comm.open():
name = comm.request()
comm.send(f"{greeting} {name}!")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(greeter, plugins=Comm.Plugin())
@scheduler.on_start
def on_start():
task.submit("Hello")
@task.on("comm-request")
def on_request(msg: Comm.Msg):
msg.respond("Alice")
@task.on("comm-message")
def on_msg(msg: Comm.Msg):
print(msg.data)
scheduler.run()
╭─ Task greeter(greeting: str, comm: amltk.scheduling.plugins.comm.Comm | None─╮
│ ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ │
│ │ Open Connections: 0 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰───────────────────────── Ref: Task-greeter-qMdkvLoM ─────────────────────────╯
CloseRequestError
#
Bases: RuntimeError
An exception happened in the main process and it send a response to the worker to raise this exception.
Msg
dataclass
#
Bases: Generic[T]
A message sent over a communication channel.
ATTRIBUTE | DESCRIPTION |
---|---|
task |
The task that sent the message.
TYPE:
|
comm |
The communication channel.
TYPE:
|
future |
The future of the task.
TYPE:
|
data |
The data sent by the task.
TYPE:
|
Plugin
#
Plugin(
parameter_name: str = "comm",
create_comms: (
Callable[[], tuple[Comm, Comm]] | None
) = None,
)
Bases: Plugin
A plugin that handles communication with a worker.
PARAMETER | DESCRIPTION |
---|---|
parameter_name |
The name of the parameter to inject the comm into.
TYPE:
|
create_comms |
A function that creates a pair of communication
channels. Defaults to |
Source code in src/amltk/scheduling/plugins/comm.py
attach_task
#
attach_task(task: Task) -> None
Attach the plugin to a task.
This method is called when the plugin is attached to a task. This is the place to subscribe to events on the task, create new subscribers for people to use or even store a reference to the task for later use.
PARAMETER | DESCRIPTION |
---|---|
task |
The task the plugin is being attached to.
TYPE:
|
Source code in src/amltk/scheduling/plugins/comm.py
events
#
Return a list of events that this plugin emits.
Likely no need to override this method, as it will automatically return all events defined on the plugin.
Source code in src/amltk/scheduling/plugins/plugin.py
pre_submit
#
pre_submit(
fn: Callable[P, R], *args: args, **kwargs: kwargs
) -> tuple[Callable[P, R], tuple, dict] | None
Pre-submit hook.
This method is called before the task is submitted.
PARAMETER | DESCRIPTION |
---|---|
fn |
The task function.
TYPE:
|
*args |
The arguments to the task function.
TYPE:
|
**kwargs |
The keyword arguments to the task function.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
tuple[Callable[P, R], tuple, dict] | None
|
A tuple of the task function, arguments and keyword arguments
if the task should be submitted, or |
Source code in src/amltk/scheduling/plugins/comm.py
TimeoutError
#
Bases: TimeoutError
A timeout error for communications.
close
#
close(
msg: Any | None = None,
*,
wait_for_ack: bool = False,
okay_if_broken_pipe: bool = False,
side: str = ""
) -> None
Close the connection.
PARAMETER | DESCRIPTION |
---|---|
msg |
The message to send to the other end of the connection.
TYPE:
|
wait_for_ack |
If
TYPE:
|
okay_if_broken_pipe |
If
TYPE:
|
side |
The side of the connection for naming purposes.
TYPE:
|
Source code in src/amltk/scheduling/plugins/comm.py
create
classmethod
#
Create a pair of communication channels.
Wraps the output of
multiprocessing.Pipe(duplex=duplex)
.
PARAMETER | DESCRIPTION |
---|---|
duplex |
Whether to allow for two-way communication
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
tuple[Self, Self]
|
A pair of communication channels. |
Source code in src/amltk/scheduling/plugins/comm.py
open
#
open(
opening_msg: Any | None = None,
*,
wait_for_ack: bool = False,
side: str = "worker"
) -> Iterator[Self]
Open the connection.
PARAMETER | DESCRIPTION |
---|---|
opening_msg |
The message to send to the main process when the connection is opened.
TYPE:
|
wait_for_ack |
If
TYPE:
|
side |
The side of the connection for naming purposes.
Usually this is only done on the
TYPE:
|
YIELDS | DESCRIPTION |
---|---|
Self
|
The comm. |
Source code in src/amltk/scheduling/plugins/comm.py
request
#
Receive a message.
PARAMETER | DESCRIPTION |
---|---|
msg |
The message to send to the other end of the connection.
If left empty, will be
TYPE:
|
timeout |
If float, will wait for that many seconds, raising an exception if exceeded. Otherwise, None will wait forever.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
TimeoutError
|
If the timeout is reached. |
CloseRequestError
|
If the other end needs to abruptly end and can not fufill the request. If thise error is thrown, the worker should finish as soon as possible. |
RETURNS | DESCRIPTION |
---|---|
Any
|
The received message or the default. |