Skip to content

Queue monitor

A QueueMonitor is a monitor for the scheduler queue.

This module contains a monitor for the scheduler queue. The monitor tracks the queue state at every event emitted by the scheduler. The data can be converted to a pandas DataFrame or plotted as a stacked barchart.

Monitoring Frequency

To prevent repeated polling, we sample the scheduler queue at every scheduler event. This is because the queue is only modified upon one of these events, which means we don't need to poll the queue at a fixed interval. However, if you need finer-grained updates, you can add extra events/timings at which the monitor should update().

Performance impact

If your tasks and callbacks are very fast (roughly sub-10ms), then the monitor has a non-negligible impact; however, for most use cases this should not be a problem. As with anything, you should profile how much work the scheduler can get done, with and without the monitor, to see if it is a problem for your use case.

In the example below, we have a very fast-running function that runs on repeat, sometimes too fast for the scheduler to keep up, letting some futures build up while they wait to be processed.

import time
import matplotlib.pyplot as plt
from amltk.scheduling import Scheduler
from amltk.scheduling.queue_monitor import QueueMonitor

def fast_function(x: int) -> int:
    """Return ``x`` plus one; a deliberately trivial workload for the example."""
    incremented = x + 1
    return incremented

N_WORKERS = 2
scheduler = Scheduler.with_processes(N_WORKERS)
# Constructing the monitor registers its `update` method on the scheduler's
# events, so it must be created before the scheduler is run.
monitor = QueueMonitor(scheduler)
task = scheduler.task(fast_function)

# Seed the queue when the scheduler starts.
# NOTE(review): `repeat=N_WORKERS` presumably invokes this callback once per
# worker so that every worker has something to do -- confirm against Scheduler.
@scheduler.on_start(repeat=N_WORKERS)
def start():
    task.submit(1)

# Keep the workers busy: each result triggers a new submission for as long as
# the scheduler reports that it is still running.
@task.on_result
def result(_, x: int):
    if scheduler.running():
        task.submit(x)

scheduler.run(timeout=1)
# One row per recorded event, indexed by timestamp.
df = monitor.df()
print(df)
                               queue_size  queued  finished  cancelled  idle
time                                                                        
2024-01-10 15:20:47.403927183           0       0         0          0     2
2024-01-10 15:20:47.420675204           1       1         0          0     1
2024-01-10 15:20:47.421058229           2       2         0          0     0
2024-01-10 15:20:47.435153489           1       1         0          0     1
2024-01-10 15:20:47.435307026           2       2         0          0     0
...                                   ...     ...       ...        ...   ...
2024-01-10 15:20:48.420540169           2       2         0          0     0
2024-01-10 15:20:48.422375353           2       2         0          0     0
2024-01-10 15:20:48.435634147           2       2         0          0     0
2024-01-10 15:20:48.435850400           1       0         1          0     1
2024-01-10 15:20:48.435887670           0       0         0          0     2

[1809 rows x 5 columns]

We can also plot() the data as a stacked barchart with a set interval.

# Pass the axes we just created; without `ax=...`, plot() creates its own
# figure and the one from `plt.subplots()` is left empty.
fig, ax = plt.subplots()
monitor.plot(ax=ax, interval=(50, "ms"))

2024-01-10T15:20:48.596390 image/svg+xml Matplotlib v3.8.2, https://matplotlib.org/

class QueueMonitorRecord #

Bases: NamedTuple

A record of the queue state at a given time.

class QueueMonitor(scheduler) #

A monitor for the scheduler queue.

Source code in src/amltk/scheduling/queue_monitor.py
def __init__(self, scheduler: Scheduler) -> None:
    """Attach the monitor to ``scheduler`` and start with an empty record list.

    Args:
        scheduler: The scheduler whose queue should be recorded. The monitor's
            ``update`` method is registered on each of the scheduler events
            listed below.
    """
    super().__init__()
    self.scheduler = scheduler
    self.data: list[QueueMonitorRecord] = []

    # Registration order is kept the same as the events are listed here.
    subscribers = (
        scheduler.on_start,
        scheduler.on_finishing,
        scheduler.on_finished,
        scheduler.on_future_submitted,
        scheduler.on_future_cancelled,
        scheduler.on_future_done,
    )
    for subscribe in subscribers:
        subscribe(self.update)

def df(*, n_workers=None) #

Converts the data to a pandas DataFrame.

PARAMETER DESCRIPTION
n_workers

The number of workers that were in use. This helps identify how many workers were idle at a given time. If None, the maximum length of the queue at any recorded time is used.

TYPE: int | None DEFAULT: None

