Pynisher
The PynisherPlugin uses pynisher to place memory, wall time and cpu time constraints on processes, crashing them if these limits are reached.
The default units are bytes ("B") and seconds ("s"), but you can also use other units; please see the relevant API doc.
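The sketch below illustrates what a limit given as a bare number or as a (value, unit) tuple means. It uses a hypothetical helper, `to_base_units` (not part of amltk or pynisher), that converts either form to bytes or seconds. The unit names ("B", "KB", "MB", "GB", "s", "m", "h") and the 1024-based memory multiples are assumptions made for this sketch. pynisher's own docs are the authority on which units it accepts.

```python
from typing import Union

# ASSUMPTION: these unit names and the 1024-based memory multiples are
# illustrative; pynisher's docs define the units it actually accepts.
MEMORY_UNITS = {"B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3}
TIME_UNITS = {"s": 1, "m": 60, "h": 60 * 60}

Limit = Union[int, float, tuple[float, str]]


def to_base_units(limit: Limit, kind: str) -> float:
    """Hypothetical helper: return `limit` in bytes ("memory") or seconds ("time")."""
    if kind == "memory":
        units = MEMORY_UNITS
    elif kind == "time":
        units = TIME_UNITS
    else:
        raise ValueError(f"unknown kind {kind!r}")
    if isinstance(limit, tuple):
        value, unit = limit
        if unit not in units:
            raise ValueError(f"unknown {kind} unit {unit!r}")
        return value * units[unit]
    # A bare number is already in the base unit (bytes or seconds).
    return limit


print(to_base_units((1, "KB"), "memory"))  # 1024
print(to_base_units((2, "m"), "time"))  # 120
print(to_base_units(5, "time"))  # 5
```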
It is best used with Scheduler.with_processes(), so that the work is performed in separate processes.
Requirements
This requires pynisher, which can be installed with pip install pynisher.
Usage
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time

def f(x: int) -> int:
    time.sleep(x)
    return 42

# Run tasks in processes so the limits apply to the worker, not the main process
scheduler = Scheduler.with_processes()
# Kill the task if it runs for longer than 1 second of wall time
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))

@task.on("pynisher-timeout")
def callback(exception):
    pass
Scheduler Executor
This will place process limits on the task as soon as it starts running, wherever it may be running. If you are using Scheduler.with_sequential(), this will place limits on the main process, which is likely not what you want. This also does not work with a ThreadPoolExecutor.
If you use this with something like dask-jobqueue, it will place limits on the workers it spawns. In that case, it is better to place limits directly through dask-jobqueue.
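The plugin applies limits inside whichever process runs the task, so it matters which process that is. The stdlib-only sketch below is POSIX only and is not amltk or pynisher code. It lowers the CPU-time rlimit in a child process via subprocess's `preexec_fn` and shows that the parent's own limit is untouched. Calling `resource.setrlimit` in the main process instead, which is roughly what happens under Scheduler.with_sequential(), would cap the main process itself.

```python
import resource
import subprocess
import sys

# Snippet the child runs: report its own soft CPU-time limit.
CHILD_CODE = "import resource; print(resource.getrlimit(resource.RLIMIT_CPU)[0])"


def lower_cpu_limit() -> None:
    # Runs in the child after fork() and before exec(): cap CPU time at 5 s.
    # (preexec_fn is not safe in multi-threaded parents; fine for this demo.)
    _, hard = resource.getrlimit(resource.RLIMIT_CPU)
    resource.setrlimit(resource.RLIMIT_CPU, (5, hard))


parent_before = resource.getrlimit(resource.RLIMIT_CPU)
result = subprocess.run(
    [sys.executable, "-c", CHILD_CODE],
    preexec_fn=lower_cpu_limit,
    capture_output=True,
    text=True,
    check=True,
)
child_soft_limit = int(result.stdout.strip())
parent_after = resource.getrlimit(resource.RLIMIT_CPU)

print("child soft limit:", child_soft_limit)  # 5
print("parent unchanged:", parent_before == parent_after)  # True
```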
Platform Limitations (Mac, Windows)
Pynisher has some limitations with memory on Mac and Windows; see automl/pynisher#features.
You can check this with PynisherPlugin.supports("memory"), PynisherPlugin.supports("cpu_time") and PynisherPlugin.supports("wall_time"). See PynisherPlugin.supports().
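The following sketch shows why a dedicated check is needed. It uses a hypothetical helper, `os_exposes_rlimit` (not `PynisherPlugin.supports()`), which only reports whether the operating system exposes the POSIX rlimit that a kind of limit would plausibly rely on. The mapping of "memory" to `RLIMIT_AS` and "cpu_time" to `RLIMIT_CPU` is an assumption. A `True` result is not a guarantee. As far as we know, macOS defines `RLIMIT_AS` but does not enforce it, so this helper can say "yes" where pynisher reports memory limits as unsupported. That is why `PynisherPlugin.supports()` is the check to rely on.

```python
import importlib.util

# ASSUMPTION: memory and cpu_time limits map onto these POSIX rlimits.
RLIMIT_FOR_KIND = {"memory": "RLIMIT_AS", "cpu_time": "RLIMIT_CPU"}


def os_exposes_rlimit(kind: str) -> bool:
    """Hypothetical check: True means "possibly supported", nothing more."""
    if kind == "wall_time":
        # A wall-clock limit can be enforced from the parent by timing out.
        return True
    if kind not in RLIMIT_FOR_KIND:
        raise ValueError(f"unknown kind {kind!r}")
    if importlib.util.find_spec("resource") is None:
        return False  # e.g. Windows, where the `resource` module is missing
    import resource

    return hasattr(resource, RLIMIT_FOR_KIND[kind])


for kind in ("memory", "cpu_time", "wall_time"):
    print(kind, os_exposes_rlimit(kind))
```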
class PynisherPlugin(*, memory_limit=None, cputime_limit=None, walltime_limit=None, context=None)
Bases: Plugin
A plugin that wraps a task in a pynisher to enforce limits on it.
This plugin wraps a task function in a Pynisher instance to enforce limits on the task. The limits are set by any of memory_limit=, cputime_limit= and walltime_limit=.
Adds four new events to the task: "pynisher-timeout", "pynisher-memory-limit", "pynisher-cputime-limit" and "pynisher-walltime-limit".
ATTRIBUTE | DESCRIPTION |
---|---|
memory_limit | The memory limit of the task. |
cpu_time_limit | The cpu time limit of the task. |
wall_time_limit | The wall time limit of the task. |
PARAMETER | DESCRIPTION |
---|---|
memory_limit | The memory limit to wrap the task in. The base unit is bytes, but you can specify a (value, unit) tuple instead, e.g. (1, "KB"). |
cputime_limit | The cpu time limit to wrap the task in. The base unit is seconds, but you can specify a (value, unit) tuple instead, e.g. (1, "s"). |
walltime_limit | The wall time limit for the task. The base unit is seconds, but you can specify a (value, unit) tuple instead, e.g. (1, "s"). |
context | The context to use for multiprocessing. Defaults to None. |
Source code in src/amltk/scheduling/plugins/pynisher.py
name: ClassVar
The name of the plugin.
TIMEOUT: Event[PynisherPlugin.TimeoutException]
A Task timed out, either due to the wall time or cpu time limit.
Will call any subscribers with the exception as the argument.
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time

def f(x: int) -> int:
    time.sleep(x)
    return 42

scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))

@task.on("pynisher-timeout")
def callback(exception):
    pass
MEMORY_LIMIT_REACHED: Event[pynisher.exceptions.MemoryLimitException]
A Task was submitted but reached its memory limit.
Will call any subscribers with the exception as the argument.
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import numpy as np

def f(x: int) -> int:
    # Allocate a large array, far beyond the 1 KB limit set below
    arr = np.arange(100000000)
    return 42

scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(memory_limit=(1, "KB")))

@task.on("pynisher-memory-limit")
def callback(exception):
    pass
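To give a feel for how a memory limit can be enforced at the OS level, here is a stdlib-only sketch for Linux. It caps a child process's address space with `resource.RLIMIT_AS`, and an oversized allocation in the child then fails with `MemoryError`. pynisher builds on OS facilities of this kind, but this is not its code, and on platforms that do not enforce `RLIMIT_AS` the allocation may simply succeed. The 512 MiB cap is deliberately generous so the interpreter itself can still start. The exit code 3 is an arbitrary convention chosen for this sketch.

```python
import resource
import subprocess
import sys

LIMIT_BYTES = 512 * 1024**2  # 512 MiB address-space cap for the child

CHILD_CODE = """
import sys
try:
    _ = bytearray(2 * 1024**3)  # ask for 2 GiB, well above the cap
except MemoryError:
    sys.exit(3)  # arbitrary code meaning "memory limit reached"
sys.exit(0)
"""


def cap_memory() -> None:
    # Runs in the child between fork() and exec().
    resource.setrlimit(resource.RLIMIT_AS, (LIMIT_BYTES, LIMIT_BYTES))


result = subprocess.run([sys.executable, "-c", CHILD_CODE], preexec_fn=cap_memory)
print("memory limit reached:", result.returncode == 3)
```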
CPU_TIME_LIMIT_REACHED: Event[pynisher.exceptions.CpuTimeoutException]
A Task was submitted but reached its cpu time limit.
Will call any subscribers with the exception as the argument.
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin

def f(x: int) -> int:
    i = 0
    while True:
        # Keep busy computing the answer to everything
        i += 1
    return 42

scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(cputime_limit=(1, "s")))

@task.on("pynisher-cputime-limit")
def callback(exception):
    pass
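The following stdlib-only sketch (POSIX only) shows the OS mechanism behind a cpu time limit. It is not pynisher's implementation. The child gets a soft `RLIMIT_CPU` of 1 second, and the same kind of busy loop as above burns through it. The kernel then sends `SIGXCPU`, whose default action terminates the process, so the parent sees a negative return code. The 10-second `timeout` is only a wall-clock safety net in case the limit is not enforced.

```python
import resource
import signal
import subprocess
import sys

CHILD_CODE = """
i = 0
while True:  # busy loop that keeps burning CPU time
    i += 1
"""


def cap_cpu_time() -> None:
    # Soft limit 1 s (SIGXCPU), hard limit 2 s (SIGKILL as a backstop).
    resource.setrlimit(resource.RLIMIT_CPU, (1, 2))


result = subprocess.run(
    [sys.executable, "-c", CHILD_CODE],
    preexec_fn=cap_cpu_time,
    timeout=10,
)
# A negative return code -N means the child was killed by signal N.
print("killed by SIGXCPU:", result.returncode == -signal.SIGXCPU)
```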
WALL_TIME_LIMIT_REACHED: Event[pynisher.exceptions.WallTimeoutException]
A Task was submitted but reached its wall time limit.
Will call any subscribers with the exception as the argument.
from amltk.scheduling import Task, Scheduler
from amltk.scheduling.plugins.pynisher import PynisherPlugin
import time

def f(x: int) -> int:
    time.sleep(x)
    return 42

scheduler = Scheduler.with_processes()
task = scheduler.task(f, plugins=PynisherPlugin(walltime_limit=(1, "s")))

@task.on("pynisher-walltime-limit")
def callback(exception):
    pass
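The wall time example sleeps, and sleeping uses almost no CPU, so only a wall time limit (not a cpu time limit) would catch it. The stdlib-only sketch below is not pynisher's implementation. It enforces a wall-clock limit from the parent: `subprocess.run(..., timeout=1)` kills a child that is still running after 1 second and then raises `subprocess.TimeoutExpired`.

```python
import subprocess
import sys
import time

CHILD_CODE = "import time; time.sleep(10)"  # sleeps, using hardly any CPU

start = time.monotonic()
try:
    subprocess.run([sys.executable, "-c", CHILD_CODE], timeout=1)
    timed_out = False
except subprocess.TimeoutExpired:
    # subprocess.run() kills the child before re-raising TimeoutExpired.
    timed_out = True
elapsed = time.monotonic() - start

print("timed out:", timed_out, f"after ~{elapsed:.1f}s")
```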
TimeoutException: TypeAlias
The exception that is raised when a task times out.
MemoryLimitException: TypeAlias
The exception that is raised when a task reaches its memory limit.
CpuTimeoutException: TypeAlias
The exception that is raised when a task reaches its cpu time limit.
WallTimeoutException: TypeAlias
The exception that is raised when a task reaches its wall time limit.
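The TIMEOUT event is described as firing for either kind of timeout. This hypothetical sketch maps a limit exception onto the event names a subscriber would hear about, assuming the general "pynisher-timeout" event fires alongside the specific one. The stand-in exception classes are not imported from pynisher. Their hierarchy, with both timeout kinds subclassing a common `TimeoutException`, is an assumption meant to mirror the type aliases above. The function `events_for` is likewise illustrative, not amltk's code.

```python
# ASSUMED stand-ins mirroring the type aliases above (not pynisher imports).
class TimeoutException(Exception): ...


class CpuTimeoutException(TimeoutException): ...


class WallTimeoutException(TimeoutException): ...


class MemoryLimitException(Exception): ...


def events_for(exc: BaseException) -> list[str]:
    """Hypothetical: the event names emitted for a given limit exception."""
    events: list[str] = []
    if isinstance(exc, TimeoutException):
        events.append("pynisher-timeout")  # general event for either timeout
    if isinstance(exc, CpuTimeoutException):
        events.append("pynisher-cputime-limit")
    elif isinstance(exc, WallTimeoutException):
        events.append("pynisher-walltime-limit")
    elif isinstance(exc, MemoryLimitException):
        events.append("pynisher-memory-limit")
    return events


print(events_for(CpuTimeoutException()))
print(events_for(MemoryLimitException()))
```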
def pre_submit(fn, *args, **kwargs)
Wrap a task function in a Pynisher instance.
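Here is a minimal sketch of the wrap-and-return pattern a pre-submit hook follows. It takes the function and arguments about to be submitted and hands back a wrapped callable with the same arguments. `LimitedCall` is a hypothetical stand-in for a Pynisher instance and enforces nothing. The `(fn, args, kwargs)` return shape is also an assumption for this sketch. Using a small class rather than a closure is deliberate: instances of a module-level class can be pickled and shipped to a process pool, whereas closures and lambdas cannot be pickled by the standard `pickle` module.

```python
from typing import Any, Callable


class LimitedCall:
    """Hypothetical stand-in: carries the limits along with the function."""

    def __init__(self, fn: Callable[..., Any], **limits: Any) -> None:
        self.fn = fn
        self.limits = limits

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # A real limiter would apply self.limits here, inside whichever
        # process ends up executing the call. This stand-in just delegates.
        return self.fn(*args, **kwargs)


def pre_submit(
    fn: Callable[..., Any], *args: Any, **kwargs: Any
) -> tuple[Callable[..., Any], tuple, dict]:
    # ASSUMED return shape: (wrapped function, args, kwargs).
    wrapped = LimitedCall(fn, walltime_limit=(1, "s"))
    return wrapped, args, kwargs


def f(x: int) -> int:
    return x * 2


wrapped, args, kwargs = pre_submit(f, 21)
print(wrapped(*args, **kwargs))  # 42
```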
def attach_task(task)
Attach the plugin to a task.
def copy()
Return a copy of the plugin. Please see Plugin.copy().
def supports(kind)
classmethod
Check whether a kind of limit is supported by the plugin on the current platform.
PARAMETER | DESCRIPTION |
---|---|
kind | The kind of limit to check: "memory", "cpu_time" or "wall_time". |

RETURNS | DESCRIPTION |
---|---|
bool | Whether the given kind of limit is supported. |