# Copyright 2021-2024 The DeepCAVE Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# noqa: D400
"""
# Group
This module provides utilities for grouping and managing a group of runs.
Utilities include getting attributes of the grouped runs, as well as the group itself.
## Classes
- Group: Can group and manage a group of runs.
"""
from typing import Any, Dict, Iterator, List, Optional, Tuple
from copy import deepcopy
import numpy as np
from ConfigSpace.configuration_space import Configuration
from ConfigSpace.hyperparameters.hp_components import ROUND_PLACES
from deepcave.runs import AbstractRun, NotMergeableError, check_equality
from deepcave.utils.hash import string_to_hash
from deepcave.utils.util import config_to_tuple
[docs]
class Group(AbstractRun):
"""
Can group and manage a group of runs.
Utilities include getting attributes of the grouped runs, as well as the group itself.
Properties
----------
runs : List[AbstractRun]
A list of the runs.
meta : Dict[str, Any]
Contains budgets, objectives and their attributes.
configspace : ConfigurationSpace
The configuration space of the runs.
objectives : Objective
The objectives of the runs.
budgets : List[Union[int, float]]
The budgets of the runs.
configs : Dict[int, Any]
A dictionary of the configurations and their ids as key.
origins : Dict[int, str]
The origins of the configurations and their ids as key.
trial_keys : Dict[Tuple[str, int], int]
The keys of the trial.
history : List[Trial]
The trial history.
prefix : str
The prefix for the id of the group.
name : str
The name for the id of the group.
"""
prefix = "group"
def __init__(self, name: str, runs: List[AbstractRun]):
super(Group, self).__init__(name)
self.runs = [run for run in runs if run is not None] # Filter for Nones
self.reset()
if len(self.runs) == 0:
return
try:
attributes = check_equality(self.runs)
# abstract run requires meta to contain budgets / objectives
self.meta = {
"budgets": attributes["budgets"],
"objectives": attributes["objectives"],
}
self.meta["seeds"] = list(
set([seed for run in self.runs for seed in run.meta["seeds"].copy()])
)
self.configspace = attributes["configspace"]
self.objectives = attributes["objectives"]
self.budgets = attributes["budgets"]
self.seeds = self.meta["seeds"]
# New config ids are needed
current_config_id = 0
# Key: new_config_id; Value: (run_id, config_id)
self._original_config_mapping: Dict[int, Tuple[int, int]] = {}
# Key: (run_id, config_id); Value: new_config_id
self._new_config_mapping: Dict[Tuple[int, int], int] = {}
# Combine runs here
for run_id, run in enumerate(self.runs):
config_mapping: Dict[int, int] = {} # Maps old ones to the new ones
# Update configs + origins
for config_id in run.configs.keys():
config = run.configs[config_id]
origin = run.origins[config_id]
for added_config_id, added_config in self.configs.items():
if config == added_config:
config_mapping[config_id] = added_config_id
break
if isinstance(config, Configuration):
config = dict(config)
if config_id not in config_mapping:
self.configs[current_config_id] = config
self.origins[current_config_id] = origin
# Use same rounding as ConfigSpace does
self.config_id_mapping[
config_to_tuple(config, ROUND_PLACES)
] = current_config_id
config_mapping[config_id] = current_config_id
current_config_id += 1
# Update history + trial_keys
for trial in run.history:
# Deep copy trial
trial = deepcopy(trial)
(config_id, budget, seed) = trial.get_key()
# Config id might have changed
new_config_id = config_mapping[config_id]
# Update config id
trial.config_id = new_config_id
# Now it is added to the history
trial_key = trial.get_key()
if trial_key not in self.trial_keys:
self.trial_keys[trial_key] = len(self.history)
self.history += [trial]
else:
self.history[self.trial_keys[trial_key]] = trial
# Get model mapping done
self._original_config_mapping[new_config_id] = (run_id, config_id)
self._new_config_mapping[(run_id, config_id)] = new_config_id
# And update highest budget
self._update_highest_budget(new_config_id, trial.budget, trial.status)
except Exception as e:
raise NotMergeableError(f"Runs can not be merged: {e}")
[docs]
def __iter__(self: "Group") -> Iterator[str]:
"""Allow to iterate over the object."""
for run in self.runs:
yield run.name
@property
def hash(self) -> str:
"""
Sorted hashes of the group.
Returns
-------
str
The sorted hash of the group.
"""
hashes = []
for run in self.runs:
hashes += [run.hash]
# Hashes are sorted now, so there is no dependence on the order
hashes = sorted(hashes)
return string_to_hash("-".join(hashes))
@property
def id(self) -> str:
"""
Get the hash as id of the group.
In contrast to hash, this hash should not be changed throughout the run.
Returns
-------
str
The hash of the group.
"""
# Groups do not have a path, therefore the name is used.
return string_to_hash(f"{self.prefix}:{self.name}")
@property
def latest_change(self) -> float:
"""
Get the latest change made to the grouped runs.
Returns
-------
float
The latest change.
"""
latest_change = 0.0
for run in self.runs:
if run.latest_change > latest_change:
latest_change = run.latest_change
return latest_change
@property
def run_paths(self) -> List[str]:
"""Get the path of the runs in the group."""
return [str(run.path) for run in self.runs]
@property
def run_names(self) -> List[str]:
"""
Get the names of the runs in the group.
Returns
-------
List[str]
A list of the names of the runs in the group.
"""
return [run.name for run in self.runs]
[docs]
def get_runs(self) -> List[AbstractRun]:
"""
Get the runs in the group.
Returns
-------
List[AbstractRun]
A list of the grouped runs.
"""
return self.runs
[docs]
def get_new_config_id(self, run_id: int, original_config_id: int) -> int:
"""
Get a new identificator for a configuration.
Parameters
----------
run_id : int
The id of the run.
original_config_id : int
The original identificator of a configuration.
Returns
-------
int
The new identificator of a configuration.
"""
return self._new_config_mapping[(run_id, original_config_id)]
[docs]
def get_original_config_id(self, config_id: int) -> int:
"""
Get the original identificator of a configuration.
Parameters
----------
config_id : int
The identificator of a configuration.
Returns
-------
int
The original identificator of a configuration.
"""
return self._original_config_mapping[config_id][1]
[docs]
def get_original_run(self, config_id: int) -> AbstractRun:
"""
Get the original run.
Parameters
----------
config_id : int
The identificator of the configuration.
Returns
-------
AbstractRun
The original run.
"""
run_id = self._original_config_mapping[config_id][0]
return self.runs[run_id]
[docs]
def get_model(self, config_id: int) -> Optional[Any]:
"""
Get the model given the configuration id.
Parameters
----------
config_id : int
The identificator of the configuration.
Returns
-------
Optional[Any]
The model.
"""
run_id, config_id = self._original_config_mapping[config_id]
return self.runs[run_id].get_model(config_id)
# Types dont match superclass
[docs]
def get_trajectory(self, *args, **kwargs): # type: ignore
"""
Calculate the trajectory of the given objective and budget.
This includes the times, the mean costs, and the standard deviation of the costs.
Parameters
----------
*args
Should be the objective to calculate the trajectory from.
**kwargs
Should be the budget to calculate the trajectory for.
Returns
-------
times : List[float]
Times of the trajectory.
costs_mean : List[float]
Costs of the trajectory.
costs_std : List[float]
Standard deviation of the costs of the trajectory.
ids : List[int]
The "global" ids of the selected trial.
config_ids : List[int]
The configuration ids of the selected trials.
"""
# Cache costs
run_costs = []
run_times = []
# All x values on which y values are needed
all_times = []
for _, run in enumerate(self.runs):
times, costs_mean, _, _, _ = run.get_trajectory(*args, **kwargs)
# Cache s.t. calculate it is not calculated multiple times
run_costs.append(costs_mean)
run_times.append(times)
# Add all times
# Standard deviation needs to be calculated on all times
for time in times:
if time not in all_times:
all_times.append(time)
all_times.sort()
# Now look for corresponding y values
all_costs = []
for time in all_times:
y = []
# Iterate over all runs
for costs, times in zip(run_costs, run_times):
# Find closest x value
idx = min(range(len(times)), key=lambda i: abs(times[i] - time))
y.append(costs[idx])
all_costs.append(y)
# Make numpy arrays
all_costs_array = np.array(all_costs)
times = all_times
costs_mean = np.mean(all_costs_array, axis=1)
costs_std = np.std(all_costs_array, axis=1)
return times, list(costs_mean), list(costs_std), [], []