Skip to content

Emissions tracker plugin

amltk.scheduling.plugins.emissions_tracker_plugin #

Emissions Tracker Plugin Module.

This module defines a plugin for tracking carbon emissions using the codecarbon library.

For usage examples, refer to the docstring of the EmissionsTrackerPlugin class.

EmissionsTrackerPlugin #

EmissionsTrackerPlugin(*args: Any, **kwargs: Any)

Bases: Plugin

A plugin that tracks carbon emissions using codecarbon library.

Usage Example:

from concurrent.futures import ThreadPoolExecutor
from amltk.scheduling import Scheduler
from amltk.scheduling.plugins.emissions_tracker_plugin import EmissionsTrackerPlugin

def some_function(x: int) -> int:
    return x * 2

executor = ThreadPoolExecutor(max_workers=1)

# Create a Scheduler instance with the executor
scheduler = Scheduler(executor=executor)

# Create a task with the emissions tracker plugin
task = scheduler.task(some_function, plugins=[
    # Pass any codecarbon parameters as args here
    EmissionsTrackerPlugin(log_level="info", save_to_file=False)
])

@scheduler.on_start
def on_start():
    task.submit(5)  # Submit any args here

@task.on_submitted
def on_submitted(future, *args, **kwargs):
    print(f"Task was submitted", future, args, kwargs)

@task.on_done
def on_done(future):
    # Result is the return value of the function
    print("Task done: ", future.result())

scheduler.run()
PARAMETER DESCRIPTION
*args

Additional arguments to pass to codecarbon library.

TYPE: Any DEFAULT: ()

**kwargs

Additional keyword arguments to pass to codecarbon library.

TYPE: Any DEFAULT: {}

You can pass any codecarbon parameters as args to EmissionsTrackerPlugin. Please refer to the official codecarbon documentation for more details: mlco2.github.io/codecarbon/parameters.html

Source code in src/amltk/scheduling/plugins/emissions_tracker_plugin.py
def __init__(self, *args: Any, **kwargs: Any):
    """Initialize the EmissionsTrackerPlugin.

    Args:
        *args: Additional arguments to pass to codecarbon library.
        **kwargs: Additional keyword arguments to pass to codecarbon library.

    You can pass any codecarbon parameters as args to EmissionsTrackerPlugin.
    Please refer to the official codecarbon documentation for more details:
    https://mlco2.github.io/codecarbon/parameters.html
    """
    super().__init__()
    self.task: Task | None = None
    self.codecarbon_args = args
    self.codecarbon_kwargs = kwargs

name class-attribute instance-attribute #

name: ClassVar = 'emissions-tracker'

The name of the plugin.

__rich__ #

__rich__() -> Panel

Return a rich panel.

Source code in src/amltk/scheduling/plugins/emissions_tracker_plugin.py
def __rich__(self) -> Panel:
    """Return a rich panel."""
    from rich.panel import Panel

    return Panel(
        f"codecarbon_args: {self.codecarbon_args} "
        f"codecarbon_kwargs: {self.codecarbon_kwargs}",
        title=f"Plugin {self.name}",
    )

attach_task #

attach_task(task: Task) -> None

Attach the plugin to a task.

Source code in src/amltk/scheduling/plugins/emissions_tracker_plugin.py
def attach_task(self, task: Task) -> None:
    """Attach the plugin to a task."""
    self.task = task

copy #

copy() -> Self

Return a copy of the plugin.

Source code in src/amltk/scheduling/plugins/emissions_tracker_plugin.py
def copy(self) -> Self:
    """Return a copy of the plugin."""
    return self.__class__(*self.codecarbon_args, **self.codecarbon_kwargs)

events #

events() -> list[Event]

Return a list of events that this plugin emits.

Likely no need to override this method, as it will automatically return all events defined on the plugin.

Source code in src/amltk/scheduling/plugins/plugin.py
def events(self) -> list[Event]:
    """Return a list of events that this plugin emits.

    Likely no need to override this method, as it will automatically
    return all events defined on the plugin.
    """
    inherited_attrs = chain.from_iterable(
        vars(cls).values() for cls in self.__class__.__mro__
    )
    return [attr for attr in inherited_attrs if isinstance(attr, Event)]

pre_submit #

pre_submit(
    fn: Callable[P, R], *args: args, **kwargs: kwargs
) -> tuple[Callable[P, R], tuple, dict]

Pre-submit hook.

Source code in src/amltk/scheduling/plugins/emissions_tracker_plugin.py
def pre_submit(
    self,
    fn: Callable[P, R],
    *args: P.args,
    **kwargs: P.kwargs,
) -> tuple[Callable[P, R], tuple, dict]:
    """Pre-submit hook."""
    wrapped_f = _EmissionsTrackerWrapper(
        fn,
        self.task,
        *self.codecarbon_args,
        **self.codecarbon_kwargs,
    )
    return wrapped_f, args, kwargs