
Pynisher

The PynisherPlugin uses pynisher to place memory, walltime and cputime constraints on processes, crashing them if these limits are reached. The default units are bytes ("B") and seconds ("s"), but you can also use other units; please see the relevant API doc.

It is best used with Scheduler.with_processes(), so that work is performed in separate processes.

Requirements

This requires pynisher, which can be installed with:

pip install amltk[pynisher]

# Or directly
pip install pynisher
Usage

from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time

def f(x: int) -> int:
    time.sleep(x)
    return 42

scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))

@task.on("pynisher-timeout")
def callback(exception):
    pass

Scheduler Executor

This will place process limits on the task as soon as it starts running, wherever it may be running. If you are using Scheduler.with_sequential(), this will place limits on the main process, which is likely not what you want. This also does not work with a ThreadPoolExecutor.

If you use this with something like dask-jobqueue, it will place limits on the workers that dask-jobqueue spawns. In that case, it is better to place limits directly through dask-jobqueue.

Platform Limitations (Mac, Windows)

Pynisher has some limitations with memory on Mac and Windows: automl/pynisher#features

You can check which limits your platform supports with PynisherPlugin.supports("memory"), PynisherPlugin.supports("cpu_time") and PynisherPlugin.supports("wall_time"). See PynisherPlugin.supports().
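For code that must run on several platforms, one option is to drop the limits your OS cannot enforce before constructing the plugin. The following is a minimal sketch built on a hypothetical helper, supported_limit_kwargs, which is not part of amltk. It takes the support check as a callable so it can be exercised without pynisher installed. The mapping from kind to keyword is read off the PynisherPlugin constructor signature.

```python
from typing import Callable

# Maps each kind accepted by `PynisherPlugin.supports()` to the matching
# `PynisherPlugin` constructor keyword (read off the constructor signature).
KIND_TO_KWARG = {
    "memory": "memory_limit",
    "cpu_time": "cputime_limit",
    "wall_time": "walltime_limit",
}


def supported_limit_kwargs(requested, supports: Callable[[str], bool]) -> dict:
    """Keep only the requested limits for which `supports(kind)` is True.

    `requested` maps a kind ("memory", "cpu_time", "wall_time") to a limit value.
    The result is keyed by constructor keyword instead, ready for `**` unpacking.
    """
    return {
        KIND_TO_KWARG[kind]: value
        for kind, value in requested.items()
        if supports(kind)
    }


def no_memory_support(kind: str) -> bool:
    # Stand-in for `PynisherPlugin.supports` on a platform without memory limits.
    return kind != "memory"


kwargs = supported_limit_kwargs(
    {"memory": (2, "GB"), "wall_time": (30, "s")},
    supports=no_memory_support,
)
print(kwargs)  # {'walltime_limit': (30, 's')}
```

In real code you would pass the classmethod itself, e.g. PynisherPlugin(**supported_limit_kwargs(requested, PynisherPlugin.supports)).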

class PynisherPlugin(*, memory_limit=None, cputime_limit=None, walltime_limit=None, context=None)

Bases: Plugin

A plugin that wraps a task in a pynisher to enforce limits on it.

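This plugin wraps a task function in a Pynisher instance to enforce limits on the task. The limits are set by any of memory_limit=, cputime_limit= and walltime_limit=. If none of them is set, the function is submitted unwrapped.

Adds four new events to the task: TIMEOUT, MEMORY_LIMIT_REACHED, CPU_TIME_LIMIT_REACHED and WALL_TIME_LIMIT_REACHED.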

ATTRIBUTE DESCRIPTION
memory_limit

The memory limit of the task.

cputime_limit

The cpu time limit of the task.

walltime_limit

The wall time limit of the task.

PARAMETER DESCRIPTION
memory_limit

The memory limit to wrap the task in. Base unit is in bytes but you can specify (value, unit) where unit is one of ("B", "KB", "MB", "GB"). Defaults to None

TYPE: int | tuple[int, str] | None DEFAULT: None

cputime_limit

The cpu time limit to wrap the task in. Base unit is in seconds but you can specify (value, unit) where unit is one of ("s", "m", "h"). Defaults to None

TYPE: int | tuple[float, str] | None DEFAULT: None

walltime_limit

The wall time limit for the task. Base unit is in seconds but you can specify (value, unit) where unit is one of ("s", "m", "h"). Defaults to None.

TYPE: int | tuple[float, str] | None DEFAULT: None

context

The context to use for multiprocessing. Defaults to None. See multiprocessing.get_context()

TYPE: BaseContext | None DEFAULT: None
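To make the (value, unit) convention concrete, here is a minimal sketch of converting a limit into its base unit. The helper to_base_units is hypothetical and not part of amltk or pynisher, which handle this conversion themselves. The 1024-based memory factors are an assumption, so check pynisher's documentation for the factors it actually uses. The time factors follow directly from "s", "m" and "h".

```python
# Hypothetical helper, not part of amltk or pynisher.
# Assumption: memory units are binary multiples (1 KB = 1024 B).
MEMORY_FACTORS = {"B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3}
TIME_FACTORS = {"s": 1, "m": 60, "h": 3600}


def to_base_units(limit, factors):
    """Convert `value` or `(value, unit)` into the base unit (bytes or seconds)."""
    if limit is None:
        return None
    if isinstance(limit, tuple):
        value, unit = limit
        if unit not in factors:
            raise ValueError(f"Unknown unit {unit!r}, expected one of {list(factors)}")
        return value * factors[unit]
    # A bare number is already expressed in the base unit.
    return limit


print(to_base_units((2, "GB"), MEMORY_FACTORS))  # 2147483648
print(to_base_units((1.5, "m"), TIME_FACTORS))   # 90.0
print(to_base_units(30, TIME_FACTORS))           # 30
```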

Source code in src/amltk/scheduling/plugins/pynisher.py
def __init__(
    self,
    *,
    memory_limit: int | tuple[int, str] | None = None,
    cputime_limit: int | tuple[float, str] | None = None,
    walltime_limit: int | tuple[float, str] | None = None,
    context: BaseContext | None = None,
):
    """Initialize a `PynisherPlugin` instance.

    Args:
        memory_limit: The memory limit to wrap the task in. Base unit is in bytes
            but you can specify `(value, unit)` where `unit` is one of
            `("B", "KB", "MB", "GB")`. Defaults to `None`
        cputime_limit: The cpu time limit to wrap the task in. Base unit is in
            seconds but you can specify `(value, unit)` where `unit` is one of
            `("s", "m", "h")`. Defaults to `None`
        walltime_limit: The wall time limit for the task. Base unit is in seconds
            but you can specify `(value, unit)` where `unit` is one of
            `("s", "m", "h")`. Defaults to `None`.
        context: The context to use for multiprocessing. Defaults to `None`.
            See [`multiprocessing.get_context()`][multiprocessing.get_context]
    """
    super().__init__()
    self.memory_limit = memory_limit
    self.cputime_limit = cputime_limit
    self.walltime_limit = walltime_limit
    self.context = context

    self.task: Task

name: ClassVar (class attribute)

The name of the plugin.

TIMEOUT: Event[PynisherPlugin.TimeoutException] (class attribute)

A Task timed out, either due to the wall time or cpu time limit.

Will call any subscribers with the exception as the argument.

from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time

def f(x: int) -> int:
    time.sleep(x)
    return 42

scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))

@task.on("pynisher-timeout")
def callback(exception):
    pass

MEMORY_LIMIT_REACHED: Event[pynisher.exceptions.MemoryLimitException] (class attribute)

A Task was submitted but reached its memory limit.

Will call any subscribers with the exception as the argument.

from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import numpy as np

def f(x: int) -> int:
    # Allocate ~800 MB (100 million 64-bit integers), far above the limit below
    data = np.arange(100_000_000, dtype=np.int64)
    return 42

scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(memory_limit=(100, "MB")))

@task.on("pynisher-memory-limit")
def callback(exception):
    pass

CPU_TIME_LIMIT_REACHED: Event[pynisher.exceptions.CpuTimeoutException] (class attribute)

A Task was submitted but reached its cpu time limit.

Will call any subscribers with the exception as the argument.

from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin

def f(x: int) -> int:
    i = 0
    while True:
        # Keep busy computing the answer to everything. This loop never ends
        # on its own, so the cpu time limit is what stops it.
        i += 1

    return 42  # never reached

scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(cputime_limit=(1, "s")))

@task.on("pynisher-cputime-limit")
def callback(exception):
    pass
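The cpu time example above uses a busy loop, whereas the timeout and wall time examples use time.sleep(). The reason is that sleeping consumes wall-clock time but almost no CPU time, so in practice a sleeping task only trips a wall time limit. The following stdlib-only sketch (no amltk or pynisher involved) measures both clocks with time.perf_counter() and time.process_time() to show the difference.

```python
import time


def measure(fn):
    """Return (wall_seconds, cpu_seconds) spent running `fn` in this process."""
    wall_start, cpu_start = time.perf_counter(), time.process_time()
    fn()
    return time.perf_counter() - wall_start, time.process_time() - cpu_start


def sleeper():
    time.sleep(0.3)  # wall-clock time passes, but the CPU is mostly idle


def spinner():
    deadline = time.perf_counter() + 0.3
    while time.perf_counter() < deadline:  # busy loop keeps the CPU working
        pass


sleep_wall, sleep_cpu = measure(sleeper)
spin_wall, spin_cpu = measure(spinner)
print(f"sleep: wall={sleep_wall:.2f}s cpu={sleep_cpu:.2f}s")
print(f"spin:  wall={spin_wall:.2f}s cpu={spin_cpu:.2f}s")
```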

WALL_TIME_LIMIT_REACHED: Event[pynisher.exceptions.WallTimeoutException] (class attribute)

A Task was submitted but reached its wall time limit.

Will call any subscribers with the exception as the argument.

from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time

def f(x: int) -> int:
    time.sleep(x)
    return 42

scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))

@task.on("pynisher-walltime-limit")
def callback(exception):
    pass

TimeoutException: TypeAlias (class attribute)

The exception that is raised when a task times out.

MemoryLimitException: TypeAlias (class attribute)

The exception that is raised when a task reaches its memory limit.

CpuTimeoutException: TypeAlias (class attribute)

The exception that is raised when a task reaches its cpu time limit.

WallTimeoutException: TypeAlias (class attribute)

The exception that is raised when a task reaches its wall time limit.

def pre_submit(fn, *args, **kwargs)

Wrap a task function in a Pynisher instance.

Source code in src/amltk/scheduling/plugins/pynisher.py
@override
def pre_submit(
    self,
    fn: Callable[P, R],
    *args: P.args,
    **kwargs: P.kwargs,
) -> tuple[Callable[P, R], tuple, dict]:
    """Wrap a task function in a `Pynisher` instance."""
    # If any of our limits is set, we need to wrap it in Pynisher
    # to enforce these limits.
    if any(
        limit is not None
        for limit in (self.memory_limit, self.cputime_limit, self.walltime_limit)
    ):
        fn = pynisher.Pynisher(
            fn,
            memory=self.memory_limit,
            cpu_time=self.cputime_limit,
            wall_time=self.walltime_limit,
            terminate_child_processes=True,
            context=self.context,
        )

    return fn, args, kwargs
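The decision made in pre_submit can be isolated as "wrap only if at least one limit is set". The following is a minimal sketch of that logic. The function maybe_wrap and the recording_wrap stand-in for pynisher.Pynisher are both hypothetical, so the sketch runs without pynisher. The keyword names passed to the wrapper mirror the call shown above.

```python
from typing import Any, Callable


def maybe_wrap(
    fn: Callable[..., Any],
    wrap: Callable[..., Callable[..., Any]],
    *,
    memory_limit=None,
    cputime_limit=None,
    walltime_limit=None,
) -> Callable[..., Any]:
    """Wrap `fn` only when at least one limit is set, mirroring `pre_submit`."""
    limits = (memory_limit, cputime_limit, walltime_limit)
    if any(limit is not None for limit in limits):
        return wrap(
            fn, memory=memory_limit, cpu_time=cputime_limit, wall_time=walltime_limit
        )
    return fn


def recording_wrap(fn, **limits):
    # Hypothetical stand-in for `pynisher.Pynisher`: records the limits it
    # received and simply calls through to `fn`.
    def wrapped(*args, **kwargs):
        return fn(*args, **kwargs)

    wrapped.limits = limits
    return wrapped


def f(x: int) -> int:
    return x + 1


unwrapped = maybe_wrap(f, recording_wrap)
wrapped = maybe_wrap(f, recording_wrap, walltime_limit=(1, "s"))
print(unwrapped is f)   # True
print(wrapped.limits)   # {'memory': None, 'cpu_time': None, 'wall_time': (1, 's')}
print(wrapped(41))      # 42
```

Because the check is `is not None`, a limit of 0 still counts as set and causes the function to be wrapped.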

def attach_task(task)

Attach the plugin to a task.

Source code in src/amltk/scheduling/plugins/pynisher.py
@override
def attach_task(self, task: Task) -> None:
    """Attach the plugin to a task."""
    self.task = task
    task.emitter.add_event(
        self.TIMEOUT,
        self.MEMORY_LIMIT_REACHED,
        self.CPU_TIME_LIMIT_REACHED,
        self.WALL_TIME_LIMIT_REACHED,
    )

    # Check the exception and emit pynisher specific ones too
    task.on_exception(self._check_to_emit_pynisher_exception, hidden=True)
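The body of _check_to_emit_pynisher_exception is not shown on this page. The event descriptions do state that TIMEOUT fires for either the wall time or the cpu time limit. Based only on that, a dispatch from exception to event names might look like the sketch below. The stand-in exception classes, the assumption that both timeout exceptions share a TimeoutException base class, and the order of the emitted events are all illustrative assumptions, not taken from amltk's or pynisher's source.

```python
# Stand-in exceptions (assumption): both timeout kinds share a common base.
class TimeoutException(Exception): ...
class WallTimeoutException(TimeoutException): ...
class CpuTimeoutException(TimeoutException): ...
class MemoryLimitException(Exception): ...


def events_for(exception: BaseException) -> list[str]:
    """Return the event names a hypothetical dispatcher would emit."""
    events = []
    if isinstance(exception, MemoryLimitException):
        events.append("pynisher-memory-limit")
    if isinstance(exception, TimeoutException):
        # Either kind of timeout also triggers the generic timeout event.
        events.append("pynisher-timeout")
    if isinstance(exception, WallTimeoutException):
        events.append("pynisher-walltime-limit")
    if isinstance(exception, CpuTimeoutException):
        events.append("pynisher-cputime-limit")
    return events


print(events_for(WallTimeoutException()))  # ['pynisher-timeout', 'pynisher-walltime-limit']
print(events_for(ValueError()))            # []
```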

def copy()

Return a copy of the plugin.

Please see Plugin.copy().

Source code in src/amltk/scheduling/plugins/pynisher.py
@override
def copy(self) -> Self:
    """Return a copy of the plugin.

    Please see [`Plugin.copy()`][amltk.Plugin.copy].
    """
    return self.__class__(
        memory_limit=self.memory_limit,
        cputime_limit=self.cputime_limit,
        walltime_limit=self.walltime_limit,
    )
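copy() rebuilds the plugin from its constructor arguments rather than duplicating the object. As a result, the copy starts without the task the original was attached to. As listed above, it forwards only the three limits and does not pass on context. The following minimal sketch uses a hypothetical stand-in class, LimitsPlugin, to show this copy-by-reconstruction pattern. Unlike the real plugin, it initialises task to None so the missing state is visible.

```python
from __future__ import annotations


class LimitsPlugin:
    """Hypothetical stand-in illustrating copy-by-reconstruction."""

    def __init__(self, *, memory_limit=None, cputime_limit=None, walltime_limit=None):
        self.memory_limit = memory_limit
        self.cputime_limit = cputime_limit
        self.walltime_limit = walltime_limit
        self.task = None  # set per task when attached

    def attach_task(self, task) -> None:
        self.task = task

    def copy(self) -> LimitsPlugin:
        # Rebuild from configuration only; per-task state such as `task`
        # is deliberately not carried over.
        return self.__class__(
            memory_limit=self.memory_limit,
            cputime_limit=self.cputime_limit,
            walltime_limit=self.walltime_limit,
        )


original = LimitsPlugin(walltime_limit=(1, "s"))
original.attach_task("task-a")
clone = original.copy()
print(clone.walltime_limit, clone.task)  # (1, 's') None
```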

def supports(kind) (classmethod)

Check whether a kind of limit is supported by the plugin on the current platform.

PARAMETER DESCRIPTION
kind

The kind of limit to check.

TYPE: Literal['wall_time', 'cpu_time', 'memory']

RETURNS DESCRIPTION
bool

True if the limit is supported by the plugin on your OS, else False.

Source code in src/amltk/scheduling/plugins/pynisher.py
@classmethod
def supports(cls, kind: Literal["wall_time", "cpu_time", "memory"]) -> bool:
    """Check if the task is supported by the plugin.

    Args:
        kind: The kind of limit to check.

    Returns:
        `True` if the limit is supported by the plugin for your os, else `False`.
    """
    return pynisher.supports(kind)