Skip to content

Comm

amltk.scheduling.plugins.comm #

The Comm.Plugin enables two way-communication with running Task.

The Comm provides an easy interface to communicate while the Comm.Msg encapsulates messages between the main process and the Task.

Usage

To setup a Task to work with a Comm, the Task must accept a comm as a keyword argument. This is to prevent it conflicting with any args passed through during the call to submit().

from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm

def powers_of_two(start: int, n: int, *, comm: Comm) -> None:
    with comm.open():
        for i in range(n):
            comm.send(start ** (i+1))

scheduler = Scheduler.with_processes(1)
task = scheduler.task(powers_of_two, plugins=Comm.Plugin())
results = []

@scheduler.on_start
def on_start():
    task.submit(2, 5)

@task.on("comm-open")
def on_open(msg: Comm.Msg):
    print(f"Task has opened | {msg}")

@task.on("comm-message")
def on_message(msg: Comm.Msg):
    results.append(msg.data)

scheduler.run()
print(results)
Task has opened | Comm.Msg(kind=<Kind.OPEN: 'open'>, data=None)
[2, 4, 8, 16, 32]

You can also block a worker, waiting for a response from the main process, allowing for the worker to request() data from the main process.

from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm

def my_worker(comm: Comm, n_tasks: int) -> None:
    with comm.open():
        for task_number in range(n_tasks):
            task = comm.request(task_number)
            comm.send(f"Task recieved {task} for {task_number}")

scheduler = Scheduler.with_processes(1)
task = scheduler.task(my_worker, plugins=Comm.Plugin())

items = ["A", "B", "C"]
results = []

@scheduler.on_start
def on_start():
    task.submit(n_tasks=3)

@task.on("comm-request")
def on_request(msg: Comm.Msg):
    task_number = msg.data
    msg.respond(items[task_number])

@task.on("comm-message")
def on_message(msg: Comm.Msg):
    results.append(msg.data)

scheduler.run()
print(results)
['Task recieved A for 0', 'Task recieved B for 1', 'Task recieved C for 2']

@events

amltk.scheduling.plugins.comm.Comm.MESSAGE class-attribute instance-attribute #

MESSAGE: Event[[Msg], Any] = Event('comm-message')

A Task has sent a message to the main process.

from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm

def fn(x: int, comm: Comm | None = None) -> int:
    assert comm is not None
    with comm.open():
        comm.send(x + 1)

scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())

@task.on("comm-message")
def callback(msg: Comm.Msg):
    print(msg.data)

╭─ Task fn(x: 'int', comm: 'Comm | None' = None) -> 'int' ─────────────────────╮
 ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ 
 │ Open Connections: 0
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰─────────────────────────── Ref: Task-fn-l05QyIpP ────────────────────────────╯

amltk.scheduling.plugins.comm.Comm.REQUEST class-attribute instance-attribute #

REQUEST: Event[[Msg], Any] = Event('comm-request')

A Task has sent a request.

from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm

def greeter(greeting: str, comm: Comm | None = None) -> None:
    assert comm is not None
    with comm.open():
        name = comm.request()
        comm.send(f"{greeting} {name}!")

scheduler = Scheduler.with_processes(1)
task = scheduler.task(greeter, plugins=Comm.Plugin())

@scheduler.on_start
def on_start():
    task.submit("Hello")

@task.on("comm-request")
def on_request(msg: Comm.Msg):
    msg.respond("Alice")

@task.on("comm-message")
def on_msg(msg: Comm.Msg):
    print(msg.data)

scheduler.run()

Hello Alice!

╭─ Task greeter(greeting: 'str', comm: 'Comm | None' = None) -> 'None' ────────╮
 ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ 
 │ Open Connections: 0
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰───────────────────────── Ref: Task-greeter-O1wfksPc ─────────────────────────╯

amltk.scheduling.plugins.comm.Comm.OPEN class-attribute instance-attribute #

OPEN: Event[[Msg], Any] = Event('comm-open')

The task has signalled it's open.

