Source code for dacbench.logger

"""Logger helper."""

from __future__ import annotations

import json
from abc import ABCMeta, abstractmethod
from collections import ChainMap, defaultdict
from collections.abc import Callable, Iterable
from itertools import chain
from numbers import Number
from pathlib import Path
from typing import TYPE_CHECKING, Any

import numpy as np
import pandas as pd

if TYPE_CHECKING:
    from dacbench import AbstractBenchmark, AbstractEnv
    from dacbench.abstract_agent import AbstractDACBenchAgent


def load_logs(log_file: Path) -> list[dict]:
    """Loads the logs from a jsonl written by any logger.

    The result is the list of dicts in the format:
    {
        'instance': 0,
        'episode': 0,
        'step': 1,
        'example_log_val':  {
            'values': [val1, val2, ... valn],
            'episode': [ep1, ep2, ..., epn],
            'step': [step1, step2, ..., stepn],
        }
        ...
    }

    Parameters
    ----------
    log_file: pathlib.Path
        The path to the log file

    Returns
    -------
    [Dict, ...]
    """
    with open(log_file) as f:
        return list(map(json.loads, f))
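
# --- Illustrative usage sketch (not part of dacbench.logger) ---
# Assuming a run has produced a module log file (the path below is
# hypothetical):
#
#     logs = load_logs(Path("logs/my_experiment/my_wrapper.jsonl"))
#     print(logs[0]["episode"], logs[0]["step"])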
def split(predicate: Callable, iterable: Iterable) -> tuple[list, list]:
    """Splits the iterable into two lists depending on the result of predicate.

    Parameters
    ----------
    predicate: Callable
        A function taking an element of the iterable and returning True or False
    iterable: Iterable
        the iterable to split

    Returns
    -------
    (positives, negatives)
    """
    positives, negatives = [], []
    for item in iterable:
        (positives if predicate(item) else negatives).append(item)
    return positives, negatives
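
# --- Illustrative usage sketch (not part of dacbench.logger) ---
#     evens, odds = split(lambda x: x % 2 == 0, range(6))
#     # evens == [0, 2, 4], odds == [1, 3, 5]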
def flatten_log_entry(log_entry: dict) -> list[dict]:
    """Transforms a log entry.

    From:
    {
        'step': 0,
        'episode': 2,
        'some_value': {
            'values': [34, 45],
        }
    }
    To:
    [
        { 'step': 0, 'episode': 2, 'values': 34 },
        { 'step': 0, 'episode': 2, 'values': 45 }
    ]

    Parameters
    ----------
    log_entry: Dict
        A log entry
    """
    dict_entries, top_level_entries = split(
        lambda item: isinstance(item[1], dict), log_entry.items()
    )
    rows = []
    for _value_name, value_dict in dict_entries:
        # Each key of value_dict maps to a list of logged values;
        # build one row per list position.
        current_rows = (
            dict([*top_level_entries, *row_entries])
            for row_entries in zip(
                *(
                    [(key, value) for value in values]
                    for key, values in value_dict.items()
                )
            )
        )
        rows.extend(current_rows)
    return rows
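
# --- Illustrative usage sketch (not part of dacbench.logger) ---
#     entry = {"step": 0, "episode": 2, "loss": {"values": [0.5, 0.4]}}
#     flatten_log_entry(entry)
#     # -> [{'step': 0, 'episode': 2, 'values': 0.5},
#     #     {'step': 0, 'episode': 2, 'values': 0.4}]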
def list_to_tuple(list_: list) -> tuple:
    """Recursively transforms a list of lists into tuples of tuples.

    Parameters
    ----------
    list_:
        (nested) list

    Returns
    -------
    (nested) tuple
    """
    return tuple(
        list_to_tuple(item) if isinstance(item, list) else item for item in list_
    )
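
# --- Illustrative usage sketch (not part of dacbench.logger) ---
#     list_to_tuple([1, [2, 3], [[4]]])
#     # -> (1, (2, 3), ((4,),))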
def log2dataframe(
    logs: list[dict], drop_columns: list[str] | None = None
) -> pd.DataFrame:
    """Converts a list of log entries to a pandas dataframe.

    Usually used in combination with load_logs.

    Parameters
    ----------
    logs: List
        List of log entries
    drop_columns: List[str]
        List of column names to be dropped (before reshaping the long
        dataframe), mostly used to reduce NaN values

    Returns
    -------
    dataframe
    """
    dataframe = pd.DataFrame(logs)
    if drop_columns is not None:
        dataframe = dataframe.drop(columns=drop_columns)

    for column in dataframe.columns:
        if "episode_" in column:
            dataframe = dataframe.join(
                dataframe.groupby("episode")[column].max(), on="episode", rsuffix="_new"
            )
            dataframe = dataframe.drop(columns=column)
            dataframe = dataframe.rename(columns={f"{column}_new": column})

    dataframe = dataframe.infer_objects()
    list_column_candidates = dataframe.dtypes == object

    for i, candidate in enumerate(list_column_candidates):
        if candidate:
            dataframe.iloc[:, i] = dataframe.iloc[:, i].apply(
                lambda x: list_to_tuple(x) if isinstance(x, list) else x
            )

    return dataframe.infer_objects()
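
# --- Illustrative usage sketch (not part of dacbench.logger) ---
# Load a jsonl log and turn it into a dataframe; the path and dropped
# column below are hypothetical:
#
#     logs = load_logs(Path("logs/my_experiment/my_wrapper.jsonl"))
#     df = log2dataframe(logs, drop_columns=["time"])
#     print(df.head())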
def seed_mapper(self):
    """Helper function for seeding."""
    if self.env is None:
        return None
    return self.env.initial_seed


def instance_mapper(self):
    """Helper function to get instance id."""
    if self.env is None:
        return None
    return self.env.get_inst_id()
class AbstractLogger(metaclass=ABCMeta):
    """Logger interface.

    The logger classes provide a way of writing structured logs as jsonl
    files and also help to track information like current episode, step,
    time ...

    In the jsonl log file each row corresponds to a step.
    """

    valid_types = {  # noqa: RUF012
        "recursive": [dict, list, tuple, np.ndarray],
        "primitive": [str, int, float, bool, np.number],
    }

    def __init__(
        self,
        experiment_name: str,
        output_path: Path,
        step_write_frequency: int | None = None,
        episode_write_frequency: int = 1,
    ):
        """Initializes Logger.

        Parameters
        ----------
        experiment_name: str
            Name of the folder to store the result in
        output_path: pathlib.Path
            Path under which the experiment folder is created
        step_write_frequency: int
            Number of steps after which the logger writes to file.
            If None, data is only written to file when write() is called,
            when triggered by episode_write_frequency, or on close
        episode_write_frequency: int
            see step_write_frequency
        """
        self.experiment_name = experiment_name
        self.output_path = output_path
        if isinstance(self.output_path, str):
            logpath = Path(self.output_path) / self.experiment_name
            self.log_dir = self._init_logging_dir(Path(logpath))
        else:
            self.log_dir = self._init_logging_dir(
                self.output_path / self.experiment_name
            )
        self.step_write_frequency = step_write_frequency
        self.episode_write_frequency = episode_write_frequency
        self._additional_info = {}
        self.additional_info_auto_mapper = {
            "instance": instance_mapper,
            "seed": seed_mapper,
        }
        self.env = None

    @property
    def additional_info(self):
        """Log additional info."""
        additional_info = self._additional_info.copy()
        auto_info = {
            key: mapper(self)
            for key, mapper in self.additional_info_auto_mapper.items()
            if mapper(self) is not None
        }
        additional_info.update(auto_info)
        return additional_info
    def set_env(self, env: AbstractEnv) -> None:
        """Needed to infer automatically logged information like the instance id.

        Parameters
        ----------
        env: AbstractEnv
            env to log
        """
        self.env = env
    @staticmethod
    def _pretty_valid_types() -> str:
        """Returns a pretty string representation of the types that can be
        logged as values.
        """
        valid_types = chain(
            AbstractLogger.valid_types["recursive"],
            AbstractLogger.valid_types["primitive"],
        )
        return ", ".join(type_.__name__ for type_ in valid_types)

    @staticmethod
    def _init_logging_dir(log_dir: Path) -> Path:
        """Prepares the logging directory.

        Parameters
        ----------
        log_dir: pathlib.Path
            dir to prepare for logging

        Returns
        -------
        pathlib.Path
            the prepared log dir
        """
        log_dir.mkdir(parents=True, exist_ok=True)
        return log_dir
    def is_of_valid_type(self, value: Any) -> bool:
        """Checks if the value is of any type in the logger's valid types.

        Parameters
        ----------
        value
            value to check

        Returns
        -------
        bool
        """
        if any(isinstance(value, v_type) for v_type in self.valid_types["primitive"]):
            return True

        if any(isinstance(value, v_type) for v_type in self.valid_types["recursive"]):
            value = value.values() if isinstance(value, dict) else value
            return all(self.is_of_valid_type(sub_value) for sub_value in value)

        return False
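
    # --- Illustrative usage sketch (not part of dacbench.logger) ---
    # On any concrete logger instance:
    #     logger.is_of_valid_type({"a": [1, 2.0, "x"]})  # True
    #     logger.is_of_valid_type(object())              # False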
    @abstractmethod
    def close(self) -> None:
        """Makes sure that all remaining entries in the buffer are written
        to file and the file is closed.
        """

    @abstractmethod
    def next_step(self) -> None:
        """Call at the end of the step.

        Updates the internal state and dumps the information of the last
        step into a json.
        """

    @abstractmethod
    def next_episode(self) -> None:
        """Call at the end of episode. See next_step."""

    @abstractmethod
    def write(self) -> None:
        """Writes buffered logs to file.

        Invoke manually if you want to load logs during a run.
        """

    @abstractmethod
    def log(self, key: str, value) -> None:
        """Writes value to list of values and saves the current time for key.

        Parameters
        ----------
        key: str
            key to log
        value:
            the value must be of a type that is json serializable.
            Currently only {str, int, float, bool, np.number} and recursive
            types of those are supported.
        """

    @abstractmethod
    def log_dict(self, data):
        """Alternative to log if more than one value should be logged at once.

        Parameters
        ----------
        data: dict
            a dict with key-value pairs such that each value is a valid
            value for log
        """

    @abstractmethod
    def log_space(self, key: str, value: np.ndarray | dict, space_info=None):
        """Special method for logging gym.spaces.

        Currently three types are supported:
        * Numbers: e.g. samples from Discrete
        * Fixed length arrays like MultiDiscrete or Box
        * Dict: assuming each key has a fixed length array

        Parameters
        ----------
        key:
            see log
        value:
            see log
        space_info:
            a list of column names. The length of this list must equal the
            resulting number of columns.
        """
class ModuleLogger(AbstractLogger):
    """A logger for handling the logging of one module, e.g. a wrapper or
    top-level general logging.

    Don't create this manually; use Logger to manage ModuleLoggers.
    """

    def __init__(
        self,
        output_path: Path,
        experiment_name: str,
        module: str,
        step_write_frequency: int | None = None,
        episode_write_frequency: int = 1,
    ) -> None:
        """All results are placed under 'output_path / experiment_name'.

        Parameters
        ----------
        output_path: pathlib.Path
            Path under which the experiment folder is created
        experiment_name: str
            Name of the folder to store the result in
        module: str
            the module (mostly name of the wrapper), each wrapper gets its
            own file
        step_write_frequency: int
            Number of steps after which the logger writes to file.
            If None, data is only written to file when write() is called,
            when triggered by episode_write_frequency, or on close
        episode_write_frequency: int
            see step_write_frequency
        """
        super().__init__(
            experiment_name, output_path, step_write_frequency, episode_write_frequency
        )

        self.log_file = open(Path(self.log_dir) / f"{module}.jsonl", "w")
        self.step = 0
        self.episode = 0
        self.buffer = []
        self.current_step = self.__init_dict()
    def get_logfile(self) -> Path:
        """Get logfile name.

        Returns
        -------
        pathlib.Path
            the path to the log file of this logger
        """
        return Path(self.log_file.name)
    def close(self):
        """Makes sure that all remaining entries in the buffer are written
        to file and the file is closed.
        """
        if not self.log_file.closed:
            self.write()
            self.log_file.close()

    def __del__(self):
        """Makes sure that all remaining entries in the buffer are written
        to file and the file is closed.
        """
        if not self.log_file.closed:
            self.close()
    @staticmethod
    def __json_default(obj):
        """Add support for dumping numpy arrays and numbers to json.

        Parameters
        ----------
        obj
            numpy object to jsonify
        """
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.number):
            return obj.item()
        raise ValueError(f"Type {type(obj)} not supported")

    def __end_step(self):
        if self.current_step:
            self.current_step["step"] = self.step
            self.current_step["episode"] = self.episode
            self.current_step.update(self.additional_info)
            self.buffer.append(
                json.dumps(self.current_step, default=self.__json_default)
            )
        self.current_step = self.__init_dict()

    @staticmethod
    def __init_dict():
        return defaultdict(dict)
    def reset_episode(self) -> None:
        """Resets the episode and step.

        Be aware that this can lead to ambiguous keys if no instance, seed,
        or other identifying additional info is set.
        """
        self.__end_step()
        self.episode = 0
        self.step = 0
    def __reset_step(self):
        self.__end_step()
        self.step = 0
    def next_step(self):
        """Call at the end of the step.

        Updates the internal state and dumps the information of the last
        step into a json.
        """
        self.__end_step()
        if (
            self.step_write_frequency is not None
            and self.step % self.step_write_frequency == 0
        ):
            self.write()
        self.step += 1
    def next_episode(self):
        """Call at the end of episode. See next_step."""
        self.__reset_step()
        if (
            self.episode_write_frequency is not None
            and self.episode % self.episode_write_frequency == 0
        ):
            self.write()
        self.episode += 1
    def write(self):
        """Writes buffered logs to file.

        Invoke manually if you want to load logs during a run.
        """
        self.__end_step()
        self.__buffer_to_file()
    def __buffer_to_file(self):
        if len(self.buffer) > 0:
            self.log_file.write("\n".join(self.buffer))
            self.log_file.write("\n")
            self.buffer.clear()
            self.log_file.flush()
    def set_additional_info(self, **kwargs):
        """Can be used to log additional information for each step,
        e.g. seed and instance id.

        Parameters
        ----------
        kwargs : dict
            info dict
        """
        self._additional_info.update(kwargs)
    def log(
        self, key: str, value: dict | list | tuple | str | int | float | bool
    ) -> None:
        """Writes value to list of values and saves the current time for key.

        Parameters
        ----------
        key: str
            key to log
        value:
            the value must be of a type that is json serializable.
            Currently only {str, int, float, bool, np.number} and recursive
            types of those are supported.
        """
        if not self.is_of_valid_type(value):
            valid_types = self._pretty_valid_types()
            raise ValueError(
                f"value {type(value)} is not of valid type or a recursive "
                f"composition of valid types ({valid_types})"
            )
        self.current_step[key] = value
    def log_dict(self, data: dict) -> None:
        """Alternative to log if more than one value should be logged at once.

        Parameters
        ----------
        data: dict
            a dict with key-value pairs such that each value is a valid
            value for log
        """
        for key, value in data.items():
            self.log(key, value)
    @staticmethod
    def __space_dict(key: str, value, space_info):
        if isinstance(value, np.ndarray) and len(value.shape) == 0:
            value = value.item()

        if isinstance(value, Number):
            if space_info is None:
                data = {key: value}
            else:
                if len(space_info) != 1:
                    raise ValueError(
                        f"Space info must match length "
                        f"(expected 1, got {len(space_info)})"
                    )
                data = {f"{key}_{space_info[0]}": value}

        elif isinstance(value, np.ndarray):
            if space_info is not None and len(space_info) != len(value):
                raise ValueError(
                    f"Space info must match length "
                    f"(expected {len(value)}, got {len(space_info)})"
                )
            key_suffix = (
                enumerate(value)
                if space_info is None
                else zip(space_info, value, strict=False)
            )
            data = {f"{key}_{suffix}": x for suffix, x in key_suffix}

        elif isinstance(value, dict):
            key_suffix = (
                value.items()
                if space_info is None
                else zip(space_info, value.values(), strict=False)
            )
            dicts = (
                ModuleLogger.__space_dict(f"{sub_key}", sub_value, None)
                for sub_key, sub_value in key_suffix
            )
            data = dict(ChainMap(*dicts))

        else:
            raise ValueError("Space does not seem to be supported")

        return data
    def log_space(self, key, value, space_info=None):
        """Special method for logging gym.spaces.

        Currently three types are supported:
        * Numbers: e.g. samples from Discrete
        * Fixed length arrays like MultiDiscrete or Box
        * Dict: assuming each key has a fixed length array

        Parameters
        ----------
        key:
            see log
        value:
            see log
        space_info:
            a list of column names. The length of this list must equal the
            resulting number of columns.
        """
        data = self.__space_dict(key, value, space_info)
        self.log_dict(data)
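
    # --- Illustrative usage sketch (not part of dacbench.logger) ---
    # Logging a Box sample with named columns (the column names below are
    # hypothetical):
    #     module_logger.log_space("action", np.array([0.1, 0.9]), ["lr", "momentum"])
    #     # logs the columns "action_lr" and "action_momentum"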
class Logger(AbstractLogger):
    """A logger that manages the creation of the module loggers.

    To get a ModuleLogger for your module (e.g. wrapper) call
    module_logger = Logger(...).add_module("my_wrapper").
    From now on module_logger.log(...) or logger.log(..., module="my_wrapper")
    can be used to log.

    The logger module takes care of updating information like episode and
    step in the subloggers. To indicate the end of the episode or the next
    step to the loggers, simply call logger.next_episode() or
    logger.next_step().
    """

    def __init__(
        self,
        experiment_name: str,
        output_path: Path,
        step_write_frequency: int | None = None,
        episode_write_frequency: int = 1,
    ) -> None:
        """Create Logger.

        Parameters
        ----------
        experiment_name: str
            Name of the folder to store the result in
        output_path: pathlib.Path
            Path under which the experiment folder is created
        step_write_frequency: int
            Number of steps after which the logger writes to file.
            If None, data is only written to file when write() is called,
            when triggered by episode_write_frequency, or on close
        episode_write_frequency: int
            see step_write_frequency
        """
        super().__init__(
            experiment_name, output_path, step_write_frequency, episode_write_frequency
        )
        self.env: AbstractEnv = None
        self.module_logger: dict[str, ModuleLogger] = {}
    def set_env(self, env: AbstractEnv) -> None:
        """Writes information about the environment.

        Parameters
        ----------
        env: AbstractEnv
            the env object to track
        """
        super().set_env(env)
        for _, module_logger in self.module_logger.items():
            module_logger.set_env(env)

    def close(self):
        """Makes sure that all remaining entries (from all subloggers) are
        written to files and the files are closed.
        """
        for _, module_logger in self.module_logger.items():
            module_logger.close()

    def __del__(self):
        """Removes Logger."""
        self.close()

    def next_step(self):
        """Call at the end of the step.

        Updates the internal state of all subloggers and dumps the
        information of the last step into a json.
        """
        for _, module_logger in self.module_logger.items():
            module_logger.next_step()

    def next_episode(self):
        """Call at the end of episode. See next_step."""
        for _, module_logger in self.module_logger.items():
            module_logger.next_episode()

    def reset_episode(self):
        """Resets the episode in all modules."""
        for _, module_logger in self.module_logger.items():
            module_logger.reset_episode()

    def write(self):
        """Writes buffered logs to file.

        Invoke manually if you want to load logs during a run.
        """
        for _, module_logger in self.module_logger.items():
            module_logger.write()
    def add_module(self, module: str | type) -> ModuleLogger:
        """Creates a sub-logger.

        For more details see class level documentation.

        Parameters
        ----------
        module: str or type
            The module name or Wrapper-Type to create a sub-logger for

        Returns
        -------
        ModuleLogger
        """
        if isinstance(module, str):
            pass
        elif isinstance(module, type):
            module = module.__name__
        else:
            module = module.__class__.__name__

        if module in self.module_logger:
            raise ValueError(f"Module {module} already registered")

        self.module_logger[module] = ModuleLogger(
            self.output_path,
            self.experiment_name,
            module,
            self.step_write_frequency,
            self.episode_write_frequency,
        )
        if self.env is not None:
            self.module_logger[module].set_env(self.env)

        return self.module_logger[module]
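
    # --- Illustrative usage sketch (not part of dacbench.logger) ---
    # Experiment and module names below are hypothetical:
    #     logger = Logger("my_experiment", Path("logs"))
    #     wrapper_logger = logger.add_module("my_wrapper")
    #     wrapper_logger.log("reward", 1.0)
    #     logger.next_step()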
    def add_agent(self, agent: AbstractDACBenchAgent):
        """Writes information about the agent.

        Parameters
        ----------
        agent: AbstractDACBenchAgent
            the agent object to add
        """
        agent_config = {"type": str(agent.__class__)}
        with open(Path(self.log_dir) / "agent.json", "w") as f:
            json.dump(agent_config, f)

    def add_benchmark(self, benchmark: AbstractBenchmark) -> None:
        """Add benchmark to logger.

        Parameters
        ----------
        benchmark : AbstractBenchmark
            the benchmark object to add
        """
        benchmark.save_config(Path(self.log_dir) / "benchmark.json")

    def set_additional_info(self, **kwargs):
        """Add additional info.

        Parameters
        ----------
        kwargs : dict
            info dict
        """
        for _, module_logger in self.module_logger.items():
            module_logger.set_additional_info(**kwargs)
    def log(self, key, value, module):
        """Log a key-value pair to module.

        Parameters
        ----------
        key : str | int
            key to log
        value :
            value to log
        module :
            module to log to
        """
        if module not in self.module_logger:
            raise ValueError(f"Module {module} not registered yet")
        self.module_logger[module].log(key, value)

    def log_space(self, key, value, module, space_info=None):
        """Log a key-value pair to module with optional space info.

        Parameters
        ----------
        key : str | int
            key to log
        value :
            value to log
        module :
            module to log to
        space_info :
            additional log info
        """
        if module not in self.module_logger:
            raise ValueError(f"Module {module} not registered yet")
        self.module_logger[module].log_space(key, value, space_info)

    def log_dict(self, data, module):
        """Log a data dict to module.

        Parameters
        ----------
        data : dict
            data to log
        module :
            module to log to
        """
        if module not in self.module_logger:
            raise ValueError(f"Module {module} not registered yet")
        self.module_logger[module].log_dict(data)
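
# --- Illustrative end-to-end sketch (not part of dacbench.logger) ---
# A typical run loop; the environment and names below are hypothetical:
#
#     logger = Logger("my_experiment", Path("logs"))
#     logger.add_module("env")
#     logger.set_env(env)
#     for _ in range(10):
#         env.reset()
#         terminated = truncated = False
#         while not (terminated or truncated):
#             action = env.action_space.sample()
#             _, reward, terminated, truncated, _ = env.step(action)
#             logger.log("reward", reward, module="env")
#             logger.next_step()
#         logger.next_episode()
#     logger.close()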