Source code in src/amltk/scheduling/queue_monitor.py
def df(
    self,
    *,
    n_workers: int | None = None,
) -> pd.DataFrame:
    """Converts the data to a pandas DataFrame.

    Args:
        n_workers: The number of workers that were in use. This helps idenify how
            many workers were idle at a given time. If None, the maximum length of
            the queue at any recorded time is used.
    """
    _df = pd.DataFrame(self.data, columns=list(QueueMonitorRecord._fields)).astype(
        {
            # Windows might have a weird default here but it should be 64 at least
            "time": "int64",
            "queue_size": int,
            "queued": int,
            "finished": int,
            "cancelled": int,
        },
    )
    if n_workers is None:
        n_workers = int(_df["queue_size"].max())
    _df["idle"] = n_workers - _df["queue_size"]
    _df["time"] = pd.to_datetime(_df["time"], unit="ns", origin="unix")
    return _df.set_index("time")

def plot(*, ax=None, interval=(1, 's'), n_workers=None, **kwargs) #

Plots the data as a stacked barchart.

PARAMETER DESCRIPTION
ax

The axes to plot on. If None, a new figure is created.

TYPE: Axes | None DEFAULT: None

interval

The interval to use for the x-axis. The first value is the interval and the second value is the unit. Must be a valid pandas timedelta unit. See to_timedelta() for more information.

TYPE: tuple[int, UnitChoices] DEFAULT: (1, 's')

n_workers

The number of workers that were in use. This helps identify how many workers were idle at a given time. If None, the maximum length of the queue at any recorded time is used.

TYPE: int | None DEFAULT: None

**kwargs

Additional keyword arguments to pass to the pandas plot function.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Axes

The axes.

Source code in src/amltk/scheduling/queue_monitor.py
def plot(
    self,
    *,
    ax: plt.Axes | None = None,
    interval: tuple[int, UnitChoices] = (1, "s"),
    n_workers: int | None = None,
    **kwargs: Any,
) -> plt.Axes:
    """Draw the recorded queue state as a horizontal stacked bar chart.

    The records are resampled into bins of ``interval`` (taking the mean of
    each bin) and the time axis is shown relative to the first bin.

    Args:
        ax: Axes to draw on. When None, a new figure with one axes is created.
        interval: A ``(value, unit)`` pair giving the bin width, e.g.
            ``(50, "ms")``. The unit must be a valid pandas timedelta unit.
            See [to_timedelta()][pandas.to_timedelta] for more information.
        n_workers: The number of workers that were in use, forwarded to
            ``df()`` to compute how many workers were idle. If None, the
            maximum length of the queue at any recorded time is used.
        **kwargs: Extra keyword arguments forwarded to the pandas ``barh``
            plotting call.

    Returns:
        The axes that were drawn on.
    """
    target_ax = plt.subplots(1, 1)[1] if ax is None else ax

    bin_width = f"{interval[0]}{interval[1]}"
    binned = self.df(n_workers=n_workers).resample(bin_width).mean()
    # Show elapsed time since the first bin rather than absolute timestamps.
    binned.index = binned.index - binned.index[0]

    # Rows are reversed before plotting with `barh`.
    binned[::-1].plot.barh(
        stacked=True,
        y=["finished", "queued", "cancelled", "idle"],
        ax=target_ax,
        width=1,
        edgecolor="k",
        **kwargs,
    )

    target_ax.set_ylabel("Time")
    target_ax.yaxis.set_major_locator(MaxNLocator(nbins="auto"))

    target_ax.set_xlabel("Count")
    target_ax.xaxis.set_major_locator(MaxNLocator(integer=True))

    return target_ax

def update(*_) #

Updates the data when the scheduler has an event.

Source code in src/amltk/scheduling/queue_monitor.py
def update(self, *_: Any) -> None:
    """Append a snapshot of the scheduler queue to ``self.data``.

    Any positional arguments are accepted and ignored, so this method can be
    registered as a callback regardless of what an event passes along.
    """
    current_queue = self.scheduler.queue
    # OPTIM: Not sure if this is fast enough
    # Tally items by their `_state` attribute.
    # NOTE(review): the state names look like those used by
    # `concurrent.futures.Future` internally -- confirm; other states are not
    # reported separately but still count towards the queue size.
    state_counts = Counter(item._state for item in current_queue)
    snapshot = QueueMonitorRecord(
        time.time_ns(),
        len(current_queue),
        state_counts["PENDING"],
        state_counts["FINISHED"],
        state_counts["CANCELLED"],
    )
    self.data.append(snapshot)