Queue monitor
A QueueMonitor is a monitor for the scheduler queue.
This module contains a monitor for the scheduler queue. The monitor tracks the queue state at every event emitted by the scheduler. The data can be converted to a pandas DataFrame or plotted as a stacked barchart.
Monitoring Frequency
To avoid repeated polling, we sample the scheduler queue at every scheduler event. The queue is only modified when one of these events occurs, so we don't need to poll it at a fixed interval. However, if you need more fine-grained updates, you can add extra events/timings at which the monitor should update().
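The idea of sampling on events rather than on a timer can be sketched without the library. The snippet below is only an illustration under stated assumptions: `TinyEmitter`, `TinyMonitor`, and the event names `"submitted"`, `"finished"`, and `"checkpoint"` are hypothetical stand-ins and are not part of amltk.

```python
from __future__ import annotations

import time
from collections import defaultdict
from typing import Callable


class TinyEmitter:
    """Hypothetical stand-in for a scheduler that emits named events."""

    def __init__(self) -> None:
        self.queue: list[str] = []
        self._handlers: dict[str, list[Callable[[], None]]] = defaultdict(list)

    def on(self, event: str, handler: Callable[[], None]) -> None:
        self._handlers[event].append(handler)

    def emit(self, event: str) -> None:
        for handler in self._handlers[event]:
            handler()

    def submit(self, name: str) -> None:
        self.queue.append(name)
        self.emit("submitted")

    def finish(self, name: str) -> None:
        self.queue.remove(name)
        self.emit("finished")


class TinyMonitor:
    """Records (timestamp, queue size) whenever update() is called."""

    def __init__(self, emitter: TinyEmitter) -> None:
        self.emitter = emitter
        self.samples: list[tuple[float, int]] = []
        # Sample on every event that can change the queue: no polling loop.
        for event in ("submitted", "finished"):
            emitter.on(event, self.update)

    def update(self, *_: object) -> None:
        self.samples.append((time.time(), len(self.emitter.queue)))


emitter = TinyEmitter()
tiny_monitor = TinyMonitor(emitter)

emitter.submit("a")
emitter.submit("b")
emitter.finish("a")

# Finer-grained updates: hook update() onto an extra, user-chosen event.
emitter.on("checkpoint", tiny_monitor.update)
emitter.emit("checkpoint")

sizes = [size for _, size in tiny_monitor.samples]
print(sizes)
```

Because every change to the queue goes through an event, the recorded samples capture each state change exactly once, and the extra `"checkpoint"` hook only adds samples in between.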
Performance impact
If your tasks and callbacks are very fast (roughly under 10 ms), the monitor has a non-negligible impact; for most use cases, however, this should not be a problem. As with anything, you should profile how much work the scheduler can get done, with and without the monitor, to see if it is a problem for your use case.
In the example below, we have a very fast-running function that runs on repeat, sometimes too fast for the scheduler to keep up, letting some futures build up that still need to be processed.
import time

import matplotlib.pyplot as plt
from amltk.scheduling import Scheduler
from amltk.scheduling.queue_monitor import QueueMonitor


def fast_function(x: int) -> int:
    return x + 1


N_WORKERS = 2
scheduler = Scheduler.with_processes(N_WORKERS)
monitor = QueueMonitor(scheduler)
task = scheduler.task(fast_function)


# Submit one task per worker when the scheduler starts.
@scheduler.on_start(repeat=N_WORKERS)
def start():
    task.submit(1)


# Resubmit on every result for as long as the scheduler is running.
@task.on_result
def result(_, x: int):
    if scheduler.running():
        task.submit(x)


scheduler.run(timeout=1)
df = monitor.df()
print(df)
                               queue_size  queued  finished  cancelled  idle
time
2024-01-10 15:20:47.403927183           0       0         0          0     2
2024-01-10 15:20:47.420675204           1       1         0          0     1
2024-01-10 15:20:47.421058229           2       2         0          0     0
2024-01-10 15:20:47.435153489           1       1         0          0     1
2024-01-10 15:20:47.435307026           2       2         0          0     0
...                                   ...     ...       ...        ...   ...
2024-01-10 15:20:48.420540169           2       2         0          0     0
2024-01-10 15:20:48.422375353           2       2         0          0     0
2024-01-10 15:20:48.435634147           2       2         0          0     0
2024-01-10 15:20:48.435850400           1       0         1          0     1
2024-01-10 15:20:48.435887670           0       0         0          0     2

[1809 rows x 5 columns]
We can also plot() the data as a stacked barchart with a set interval.
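To make "a set interval" concrete, here is a rough, library-independent sketch of grouping timestamped samples into fixed-width buckets, which is the kind of aggregation a per-interval bar chart needs. It is not a description of how plot() is implemented: the helper `bucket_mean` is hypothetical, and both averaging within a bucket and starting the buckets at the first sample are assumptions made for illustration.

```python
from __future__ import annotations

from collections import defaultdict
from datetime import datetime, timedelta


def bucket_mean(
    samples: list[tuple[datetime, int]],
    interval: timedelta,
) -> dict[datetime, float]:
    """Average the sampled values inside each fixed-width time bucket."""
    if not samples:
        return {}
    origin = min(ts for ts, _ in samples)
    buckets: dict[datetime, list[int]] = defaultdict(list)
    for ts, value in samples:
        # Index of the bucket this timestamp falls into, counted from origin.
        index = (ts - origin) // interval
        buckets[origin + index * interval].append(value)
    return {start: sum(vals) / len(vals) for start, vals in sorted(buckets.items())}


t0 = datetime(2024, 1, 10, 15, 20, 47)
samples = [
    (t0, 0),
    (t0 + timedelta(milliseconds=200), 2),
    (t0 + timedelta(milliseconds=700), 1),
    (t0 + timedelta(milliseconds=1100), 2),
]

# One bar per second: three samples land in the first bucket, one in the second.
per_second = bucket_mean(samples, timedelta(seconds=1))
for start, mean_size in per_second.items():
    print(start.time(), mean_size)
```

A shorter interval gives more, narrower bars and keeps short bursts visible; a longer interval smooths them out.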
class QueueMonitorRecord
class QueueMonitor(scheduler)
A monitor for the scheduler queue.
Source code in src/amltk/scheduling/queue_monitor.py
def df(*, n_workers=None)
Converts the data to a pandas DataFrame.
PARAMETER | DESCRIPTION |
---|---|
n_workers | The number of workers that were in use. This helps identify how many workers were idle at a given time. If None, the maximum length of the queue at any recorded time is used. |
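The role of n_workers can be illustrated with a small standalone sketch. It assumes that the idle count is the number of workers minus the queue size at each sample, which is consistent with the example output earlier on this page but is not confirmed against the implementation; `idle_workers` is a hypothetical helper, not part of amltk.

```python
from __future__ import annotations


def idle_workers(queue_sizes: list[int], n_workers: int | None = None) -> list[int]:
    """Estimate idle workers per sample as n_workers minus the queue size."""
    if n_workers is None:
        # Fallback described above: use the largest queue size ever recorded.
        n_workers = max(queue_sizes, default=0)
    return [max(n_workers - size, 0) for size in queue_sizes]


queue_sizes = [0, 1, 2, 1, 2]
print(idle_workers(queue_sizes, n_workers=2))  # explicit worker count
print(idle_workers(queue_sizes))               # inferred from the busiest sample

# If the queue never fills up, the inferred count underestimates idle workers.
print(idle_workers([0, 1, 1]))
print(idle_workers([0, 1, 1], n_workers=2))
```

Under this assumption, passing n_workers explicitly matters mainly when the workers were never all busy at once during the run.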
def plot(*, ax=None, interval=(1, 's'), n_workers=None, **kwargs)
Plots the data as a stacked barchart.
PARAMETER | DESCRIPTION |
---|---|
ax | The axes to plot on. If None, a new figure is created. |
interval | The interval to use for the x-axis. The first value is the length of the interval and the second value is the unit. Must be a valid pandas timedelta unit. See to_timedelta() for more information. |
n_workers | The number of workers that were in use. This helps identify how many workers were idle at a given time. If None, the maximum length of the queue at any recorded time is used. |
**kwargs | Additional keyword arguments to pass to the pandas plot function. |

RETURNS | DESCRIPTION |
---|---|
Axes | The axes. |
def update(*_)
Updates the data when the scheduler has an event.