Skip to content

Queue monitor

amltk.scheduling.queue_monitor #

The queue monitoring.

QueueMonitor #

QueueMonitor(scheduler: Scheduler)

A monitor for the scheduler queue.

Source code in src/amltk/scheduling/queue_monitor.py
def __init__(self, scheduler: Scheduler) -> None:
    """Initializes the monitor."""
    super().__init__()
    self.scheduler = scheduler
    self.data: list[QueueMonitorRecord] = []

    scheduler.on_start(self.update)
    scheduler.on_finishing(self.update)
    scheduler.on_finished(self.update)
    scheduler.on_future_submitted(self.update)
    scheduler.on_future_cancelled(self.update)
    scheduler.on_future_done(self.update)

df #

df(*, n_workers: int | None = None) -> DataFrame

Converts the data to a pandas DataFrame.

PARAMETER DESCRIPTION
n_workers

The number of workers that were in use. This helps idenify how many workers were idle at a given time. If None, the maximum length of the queue at any recorded time is used.

TYPE: int | None DEFAULT: None

Source code in src/amltk/scheduling/queue_monitor.py
def df(
    self,
    *,
    n_workers: int | None = None,
) -> pd.DataFrame:
    """Converts the data to a pandas DataFrame.

    Args:
        n_workers: The number of workers that were in use. This helps idenify how
            many workers were idle at a given time. If None, the maximum length of
            the queue at any recorded time is used.
    """
    _df = pd.DataFrame(self.data, columns=list(QueueMonitorRecord._fields)).astype(
        {
            # Windows might have a weird default here but it should be 64 at least
            "time": "int64",
            "queue_size": int,
            "queued": int,
            "finished": int,
            "cancelled": int,
        },
    )
    if n_workers is None:
        n_workers = int(_df["queue_size"].max())
    _df["idle"] = n_workers - _df["queue_size"]
    _df["time"] = pd.to_datetime(_df["time"], unit="ns", origin="unix")
    return _df.set_index("time")

plot #

plot(
    *,
    ax: Axes | None = None,
    interval: tuple[int, UnitChoices] = (1, "s"),
    n_workers: int | None = None,
    **kwargs: Any
) -> Axes

Plots the data as a stacked barchart.

PARAMETER DESCRIPTION
ax

The axes to plot on. If None, a new figure is created.

TYPE: Axes | None DEFAULT: None

interval

The interval to use for the x-axis. The first value is the interval and the second value is the unit. Must be a valid pandas timedelta unit. See to_timedelta() for more information.

TYPE: tuple[int, UnitChoices] DEFAULT: (1, 's')

n_workers

The number of workers that were in use. This helps idenify how many workers were idle at a given time. If None, the maximum length of the queue at any recorded time is used.

TYPE: int | None DEFAULT: None

**kwargs

Additional keyword arguments to pass to the pandas plot function.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Axes

The axes.

Source code in src/amltk/scheduling/queue_monitor.py
def plot(
    self,
    *,
    ax: plt.Axes | None = None,
    interval: tuple[int, UnitChoices] = (1, "s"),
    n_workers: int | None = None,
    **kwargs: Any,
) -> plt.Axes:
    """Plots the data as a stacked barchart.

    Args:
        ax: The axes to plot on. If None, a new figure is created.
        interval: The interval to use for the x-axis. The first value is the
            interval and the second value is the unit. Must be a valid pandas
            timedelta unit. See [to_timedelta()][pandas.to_timedelta] for more
            information.
        n_workers: The number of workers that were in use. This helps idenify how
            many workers were idle at a given time. If None, the maximum length of
            the queue at any recorded time is used.
        **kwargs: Additional keyword arguments to pass to the pandas plot function.

    Returns:
        The axes.
    """
    if ax is None:
        _, _ax = plt.subplots(1, 1)
    else:
        _ax = ax

    _df = self.df(n_workers=n_workers)
    _df = _df.resample(f"{interval[0]}{interval[1]}").mean()
    _df.index = _df.index - _df.index[0]
    _reversed_df = _df[::-1]

    _reversed_df.plot.barh(
        stacked=True,
        y=["finished", "queued", "cancelled", "idle"],
        ax=_ax,
        width=1,
        edgecolor="k",
        **kwargs,
    )

    _ax.set_ylabel("Time")
    _ax.yaxis.set_major_locator(MaxNLocator(nbins="auto"))

    _ax.set_xlabel("Count")
    _ax.xaxis.set_major_locator(MaxNLocator(integer=True))

    return _ax

update #

update(*_: Any) -> None

Updates the data when the scheduler has an event.

Source code in src/amltk/scheduling/queue_monitor.py
def update(self, *_: Any) -> None:
    """Updates the data when the scheduler has an event."""
    queue = self.scheduler.queue
    # OPTIM: Not sure if this is fastenough
    counter = Counter([f._state for f in queue])
    self.data.append(
        QueueMonitorRecord(
            time.time_ns(),
            len(queue),
            counter["PENDING"],
            counter["FINISHED"],
            counter["CANCELLED"],
        ),
    )

QueueMonitorRecord #

Bases: NamedTuple

A record of the queue state at a given time.