# Source code for deepcave.runs.handler

# Copyright 2021-2024 The DeepCAVE Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#  noqa: D400
"""
# Handler

This module provides utilities to handle a run.

It can retrieve working directories, run paths, run names, as well as groups of runs.
It provides utilities to update and remove runs as well as groups of runs.

# Classes
    - RunHandler: Handle the runs.
"""

from typing import Dict, List, Optional, Type, Union

import time
from pathlib import Path

from deepcave.config import Config
from deepcave.runs import AbstractRun
from deepcave.runs.group import Group
from deepcave.runs.run import Run
from deepcave.utils.cache import Cache
from deepcave.utils.logs import get_logger
from deepcave.utils.run_caches import RunCaches


class RunHandler:
    """
    Handle the runs.

    Based on the meta data in the cache, automatically selects the right converter and switches
    to the right (plugin) cache.

    Provides utilities to retrieve working directories, run paths, run names, and groups of
    runs. Also updates and removes runs as well as groups of runs.

    Properties
    ----------
    c : Cache
        The cache containing information about a run(s).
    rc : RunCaches
        The caches for the selected runs.
    logger : Logger
        The logger for the run handler.
    available_run_classes : List[Type[Run]]
        The available converters (run classes). They are tried in list order when a run is
        loaded.
    runs : Dict[str, AbstractRun]
        A dictionary of runs with their path as key.
    groups : Dict[str, Group]
        A dictionary of the groups with their name as key.
    """

    def __init__(self, config: Config, cache: "Cache", run_cache: "RunCaches") -> None:
        self.c = cache
        self.rc = run_cache
        self.logger = get_logger("RunHandler")

        # Available converters. This is the very same list object as `config.CONVERTERS`
        # (no copy is made), so the re-ordering done in `update_run` is visible on the config.
        self.available_run_classes: List[Type[Run]] = config.CONVERTERS

        # Internal state (filled by `update_runs` and `update_groups`)
        self.runs: Dict[str, AbstractRun] = {}  # run_path -> Run
        self.groups: Dict[str, Group] = {}  # group_name -> Group

        # Read from cache and update
        self.c.read()
        self.update_runs()
        self.update_groups()

    def set_working_directory(self, working_directory: Union[Path, str]) -> None:
        """
        Set the working directory to the meta cache.

        Parameters
        ----------
        working_directory : Union[Path, str]
            Directory to be set. It is stored as a string.
        """
        self.c.set("working_dir", value=str(working_directory))

    def get_working_directory(self) -> Path:
        """
        Return the current working directory in the cache.

        Returns
        -------
        Path
            Path of the working directory.

        Raises
        ------
        AssertionError
            If the working directory is not a string or a Path, an error is thrown.
        """
        working_dir = self.c.get("working_dir")
        assert isinstance(
            working_dir, (str, Path)
        ), "Working directory of cache must be a string or a Path like."
        return Path(working_dir)

    def get_available_run_paths(self) -> Dict[str, str]:
        """
        Return the available run paths from the current directory.

        Files as well as entries whose name starts with "." or "_" are ignored.
        If the working directory does not exist, an empty dictionary is returned.

        Returns
        -------
        Dict[str, str]
            Run path as key and run name as value, sorted alphabetically by run name.
        """
        working_dir = self.get_working_directory()
        runs: Dict[str, str] = {}

        try:
            for path in working_dir.iterdir():
                run_name = path.stem

                # Ignore files and unwanted directories
                if path.is_file() or run_name.startswith((".", "_")):
                    continue

                runs[str(path)] = run_name
        except FileNotFoundError:
            # Deliberate best effort: a missing working directory simply offers no runs.
            pass

        # Sort run_names alphabetically
        return dict(sorted(runs.items(), key=lambda item: item[1]))

    def get_selected_run_paths(self) -> List[str]:
        """
        Return the selected run paths from the cache.

        Returns
        -------
        List[str]
            Run paths as a list.

        Raises
        ------
        AssertionError
            If the selected run paths are not a list, an error is thrown.
        """
        selected_run_paths = self.c.get("selected_run_paths")
        assert isinstance(
            selected_run_paths, list
        ), "The selected run paths of the cache must be a list."
        return selected_run_paths

    def get_selected_run_names(self) -> List[str]:
        """
        Return the run names of the selected runs.

        The names are derived from the paths of the runs currently held in `self.runs`.

        Returns
        -------
        List[str]
            List of run names of the selected runs.
        """
        return [self.get_run_name(run_path) for run_path in self.runs]

    def get_run_name(self, run_path: Union[Path, str]) -> str:
        """
        Return the stem of the path.

        Parameters
        ----------
        run_path : Union[Path, str]
            Path, which should be converted to a name.

        Returns
        -------
        str
            Run name of the path.
        """
        return Path(run_path).stem

    def get_selected_groups(self) -> Dict[str, List[str]]:
        """
        Get the selected groups.

        Returns
        -------
        Dict[str, List[str]]
            Dictionary with the selected groups (group name -> run paths).

        Raises
        ------
        AssertionError
            If groups in cache is not a dictionary, an error is thrown.
        """
        selected_groups = self.c.get("groups")
        assert isinstance(
            selected_groups, dict
        ), "The groups aquired from the cache must be a dictionary."
        return selected_groups

    def add_run(self, run_path: str) -> bool:
        """
        Add a run path to the cache.

        If run path is already in cache, do nothing.

        Parameters
        ----------
        run_path : str
            Path of a run.

        Returns
        -------
        bool
            True if all selected runs could be loaded, False otherwise.
        """
        selected_run_paths = self.get_selected_run_paths()
        if run_path in selected_run_paths:
            return True

        selected_run_paths.append(run_path)
        self.c.set("selected_run_paths", value=selected_run_paths)

        return self.update_runs()

    def remove_run(self, run_path: str) -> None:
        """
        Remove a run path from the cache.

        If run path is not in cache, do nothing.

        Parameters
        ----------
        run_path : str
            Path of a run.

        Raises
        ------
        TypeError
            If `selected_run_paths` or `groups` is None, an error is thrown.
        """
        selected_run_paths = self.c.get("selected_run_paths")
        if selected_run_paths is None:
            raise TypeError("Selected run paths can not be None.")

        if run_path not in selected_run_paths:
            return

        selected_run_paths.remove(run_path)
        self.c.set("selected_run_paths", value=selected_run_paths)

        # The groups have to be checked here because the removed run_path may
        # still be included
        cached_groups = self.c.get("groups")
        if cached_groups is None:
            raise TypeError("Groups can not be None.")

        groups = {}
        for group_name, run_paths in cached_groups.items():
            if run_path in run_paths:
                run_paths.remove(run_path)
            groups[group_name] = run_paths
        self.c.set("groups", value=groups)

        # Last inputs are also removed here
        self.c.set("last_inputs", value={})

        # NOTE(review): only the runs are reloaded here; the instantiated `self.groups`
        # are not rebuilt by this method - confirm callers call `update_groups` if needed.
        self.update_runs()

    def update(self) -> None:
        """Update the internal run and group instances but only if a hash changed."""
        update_required = False
        for run_path in list(self.runs.keys()):
            run = self.runs[run_path]

            # Get cache
            if self.rc.update(run):
                # It's important to delete the run from self.runs here because
                # otherwise this object is kept in memory though it has changed
                del self.runs[run_path]
                update_required = True

        if update_required:
            self.update_runs()
            self.update_groups()

    def update_runs(self) -> bool:
        """
        Load selected runs and update cache if files changed.

        Paths that can not be loaded are dropped from the selected run paths in the cache.

        Returns
        -------
        bool
            True if all selected runs could be loaded, False otherwise.
        """
        runs: Dict[str, AbstractRun] = {}  # run_path: Run
        loaded_paths: List[str] = []
        success = True
        class_hint = None

        selected_run_paths = self.get_selected_run_paths()
        for run_path in selected_run_paths:
            run = self.update_run(run_path, class_hint=class_hint)
            if run is None:
                success = False
                continue

            runs[run_path] = run
            loaded_paths.append(run_path)
            # The next path is tried with the converter that just worked first
            class_hint = run.__class__

        # Save in cache again
        if selected_run_paths != loaded_paths:
            self.c.set("selected_run_paths", value=loaded_paths)

        # Save runs in memory
        self.runs = runs

        return success

    def update_run(
        self, run_path: str, class_hint: Optional[Type[Run]] = None
    ) -> Optional[AbstractRun]:
        """
        Load the run from `self.runs` or create a new one.

        The converters in `self.available_run_classes` are tried in order and the first one
        that loads the path successfully is used. Exceptions raised by converters are
        collected and only logged (as warnings) if no converter succeeded.

        Parameters
        ----------
        run_path : str
            The path of the run.
        class_hint : Optional[Type[Run]], optional
            A hint/suggestion of what the Type of the Run is. If it is one of the available
            converters, it is moved to the front and therefore tried first.
            Default is None.

        Returns
        -------
        Optional[AbstractRun]
            The Run added to the run cache, or None if no converter could load the path.
        """
        # Try to get run from current runs
        if run_path in self.runs:
            run = self.runs[run_path]
            # Create cache file and set name/hash. Clear cache if hash got changed.
            self.rc.update(run)
            return run

        run = None
        self.logger.debug(f'Run "{Path(run_path).stem}" needs to be initialized.')

        # Move the hinted converter to the front. A hint that is not a registered converter
        # is ignored instead of failing with a ValueError from `list.remove`.
        if class_hint is not None and class_hint in self.available_run_classes:
            self.available_run_classes.remove(class_hint)
            self.available_run_classes.insert(0, class_hint)

        # Go through all converter classes found in the order of
        # how many runs have already been converted.
        exceptions = {}
        for run_class in self.available_run_classes:
            t1 = time.perf_counter()
            try:
                run = run_class.from_path(Path(run_path))
            except Exception as e:
                # KeyboardInterrupt is not an `Exception` subclass, so it still propagates
                # and can actually interrupt.
                exceptions[run_class] = e
                continue
            t2 = time.perf_counter()

            self.logger.debug(
                f'Run "{Path(run_path).stem}" was successfully loaded (took {round(t2 - t1, 2)}'
                f" seconds)."
            )
            # Stop at the first converter that works. Otherwise a later converter could
            # overwrite the run and the ordering above would have no effect.
            break

        # Run could not be loaded
        if run is None:
            self.logger.warning(f"Run {run_path} could not be loaded. Please check the logs.")

            # Print all exceptions
            for run_class, exception in exceptions.items():
                self.logger.warning(f"{run_class.prefix}: {exception}.")
        else:
            # Add to run cache
            self.rc.update(run)

        return run

    def update_groups(self, groups: Optional[Dict[str, List[str]]] = None) -> None:
        """
        Load chosen groups.

        If `groups` is passed, it is used to instantiate the groups and saved to the cache.
        Otherwise, `groups` is loaded from the cache. Groups without any loaded run are not
        instantiated.

        Parameters
        ----------
        groups : Optional[Dict[str, List[str]]], optional
            A dictionary with the groups (group name -> run paths).
            Default is None.

        Raises
        ------
        NotMergeableError
            If runs can not be merged, an error is thrown.
        TypeError
            If `groups` is None, an error is thrown.
        """
        if groups is None:
            groups = self.c.get("groups")

        # This check is necessary because groups could still be None
        if groups is None:
            raise TypeError("Groups can not be None.")

        # Add grouped runs
        instantiated_groups = {}
        for group_name, run_paths in groups.items():
            runs = [run for run_path, run in self.runs.items() if run_path in run_paths]
            if not runs:
                continue

            # Throws NotMergeableError
            instantiated_groups[group_name] = Group(group_name, runs)

        # Add groups to rc
        for group in instantiated_groups.values():
            # Create cache file and set name/hash. Clear cache if hash got changed
            self.rc.update(group)

        # Save in cache
        self.c.set("groups", value=groups)

        # Save in memory
        self.groups = instantiated_groups

    def get_run(self, run_id: str) -> AbstractRun:
        """
        Look inside `self.runs` and `self.groups` and if the run id is found, returns the run.

        Parameters
        ----------
        run_id : str
            Internal id of the run. Referred to `run.id`.

        Returns
        -------
        AbstractRun
            Run.

        Raises
        ------
        RuntimeError
            If `run_id` was not found in `self.runs` or `self.groups`.
        """
        for run in self.get_runs(include_groups=True):
            if run.id == run_id:
                return run

        raise RuntimeError("Run not found.")

    def get_groups(self) -> List[Group]:
        """
        Return instantiated grouped runs.

        Returns
        -------
        List[Group]
            Instances of grouped runs.
        """
        self.update()
        return list(self.groups.values())

    def get_runs(self, include_groups: bool = False) -> List[AbstractRun]:
        """
        Return the runs from the internal cache.

        The runs are already loaded and ready to use.
        Optional, if `include_groups` is set to True, the groups are also included.

        Parameters
        ----------
        include_groups : bool, optional
            Includes the groups, by default False.

        Returns
        -------
        List[AbstractRun]
            Instances of runs.
        """
        self.update()
        runs = list(self.runs.values())

        if include_groups:
            runs += list(self.groups.values())

        return runs