```python exec="true" source="material-block" html="true" hl_lines="5 15-17" from amltk.scheduling import Scheduler from amltk.scheduling.plugins import Comm

def fn(comm: Comm) -> None: with comm.open(): pass from amltk._doc import make_picklable; make_picklable(fn) # markdown-exec: hide

scheduler = Scheduler.with_processes(1) task = scheduler.task(fn, plugins=Comm.Plugin())

@scheduler.on_start def on_start(): task.submit()

@task.on("comm-open") def callback(msg: Comm.Msg): print("Comm has just used comm.open()")

scheduler.run() from amltk._doc import doc_print; doc_print(print, task) # markdown-exec: hide ```

amltk.scheduling.plugins.comm.Comm.CLOSE class-attribute instance-attribute #

CLOSE: Event[[Msg], Any] = Event('comm-close')

The task has signalled it's close.

from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm

def fn(comm: Comm) -> None:
    with comm.open():
        pass
        # Will send a close signal to the main process as it exists this block

    print("Done")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())

@scheduler.on_start
def on_start():
    task.submit()

@task.on("comm-close")
def on_close(msg: Comm.msg):
    print(f"Worker close with {msg}")

scheduler.run()

Worker close with Comm.Msg(kind=, data=None)

╭─ Task fn(comm: 'Comm') -> 'None' ────────────────────────────────────────────╮
 ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ 
 │ Open Connections: 0
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰─────────────────────────── Ref: Task-fn-ko099pWF ────────────────────────────╯

Supported Backends

The current implementation relies on Pipe which only works between processes on the same system/cluster. There is also limited support with dask backends.

This could be extended to allow for web sockets or other forms of connections but requires time. Please let us know in the Github issues if this is something you are interested in!

AsyncComm dataclass #

AsyncComm(comm: Comm)

A async wrapper of a Comm.

request async #

request(*, timeout: float | None = None) -> Any

Recieve a message.

PARAMETER DESCRIPTION
timeout

The timeout in seconds to wait for a message, raises a Comm.TimeoutError if the timeout is reached. If None, will wait forever.

TYPE: float | None DEFAULT: None

RETURNS DESCRIPTION
Any

The message from the worker or the default value.

Source code in src/amltk/scheduling/plugins/comm.py
async def request(
    self,
    *,
    timeout: float | None = None,
) -> Any:
    """Recieve a message.

    Args:
        timeout: The timeout in seconds to wait for a message, raises
            a [`Comm.TimeoutError`][amltk.scheduling.plugins.comm.Comm.TimeoutError]
            if the timeout is reached.
            If `None`, will wait forever.

    Returns:
        The message from the worker or the default value.
    """
    connection = AsyncConnection(self.comm.connection)
    try:
        return await asyncio.wait_for(connection.recv(), timeout=timeout)
    except asyncio.TimeoutError as e:
        raise Comm.TimeoutError(
            f"Timed out waiting for response from {self.comm}",
        ) from e

send async #

send(obj: Any) -> None

Send a message.

PARAMETER DESCRIPTION
obj

The message to send.

TYPE: Any

Source code in src/amltk/scheduling/plugins/comm.py
async def send(self, obj: Any) -> None:
    """Send a message.

    Args:
        obj: The message to send.
    """
    return await AsyncConnection(self.comm.connection).send(obj)

Comm #

Comm(connection: Connection)

A communication channel between a worker and scheduler.

For duplex connections, such as returned by python's builtin Pipe, use the create(duplex=...) class method.

Adds three new events to the task:

ATTRIBUTE DESCRIPTION
connection

The underlying Connection

id

The id of the comm.

TYPE: CommID

PARAMETER DESCRIPTION
connection

The underlying Connection

TYPE: Connection

Source code in src/amltk/scheduling/plugins/comm.py
def __init__(self, connection: Connection) -> None:
    """Initialize the Comm.

    Args:
        connection: The underlying Connection
    """
    super().__init__()
    self.connection = connection
    self.id: CommID = id(self)

CLOSE class-attribute instance-attribute #

