Skip to content

Task

Tasks#

amltk.scheduling.task #

A Task is a unit of work that can be scheduled by the Scheduler.

It is defined by its function= to call. Whenever a Task has its submit() method called, the function will be dispatched to run by a Scheduler.

When a task has returned, either successfully, or with an exception, it will emit @events to indicate so. You can subscribe to these events with callbacks and act accordingly.

@events

Check out the @events reference for more on how to customize these callbacks. You can also take a look at the API of on() for more information.

amltk.scheduling.task.Task.on_result instance-attribute #

on_result: Subscriber[[Future[R], R], Any] = subscriber(
    RESULT
)

Called when a task has successfully returned a value. Comes with Future

@task.on_result
def on_result(future: Future[R], result: R):
    print(f"Future {future} returned {result}")

amltk.scheduling.task.Task.on_exception instance-attribute #

on_exception: Subscriber[
    [Future[R], BaseException], Any
] = subscriber(EXCEPTION)

Called when a task failed to return anything but an exception. Comes with Future

@task.on_exception
def on_exception(future: Future[R], error: BaseException):
    print(f"Future {future} exceptioned {error}")

amltk.scheduling.task.Task.on_done instance-attribute #

on_done: Subscriber[[Future[R]], Any] = subscriber(DONE)

Called when a task is done running with a result or exception.

@task.on_done
def on_done(future: Future[R]):
    print(f"Future {future} is done")

amltk.scheduling.task.Task.on_submitted instance-attribute #

on_submitted: Subscriber[
    Concatenate[Future[R], P], Any
] = subscriber(SUBMITTED)

An event that is emitted when a future is submitted to the scheduler. It will pass the future as the first argument with args and kwargs following.

This is done before any callbacks are attached to the future.

@task.on_submitted
def on_submitted(future: Future[R], *args, **kwargs):
    print(f"Future {future} was submitted with {args=} and {kwargs=}")

amltk.scheduling.task.Task.on_cancelled instance-attribute #

on_cancelled: Subscriber[[Future[R]], Any] = subscriber(
    CANCELLED
)

Called when a task is cancelled.

@task.on_cancelled
def on_cancelled(future: Future[R]):
    print(f"Future {future} was cancelled")

Usage

The usual way to create a task is with Scheduler.task(), where you provide the function= to call.

from amltk import Scheduler

def f(x: int) -> int:
    return x * 2

scheduler = Scheduler.with_processes(2)
task = scheduler.task(f)

@scheduler.on_start
def on_start():
    task.submit(1)

@task.on_result
def on_result(future: Future[int], result: int):
    print(f"Task {future} returned {result}")

scheduler.run()

Task returned 2

╭─ Scheduler ──────────────────────────────────────────────────────────────────╮
  Executor                  Queue: (0)                                        
  ╭─ ProcessPoolExecutor─╮                                                    
{'max_workers': 2}
  ╰──────────────────────╯                                                    
                                                                              
                                                                              
                                                                              
╰──────────────────────────────────────────────────────────────────────────────╯
┣━━ @on_empty 1
@on_start 1
└── def on_start() (1)
@on_finishing 1
@on_finished 1
@on_stop
@on_timeout
@on_future_submitted 1
@on_future_done 1
@on_future_cancelled
@on_future_exception
@on_future_result 1
┗━━ ╭─ Task f(x: 'int') -> 'int' ──────────────────────────────────────────────╮
    ╰────────────────────────── Ref: Task-f-sB2WvkeJ ──────────────────────────╯

If you'd like to simply just call the original function, without submitting it to the scheduler, you can always just call the task directly, i.e. task(1).

You can also provide Plugins to the task, to modify tasks, add functionality and add new events.