Limiter
The Limiter
can limit the number of
times a function is called, how many concurrent instances of it can be running,
or whether it can run while another task is running.
The functionality of the Limiter
could also be implemented without a plugin but
it gives some nice utility.
Usage
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
return x + 1
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=[Limiter(max_calls=2)])
@task.on("call-limit-reached")
def callback(task: Task, *args, **kwargs):
pass
╭─ Task fn(x: 'int') -> 'int' ─────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Calls 0/2 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ @on_submitted │
│ @on_done │
│ @on_result │
│ @on_exception │
│ @on_cancelled │
│ @call-limit-reached │
│ └── def callback(task: 'Task', *args, **kwargs) │
│ @concurrent-limit-reached │
│ @disabled-due-to-running-task │
╰─────────────────────────────── Ref: LskBybJM ────────────────────────────────╯
@events
The event emitted when the task has reached its call limit.
Will call any subscribers with the task as the first argument, followed by the arguments and keyword arguments that were passed to the task.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
return x + 1
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=[Limiter(max_calls=2)])
@task.on("call-limit-reached")
def callback(task: Task, *args, **kwargs):
pass
╭─ Task fn(x: 'int') -> 'int' ─────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Calls 0/2 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ @on_submitted │
│ @on_done │
│ @on_result │
│ @on_exception │
│ @on_cancelled │
│ @call-limit-reached │
│ └── def callback(task: 'Task', *args, **kwargs) │
│ @concurrent-limit-reached │
│ @disabled-due-to-running-task │
╰─────────────────────────────── Ref: ueTLcSTZ ────────────────────────────────╯
The event emitted when the task has reached its concurrent call limit.
Will call any subscribers with the task as the first argument, followed by the arguments and keyword arguments that were passed to the task.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
return x + 1
scheduler = Scheduler.with_processes(2)
task = scheduler.task(fn, plugins=[Limiter(max_concurrent=2)])
@task.on("concurrent-limit-reached")
def callback(task: Task, *args, **kwargs):
pass
╭─ Task fn(x: 'int') -> 'int' ─────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Concurrent 0/2 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ @on_submitted │
│ @on_done │
│ @on_result │
│ @on_exception │
│ @on_cancelled │
│ @call-limit-reached │
│ @concurrent-limit-reached │
│ └── def callback(task: 'Task', *args, **kwargs) │
│ @disabled-due-to-running-task │
╰─────────────────────────────── Ref: 1n6i8KRn ────────────────────────────────╯
The event emitter when the task was not submitted due to some other running task.
Will call any subscribers with the task as first argument, followed by the arguments and keyword arguments that were passed to the task.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
return x + 1
scheduler = Scheduler.with_processes(2)
other_task = scheduler.task(fn)
task = scheduler.task(fn, plugins=[Limiter(not_while_running=other_task)])
@task.on("disabled-due-to-running-task")
def callback(task: Task, *args, **kwargs):
pass
╭─ Task fn(x: 'int') -> 'int' ─────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Not While def fn(...) Ref: CRwpZHZ4 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ @on_submitted │
│ @on_done │
│ @on_result │
│ @on_exception │
│ @on_cancelled │
│ @call-limit-reached │
│ @concurrent-limit-reached │
│ @disabled-due-to-running-task │
│ └── def callback(task: 'Task', *args, **kwargs) │
╰─────────────────────────────── Ref: zrqIiWsJ ────────────────────────────────╯
class Limiter(*, max_calls=None, max_concurrent=None, not_while_running=None)
#
Bases: Plugin
A plugin that limits the submission of a task.
Adds three new events to the task:
PARAMETER | DESCRIPTION |
---|---|
max_calls |
The maximum number of calls to the task.
TYPE:
|
max_concurrent |
The maximum number of calls of this task that can be in the queue.
TYPE:
|
not_while_running |
A task or iterable of tasks that if active, will prevent this task from being submitted. |
Source code in src/amltk/scheduling/plugins/limiter.py
name: ClassVar
classvar
attr
#
The name of the plugin.
CALL_LIMIT_REACHED: Event[...]
classvar
attr
#
The event emitted when the task has reached its call limit.
Will call any subscribers with the task as the first argument, followed by the arguments and keyword arguments that were passed to the task.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
return x + 1
scheduler = Scheduler.with_processes(1)
task = scheduler.task(fn, plugins=[Limiter(max_calls=2)])
@task.on("call-limit-reached")
def callback(task: Task, *args, **kwargs):
pass
╭─ Task fn(x: 'int') -> 'int' ─────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Calls 0/2 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ @on_submitted │
│ @on_done │
│ @on_result │
│ @on_exception │
│ @on_cancelled │
│ @call-limit-reached │
│ └── def callback(task: 'Task', *args, **kwargs) │
│ @concurrent-limit-reached │
│ @disabled-due-to-running-task │
╰─────────────────────────────── Ref: C56pj7NZ ────────────────────────────────╯
CONCURRENT_LIMIT_REACHED: Event[...]
classvar
attr
#
The event emitted when the task has reached its concurrent call limit.
Will call any subscribers with the task as the first argument, followed by the arguments and keyword arguments that were passed to the task.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
return x + 1
scheduler = Scheduler.with_processes(2)
task = scheduler.task(fn, plugins=[Limiter(max_concurrent=2)])
@task.on("concurrent-limit-reached")
def callback(task: Task, *args, **kwargs):
pass
╭─ Task fn(x: 'int') -> 'int' ─────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Concurrent 0/2 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ @on_submitted │
│ @on_done │
│ @on_result │
│ @on_exception │
│ @on_cancelled │
│ @call-limit-reached │
│ @concurrent-limit-reached │
│ └── def callback(task: 'Task', *args, **kwargs) │
│ @disabled-due-to-running-task │
╰─────────────────────────────── Ref: zvsFNkZe ────────────────────────────────╯
DISABLED_DUE_TO_RUNNING_TASK: Event[...]
classvar
attr
#
The event emitter when the task was not submitted due to some other running task.
Will call any subscribers with the task as first argument, followed by the arguments and keyword arguments that were passed to the task.
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins import Limiter
def fn(x: int) -> int:
return x + 1
scheduler = Scheduler.with_processes(2)
other_task = scheduler.task(fn)
task = scheduler.task(fn, plugins=[Limiter(not_while_running=other_task)])
@task.on("disabled-due-to-running-task")
def callback(task: Task, *args, **kwargs):
pass
╭─ Task fn(x: 'int') -> 'int' ─────────────────────────────────────────────────╮
│ ╭───────────────────────────── Plugin limiter ─────────────────────────────╮ │
│ │ Not While def fn(...) Ref: U2ITvIkA │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ @on_submitted │
│ @on_done │
│ @on_result │
│ @on_exception │
│ @on_cancelled │
│ @call-limit-reached │
│ @concurrent-limit-reached │
│ @disabled-due-to-running-task │
│ └── def callback(task: 'Task', *args, **kwargs) │
╰─────────────────────────────── Ref: CveiNsYc ────────────────────────────────╯
n_running: int
prop
#
Return the number of running tasks.
def attach_task(task)
#
Attach the plugin to a task.
Source code in src/amltk/scheduling/plugins/limiter.py
def pre_submit(fn, *args, **kwargs)
#
Pre-submit hook.
Prevents submission of the task if it exceeds any of the set limits.