CLOSE: Event[[Msg], Any] = Event('comm-close')

The task has signalled it's close.

from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm

def fn(comm: Comm) -> None:
    with comm.open():
        pass
        # Will send a close signal to the main process as it exists this block

    print("Done")
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())

@scheduler.on_start
def on_start():
    task.submit()

@task.on("comm-close")
def on_close(msg: Comm.msg):
    print(f"Worker close with {msg}")

scheduler.run()

Worker close with Comm.Msg(kind=, data=None)

╭─ Task fn(comm: 'Comm') -> 'None' ────────────────────────────────────────────╮
 ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ 
 │ Open Connections: 0
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰─────────────────────────── Ref: Task-fn-kQvglOqK ────────────────────────────╯

MESSAGE class-attribute instance-attribute #

MESSAGE: Event[[Msg], Any] = Event('comm-message')

A Task has sent a message to the main process.

from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm

def fn(x: int, comm: Comm | None = None) -> int:
    assert comm is not None
    with comm.open():
        comm.send(x + 1)

scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=Comm.Plugin())

@task.on("comm-message")
def callback(msg: Comm.Msg):
    print(msg.data)

╭─ Task fn(x: 'int', comm: 'Comm | None' = None) -> 'int' ─────────────────────╮
 ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ 
 │ Open Connections: 0
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰─────────────────────────── Ref: Task-fn-dLthmkPM ────────────────────────────╯

OPEN class-attribute instance-attribute #

OPEN: Event[[Msg], Any] = Event('comm-open')

The task has signalled it's open.

```python exec="true" source="material-block" html="true" hl_lines="5 15-17" from amltk.scheduling import Scheduler from amltk.scheduling.plugins import Comm

def fn(comm: Comm) -> None: with comm.open(): pass from amltk._doc import make_picklable; make_picklable(fn) # markdown-exec: hide

scheduler = Scheduler.with_processes(1) task = scheduler.task(fn, plugins=Comm.Plugin())

@scheduler.on_start def on_start(): task.submit()

@task.on("comm-open") def callback(msg: Comm.Msg): print("Comm has just used comm.open()")

scheduler.run() from amltk._doc import doc_print; doc_print(print, task) # markdown-exec: hide ```

REQUEST class-attribute instance-attribute #

REQUEST: Event[[Msg], Any] = Event('comm-request')

A Task has sent a request.

from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Comm

def greeter(greeting: str, comm: Comm | None = None) -> None:
    assert comm is not None
    with comm.open():
        name = comm.request()
        comm.send(f"{greeting} {name}!")

scheduler = Scheduler.with_processes(1)
task = scheduler.task(greeter, plugins=Comm.Plugin())

@scheduler.on_start
def on_start():
    task.submit("Hello")

@task.on("comm-request")
def on_request(msg: Comm.Msg):
    msg.respond("Alice")

@task.on("comm-message")
def on_msg(msg: Comm.Msg):
    print(msg.data)

scheduler.run()

Hello Alice!

╭─ Task greeter(greeting: 'str', comm: 'Comm | None' = None) -> 'None' ────────╮
 ╭─────────────────────────── Plugin comm-plugin ───────────────────────────╮ 
 │ Open Connections: 0
 ╰──────────────────────────────────────────────────────────────────────────╯ 
╰───────────────────────── Ref: Task-greeter-FKTqpASB ─────────────────────────╯

as_async property #

as_async: AsyncComm

Return an async version of this comm.

CloseRequestError #

Bases: RuntimeError

An exception happened in the main process and it send a response to the worker to raise this exception.

Msg dataclass #

Msg(
    kind: Kind,
    data: T,
    comm: Comm,
    future: Future,
    task: Task,
)

Bases: Generic[T]

A message sent over a communication channel.

ATTRIBUTE DESCRIPTION
task

The task that sent the message.

TYPE: Task

comm

The communication channel.

TYPE: Comm

future

The future of the task.

TYPE: Future

data

The data sent by the task.

TYPE: T

Kind #

Bases: str, Enum

The kind of message.

respond #
respond(response: Any) -> None

Respond to the message.

PARAMETER DESCRIPTION
response

The response to send back to the task.

TYPE: Any

Source code in src/amltk/scheduling/plugins/comm.py
def respond(self, response: Any) -> None:
    """Respond to the message.

    Args:
        response: The response to send back to the task.
    """
    self.comm._send_pipe(response)

Plugin #

Plugin(
    parameter_name: str = "comm",
    create_comms: (
        Callable[[], tuple[Comm, Comm]] | None
    ) = None,
)

Bases: Plugin

A plugin that handles communication with a worker.

PARAMETER DESCRIPTION
parameter_name

The name of the parameter to inject the comm into.

TYPE: str DEFAULT: 'comm'

create_comms

A function that creates a pair of communication channels. Defaults to Comm.create.

TYPE: Callable[[], tuple[Comm, Comm]] | None DEFAULT: None

Source code in src/amltk/scheduling/plugins/comm.py
def __init__(
    self,
    parameter_name: str = "comm",
    create_comms: Callable[[], tuple[Comm, Comm]] | None = None,
) -> None:
    """Initialize the plugin.

    Args:
        parameter_name: The name of the parameter to inject the comm into.
        create_comms: A function that creates a pair of communication
            channels. Defaults to `Comm.create`.
    """
    super().__init__()
    if create_comms is None:
        create_comms = Comm.create

    self.parameter_name = parameter_name
    self.create_comms = create_comms
    self.comms: dict[CommID, tuple[Comm, Comm]] = {}
    self.communication_tasks: list[asyncio.Task] = []
    self.task: Task
    self.open_comms: set[CommID] = set()
attach_task #
attach_task(task: Task) -> None

Attach the plugin to a task.

This method is called when the plugin is attached to a task. This is the place to subscribe to events on the task, create new subscribers for people to use or even store a reference to the task for later use.

PARAMETER DESCRIPTION
task

The task the plugin is being attached to.

TYPE: Task

Source code in src/amltk/scheduling/plugins/comm.py
@override
def attach_task(self, task: Task) -> None:
    """Attach the plugin to a task.

    This method is called when the plugin is attached to a task. This
    is the place to subscribe to events on the task, create new subscribers
    for people to use or even store a reference to the task for later use.

    Args:
        task: The task the plugin is being attached to.
    """
    self.task = task
    task.add_event(Comm.MESSAGE, Comm.REQUEST, Comm.OPEN, Comm.CLOSE)
    task.on_submitted(self._begin_listening, hidden=True)
events #
events() -> list[Event]

Return a list of events that this plugin emits.

Likely no need to override this method, as it will automatically return all events defined on the plugin.

Source code in src/amltk/scheduling/plugins/plugin.py
def events(self) -> list[Event]:
    """Return a list of events that this plugin emits.

    Likely no need to override this method, as it will automatically
    return all events defined on the plugin.
    """
    inherited_attrs = chain.from_iterable(
        vars(cls).values() for cls in self.__class__.__mro__
    )
    return [attr for attr in inherited_attrs if isinstance(attr, Event)]
pre_submit #
pre_submit(
    fn: Callable[P, R], *args: args, **kwargs: kwargs
) -> tuple[Callable[P, R], tuple, dict] | None

Pre-submit hook.

This method is called before the task is submitted.

PARAMETER DESCRIPTION
fn

The task function.

TYPE: Callable[P, R]

*args

The arguments to the task function.

TYPE: args DEFAULT: ()

**kwargs

The keyword arguments to the task function.

TYPE: kwargs DEFAULT: {}

RETURNS DESCRIPTION
tuple[Callable[P, R], tuple, dict] | None

A tuple of the task function, arguments and keyword arguments if the task should be submitted, or None if the task should not be submitted.

Source code in src/amltk/scheduling/plugins/comm.py
@override
def pre_submit(
    self,
    fn: Callable[P, R],
    *args: P.args,
    **kwargs: P.kwargs,
) -> tuple[Callable[P, R], tuple, dict] | None:
    """Pre-submit hook.

    This method is called before the task is submitted.

    Args:
        fn: The task function.
        *args: The arguments to the task function.
        **kwargs: The keyword arguments to the task function.

    Returns:
        A tuple of the task function, arguments and keyword arguments
        if the task should be submitted, or `None` if the task should
        not be submitted.
    """
    host_comm, worker_comm = self.create_comms()
    if self.parameter_name in kwargs:
        raise ValueError(
            f"Parameter {self.parameter_name} already exists in kwargs!",
        )

    kwargs[self.parameter_name] = worker_comm

    # We don't necessarily know if the future will be submitted. If so,
    # we will use this index later to retrieve the host_comm
    self.comms[worker_comm.id] = (host_comm, worker_comm)

    # Make sure to include the Comm
    return fn, args, kwargs

TimeoutError #

Bases: TimeoutError

A timeout error for communications.

close #

close(
    msg: Any | None = None,
    *,
    wait_for_ack: bool = False,
    okay_if_broken_pipe: bool = False,
    side: str = ""
) -> None

Close the connection.

PARAMETER DESCRIPTION
msg

The message to send to the other end of the connection.

TYPE: Any | None DEFAULT: None

wait_for_ack

If True, wait for an acknowledgement from the other end before closing the connection.

TYPE: bool DEFAULT: False

okay_if_broken_pipe

If True, will not log an error if the connection is already closed.

TYPE: bool DEFAULT: False

side

The side of the connection for naming purposes.

TYPE: str DEFAULT: ''

Source code in src/amltk/scheduling/plugins/comm.py
def close(  # noqa: PLR0912, C901
    self,
    msg: Any | None = None,
    *,
    wait_for_ack: bool = False,
    okay_if_broken_pipe: bool = False,
    side: str = "",
) -> None:
    """Close the connection.

    Args:
        msg: The message to send to the other end of the connection.
        wait_for_ack: If `True`, wait for an acknowledgement from the
            other end before closing the connection.
        okay_if_broken_pipe: If `True`, will not log an error if the
            connection is already closed.
        side: The side of the connection for naming purposes.
    """
    if not self.connection.closed:
        kind = Comm.Msg.Kind.CLOSE_WITH_ACK if wait_for_ack else Comm.Msg.Kind.CLOSE
        try:
            self._send_pipe((kind, msg))
        except BrokenPipeError as e:
            if not okay_if_broken_pipe:
                logger.error(f"{side} - Error sending close signal: {type(e)}{e}")
        except Exception as e:  # noqa: BLE001
            logger.error(f"{side} - Error sending close signal: {type(e)}{e}")
        else:
            if wait_for_ack:
                logger.debug(f"{side} - Waiting for ACK")
                try:
                    recieved_msg = self.connection.recv()
                except Exception as e:  # noqa: BLE001
                    logger.error(
                        f"{side} - Error waiting for ACK, closing: {type(e)}{e}",
                    )
                else:
                    match recieved_msg:
                        case Comm.Msg.Kind.WORKER_CLOSE_REQUEST:
                            logger.error(
                                f"{side} - Worker recieved request to close!",
                            )
                        case Comm.Msg.Kind.ACK:
                            logger.debug(f"{side} - Recieved ACK, closing")
                        case _:
                            logger.warning(
                                f"{side} - Expected ACK but {recieved_msg=}",
                            )
        finally:
            try:
                self.connection.close()
            except OSError:
                # It's possble that the connection was closed by the other end
                # before we could close it.
                pass
            except Exception as e:  # noqa: BLE001
                logger.error(f"{side} - Error closing connection: {type(e)}{e}")

create classmethod #

create(*, duplex: bool = True) -> tuple[Self, Self]

Create a pair of communication channels.

Wraps the output of multiprocessing.Pipe(duplex=duplex).

PARAMETER DESCRIPTION
duplex

Whether to allow for two-way communication

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
tuple[Self, Self]

A pair of communication channels.

Source code in src/amltk/scheduling/plugins/comm.py
@classmethod
def create(cls, *, duplex: bool = True) -> tuple[Self, Self]:
    """Create a pair of communication channels.

    Wraps the output of
    [`multiprocessing.Pipe(duplex=duplex)`][multiprocessing.Pipe].

    Args:
        duplex: Whether to allow for two-way communication

    Returns:
        A pair of communication channels.
    """
    reader, writer = Pipe(duplex=duplex)
    return cls(reader), cls(writer)

open #

open(
    opening_msg: Any | None = None,
    *,
    wait_for_ack: bool = False,
    side: str = "worker"
) -> Iterator[Self]

Open the connection.

PARAMETER DESCRIPTION
opening_msg

The message to send to the main process when the connection is opened.

TYPE: Any | None DEFAULT: None

wait_for_ack

If True, wait for an acknowledgement from the other end before closing the connection and exiting the context manager.

TYPE: bool DEFAULT: False

side

The side of the connection for naming purposes. Usually this is only done on the "worker" side.

TYPE: str DEFAULT: 'worker'

YIELDS DESCRIPTION
Self

The comm.

Source code in src/amltk/scheduling/plugins/comm.py
@contextmanager
def open(
    self,
    opening_msg: Any | None = None,
    *,
    wait_for_ack: bool = False,
    side: str = "worker",
) -> Iterator[Self]:
    """Open the connection.

    Args:
        opening_msg: The message to send to the main process
            when the connection is opened.
        wait_for_ack: If `True`, wait for an acknowledgement from the
            other end before closing the connection and exiting the
            context manager.
        side: The side of the connection for naming purposes.
            Usually this is only done on the `"worker"` side.

    Yields:
        The comm.
    """
    self._send_pipe((Comm.Msg.Kind.OPEN, opening_msg))
    yield self
    self.close(wait_for_ack=wait_for_ack, side=side)

request #

request(
    msg: Any | None = None, *, timeout: None | float = None
) -> Any

Receive a message.

PARAMETER DESCRIPTION
msg

The message to send to the other end of the connection. If left empty, will be None.

TYPE: Any | None DEFAULT: None

timeout

If float, will wait for that many seconds, raising an exception if exceeded. Otherwise, None will wait forever.

TYPE: None | float DEFAULT: None

RAISES DESCRIPTION
TimeoutError

If the timeout is reached.

CloseRequestError

If the other end needs to abruptly end and can not fufill the request. If thise error is thrown, the worker should finish as soon as possible.

RETURNS DESCRIPTION
Any

The received message or the default.

Source code in src/amltk/scheduling/plugins/comm.py
def request(
    self,
    msg: Any | None = None,
    *,
    timeout: None | float = None,
) -> Any:
    """Receive a message.

    Args:
        msg: The message to send to the other end of the connection.
            If left empty, will be `None`.
        timeout: If float, will wait for that many seconds, raising an exception
            if exceeded. Otherwise, None will wait forever.

    Raises:
        Comm.TimeoutError: If the timeout is reached.
        Comm.CloseRequestError: If the other end needs to abruptly end and
            can not fufill the request. If thise error is thrown, the worker
            should finish as soon as possible.

    Returns:
        The received message or the default.
    """
    self._send_pipe((Comm.Msg.Kind.REQUEST, msg))
    if not self.connection.poll(timeout):
        raise Comm.TimeoutError(f"Timed out waiting for response for {msg}")

    response = self.connection.recv()
    if response == Comm.Msg.Kind.WORKER_CLOSE_REQUEST:
        logger.error("Worker recieved request to close!")
        raise Comm.CloseRequestError()

    return response

send #

send(obj: Any) -> None

Send a message.

PARAMETER DESCRIPTION
obj

The object to send.

TYPE: Any

Source code in src/amltk/scheduling/plugins/comm.py
def send(self, obj: Any) -> None:
    """Send a message.

    Args:
        obj: The object to send.
    """
    self._send_pipe((Comm.Msg.Kind.MESSAGE, obj))