Source code for deepcave.evaluators.footprint
# Copyright 2021-2024 The DeepCAVE Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# noqa: D400
"""
# Footprint
This module provides utilities for creating a footprint of a run.
It uses multidimensional scaling (MDS).
It also provides utilities to get the surface and the points of the plot.
## Classes
- Footprint: Can train and create a footprint of a run.
"""
from typing import List, Optional, Tuple, Union
import numpy as np
from ConfigSpace.hyperparameters import (
CategoricalHyperparameter,
Constant,
Hyperparameter,
)
from sklearn.ensemble import RandomForestRegressor
from sklearn.manifold import MDS
from tqdm import tqdm
from deepcave.constants import BORDER_CONFIG_ID, RANDOM_CONFIG_ID
from deepcave.runs import AbstractRun, Status
from deepcave.runs.objective import Objective
from deepcave.utils.configspace import sample_border_config, sample_random_config
from deepcave.utils.logs import get_logger
logger = get_logger(__name__)
class Footprint:
"""
Can train and create a footprint of a run.
It uses multidimensional scaling (MDS).
Provides utilities to get the surface and the points of the plot.
Properties
----------
run : AbstractRun
The AbstractRun used for the calculation of the footprint.
cs : ConfigurationSpace
The configuration space of the run.
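    Example
    -------
    A minimal usage sketch; `run`, `objective` and `budget` are assumed to be an already
    loaded `AbstractRun`, one of its objectives and one of its budgets:

    >>> footprint = Footprint(run)
    >>> footprint.calculate(objective, budget)
    >>> x, y, z = footprint.get_surface()
    >>> points_x, points_y, config_ids = footprint.get_points("configs")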
"""
def __init__(self, run: AbstractRun):
if run.configspace is None:
raise RuntimeError("The run needs to be initialized.")
self.run = run
self.cs = run.configspace
# Important parameters
is_categorical, depth = [], []
for hp in list(self.cs.values()):
if isinstance(hp, CategoricalHyperparameter):
is_categorical.append(True)
else:
is_categorical.append(False)
depth.append(self._get_depth(hp))
self._is_categorical = np.array(is_categorical)
self._depth = np.array(depth)
# Global variables
self._distances: Optional[np.ndarray] = None
self._trained = False
self._reset()
def _reset(self) -> None:
"""Reset the footprint."""
self._objective_model: Optional[RandomForestRegressor] = None
self._area_model: Optional[RandomForestRegressor] = None
self._config_ids: Optional[List[int]] = None
self._incumbent_id: Optional[int] = None
        # Used to fit the MDS (the evaluated configs plus added border and random configs).
self._X: Optional[np.ndarray] = None
        # The MDS-transformed configs with shape (n, 2).
self._MDS_X: Optional[np.ndarray] = None
def calculate(
self,
objective: Objective,
budget: Union[int, float],
support_discretization: Optional[int] = 10,
rejection_rate: float = 0.01,
retries: int = 3,
exclude_configs: bool = False,
) -> None:
"""
Calculate the distances and train the model.
Parameters
----------
objective : Objective
Objective and color to show.
budget : Union[int, float]
All configurations with this budget are considered.
support_discretization : Optional[int], optional
Discretization steps for integer and float hyperparameter (HP) values.
Default is set to 10.
        rejection_rate : float, optional
            Rate used to decide whether a configuration is rejected. Internally, the maximal
            distance between configurations is calculated, and if a new configuration is closer
            than max distance * rejection_rate to an already added one, it is rejected.
            For example, with four non-categorical hyperparameters the maximal distance is 4,
            so the default rate of 0.01 rejects configurations closer than 0.04.
            Default is set to 0.01.
retries : int, optional
How many times to retry adding a new configuration.
Default is set to 3.
exclude_configs : bool, optional
Whether the configurations from the run should be excluded
in the multidimensional scaling (MDS).
This is particularly interesting if only the search space should be plotted.
Default is set to False.
"""
# Reset everything
self._reset()
self.cs.seed(0)
# Get config rejection threshold
# If the distance between two configs is smaller than the threshold, the config
# is rejected
rejection_threshold = self._get_max_distance() * rejection_rate
# Get encoded configs
data = self.run.get_encoded_data(
objective, budget, statuses=Status.SUCCESS, specific=False, include_config_ids=True
)
hp_names = list(self.run.configspace.keys())
# Make numpy arrays
X = data[hp_names].to_numpy()
Y = data[objective.name].to_numpy()
config_ids = data["config_id"].values.tolist()
# Get the incumbent
incumbent_config, _ = self.run.get_incumbent(objective, budget)
self._incumbent_id = self.run.get_config_id(incumbent_config)
# Reshape Y to 2D
Y = Y.reshape(-1, 1)
# Init distances
self._init_distances(X, config_ids, exclude_configs=exclude_configs)
assert self._distances is not None
border_generator = sample_border_config(self.cs)
random_generator = sample_random_config(self.cs, d=support_discretization)
# Now the border and random configs are added
count_border = 0
count_random = 0
tries = 0
logger.info("Starting to calculate distances and add border and random configurations...")
while True:
_configs = []
_config_ids = []
try:
_configs += [next(border_generator)]
_config_ids += [BORDER_CONFIG_ID]
except StopIteration:
pass
try:
_configs += [next(random_generator)]
_config_ids += [RANDOM_CONFIG_ID]
except StopIteration:
pass
counter = 0
for config, config_id in zip(_configs, _config_ids):
if config is None:
continue
# Encode config
config_array = np.array(self.run.encode_config(config))
rejected = self._update_distances(config_array, config_id, rejection_threshold)
if not rejected:
# Count
if config_id == BORDER_CONFIG_ID:
count_border += 1
if config_id == RANDOM_CONFIG_ID:
count_random += 1
counter += 1
# Abort criteria
# If there are no new configs
if counter == 0:
tries += 1
else:
tries = 0
if tries >= retries:
break
            # Or abort if more than 4000 configurations are reached (otherwise it takes too long)
if self._distances.shape[0] % 100 == 0:
logger.info(f"Found {self._distances.shape[0]} configurations...")
if self._distances.shape[0] > 4000:
break
logger.info(f"Added {count_border} border configs and {count_random} random configs.")
logger.info(f"Total configurations: {self._distances.shape[0]}.")
logger.info("Getting MDS data...")
# Calculate MDS now to get 2D coordinates and set those points to reach them later.
MDS_X = self._get_mds()
self._MDS_X = MDS_X
        # The y values of the border and random configs are unknown, so the random forest is
        # trained on the MDS coordinates of the evaluated configs only.
        # If the evaluated configs were excluded from the MDS, there is nothing to train on.
if not exclude_configs:
self._train_on_objective(MDS_X[: len(X)], Y.ravel())
self._trained = True
else:
self._trained = False
# Train on areas can be done anytime.
self._train_on_areas()
def get_surface(
self, details: float = 0.5, performance: bool = True
) -> Tuple[List, List, List]:
"""
Get surface of the multidimensional scaling (MDS) plot.
Parameters
----------
details : float, optional
            Controls the resolution of the meshgrid: the number of grid points per axis is
            int(20 * details) + 10. By default 0.5.
performance : bool, optional
Whether to get the surface from the performance or the valid areas.
Default is set to True (i.e. from performance).
Returns
-------
Tuple[List, List, List]
x (1D), y (1D) and z (2D) arrays for heatmap.
Raises
------
RuntimeError
If `calculate` was not called before.
RuntimeError
If evaluated configs weren't included.
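        Example
        -------
        A minimal sketch of how the returned arrays could be plotted (assuming a calculated
        `footprint` and that `plotly` is available; it is not used by this module itself):

        >>> import plotly.graph_objects as go
        >>> x, y, z = footprint.get_surface(details=0.5, performance=True)
        >>> figure = go.Figure(go.Heatmap(x=x, y=y, z=z))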
"""
X = self._MDS_X
if X is None:
raise RuntimeError("You need to call `calculate` first.")
if performance and not self._trained:
raise RuntimeError(
"You can not get the surface" "if you do not include evaluated configs."
)
# Create meshgrid
x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
num = int(20 * details) + 10
x = np.linspace(x_min, x_max, num)
y = np.linspace(y_min, y_max, num)
x_mesh, y_mesh = np.meshgrid(x, y)
conc = np.c_[x_mesh.ravel(), y_mesh.ravel()]
if performance:
model = self._objective_model
else:
model = self._area_model
if model is None:
raise RuntimeError("You need to call `calculate` first.")
z = model.predict(conc)
z = z.reshape(x_mesh.shape)
return x.tolist(), y.tolist(), z.tolist()
def get_points(self, category: str = "configs") -> Tuple[List[float], List[float], List[int]]:
"""
Return the points of the multidimensional scaling (MDS) plot.
Parameters
----------
category : str, optional
            Points of a specific category. Choose between `configs`, `borders`, `supports`
or `incumbents`. By default `configs`.
Returns
-------
Tuple[List[float], List[float], List[int]]
X, Y and config_ids as lists.
Raises
------
RuntimeError
If category is not supported.
RuntimeError
            If `calculate` wasn't called before.
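        Example
        -------
        A minimal sketch (assuming `calculate` has already been called on `footprint`):

        >>> x, y, config_ids = footprint.get_points("configs")
        >>> border_x, border_y, _ = footprint.get_points("borders")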
"""
if category not in ["configs", "borders", "supports", "incumbents"]:
raise RuntimeError("Unknown category.")
if self._MDS_X is None or self._config_ids is None:
raise RuntimeError("You need to call `calculate` first.")
X = []
Y = []
config_ids = []
for x, config_id in zip(self._MDS_X, self._config_ids):
if (
(category == "configs" and config_id >= 0)
or (category == "borders" and config_id == BORDER_CONFIG_ID)
or (category == "incumbents" and config_id == self._incumbent_id)
or (category == "supports" and config_id == RANDOM_CONFIG_ID)
):
x = x.tolist()
X += [x[0]]
Y += [x[1]]
config_ids += [config_id]
return X, Y, config_ids
def _get_max_distance(self) -> float:
"""
Calculate the maximum distance between all configs.
        Since the encoded values are normalized, this is simply the number of
        non-categorical, non-constant hyperparameters.
Returns
-------
float
Maximal distance between two configurations.
"""
        # Since the encoded values are normalized, each non-categorical, non-constant HP
        # can contribute at most 1 to the distance.
max_distance = 0
for hp in list(self.cs.values()):
if isinstance(hp, CategoricalHyperparameter) or isinstance(hp, Constant):
continue
max_distance += 1
return max_distance
def _get_distance(self, x: np.ndarray, y: np.ndarray) -> float:
"""
Calculate distance between x and y. Both arrays must have the same length.
Parameters
----------
x : np.ndarray
Configuration 1.
y : np.ndarray
Configuration 2.
Returns
-------
float
            Distance between configuration 1 and configuration 2.
Raises
------
RuntimeError
If calculate wasn't called first.
"""
if self._depth is None or self._is_categorical is None:
raise RuntimeError("You need to call `calculate` first.")
d = np.abs(x - y)
d[np.isnan(d)] = 1
d[np.logical_and(self._is_categorical, d != 0)] = 1
d = np.sum(d / self._depth)
return d
def _get_distances(self, X: np.ndarray) -> np.ndarray:
"""
Get the distances between the configurations.
Parameters
----------
X : np.ndarray
The configurations.
Returns
-------
np.ndarray
The calculated distances.
"""
n_configs = X.shape[0]
        # Initialize the distance matrix
distances = np.zeros((n_configs, n_configs))
for i in tqdm(range(n_configs)):
for j in range(i + 1, n_configs):
d = self._get_distance(X[i, :], X[j, :])
distances[i, j] = d
distances[j, i] = d
return distances
def _init_distances(
self, X: np.ndarray, config_ids: List[int], exclude_configs: bool = False
) -> None:
"""
Initialize the distances.
Parameters
----------
X : np.ndarray
Encoded data.
config_ids : List[int]
Corresponding configuration ids.
exclude_configs : bool, optional
            If True, the passed configurations are not added; only border and random
            configurations will be used for the MDS later. By default False.
"""
if not exclude_configs:
self._X = X.copy()
self._config_ids = config_ids
self._distances = self._get_distances(X)
logger.info(f"Added {len(config_ids)} configurations.")
else:
self._X = None
self._config_ids = []
self._distances = np.zeros((0, 0))
def _update_distances(
self,
config: np.ndarray,
config_id: int,
rejection_threshold: Optional[float] = 0.0,
) -> bool:
"""
Update the internal distance if the passed configuration is not rejected.
Parameters
----------
config : np.ndarray
            Encoded configuration that should be added.
config_id : int
Corresponding configuration id. This is important for later identification
as the configuration might be a border or random configuration.
rejection_threshold : Optional[float], optional
Threshold for rejecting the configuration. By default 0.0.
Returns
-------
rejected : bool
Whether the configuration was rejected or not.
"""
X = self._X
assert self._distances is not None
distances = self._distances
if X is None:
X = np.array([[]])
n_configs = 0
else:
n_configs = X.shape[0]
rejected = False
new_distances = np.zeros((n_configs + 1, n_configs + 1))
# In case X is not set
if n_configs == 0:
new_distances[0, 0] = 0
else:
# Calculate distance to all configs
new_distances[:n_configs, :n_configs] = distances[:, :]
for j in range(n_configs):
d = self._get_distance(X[j, :], config)
if rejection_threshold is not None:
if d < rejection_threshold:
rejected = True
break
# Add to new distances
new_distances[n_configs, j] = d
new_distances[j, n_configs] = d
if not rejected:
# Add to X here
if X.shape[1] == 0:
X = np.array([config])
else:
X = np.concatenate((X, np.array([config])), axis=0)
self._X = X
            # Guard against None: += on a None value would fail (an issue for this already exists)
if self._config_ids is not None:
self._config_ids += [config_id]
self._distances = new_distances
return rejected
def _get_depth(self, hp: Hyperparameter) -> int:
"""
Get depth (generations above) in configuration space of a given hyperparameter (HP).
Parameters
----------
        hp : Hyperparameter
            The hyperparameter to inspect.
Returns
-------
int
Depth of the Hyperparameter.
"""
parents = self.cs.parents_of[hp.name]
if not parents:
return 1
new_parents = parents
d = 1
while new_parents:
d += 1
old_parents = new_parents
new_parents = []
for p in old_parents:
pp = self.cs.get_parents_of(p.name)
if pp:
new_parents.extend(pp)
else:
return d
return d
def _get_mds(self) -> np.ndarray:
"""
Perform multidimensional scaling (MDS) on the internal distances.
Returns
-------
np.ndarray
Numpy array with multidimensional scaling (MDS) coordinates in 2D.
Raises
------
RuntimeError
            If `calculate` wasn't called first.
"""
if self._distances is None:
raise RuntimeError("You need to call `calculate` first.")
mds = MDS(n_components=2, dissimilarity="precomputed", random_state=0)
return mds.fit_transform(self._distances)
def _train_on_objective(self, X: np.ndarray, Y: np.ndarray) -> None:
"""
Train the random forest on the performance.
Parameters
----------
X : np.ndarray
Numpy array with multidimensional scaling (MDS) coordinates in 2D.
Y : np.ndarray
Numpy array with costs.
"""
logger.info("Training on objective...")
self._objective_model = RandomForestRegressor(random_state=0)
self._objective_model.fit(X, Y)
def _train_on_areas(self) -> None:
"""
Train the random forest on the "valid" areas.
Raises
------
RuntimeError
            If `calculate` wasn't called first.
"""
if self._MDS_X is None:
raise RuntimeError("You need to call `calculate` first.")
logger.info("Training on area...")
MDS_X = self._MDS_X
        # Create a grid over the MDS space
x_min, x_max = MDS_X[:, 0].min() - 1, MDS_X[:, 0].max() + 1
y_min, y_max = MDS_X[:, 1].min() - 1, MDS_X[:, 1].max() + 1
x = np.linspace(x_min, x_max, 20)
y = np.linspace(y_min, y_max, 20)
X = []
Y = []
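        # Each grid cell is labeled 1 if at least one MDS point falls inside it and 0
        # otherwise. The random forest trained below interpolates these labels into a
        # smooth estimate of which areas of the plot are covered by configurations.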
for x1, x2 in zip(x, x[1:]):
for y1, y2 in zip(y, y[1:]):
# Find center
center = [(x2 - x1) / 2 + x1, (y2 - y1) / 2 + y1]
value = 0
for a, b in MDS_X:
                    # If a config falls into this cell, mark the cell as valid
if a >= x1 and a <= x2 and b >= y1 and b <= y2:
value = 1
break
X.append(center)
Y.append(value)
X_array = np.array(X)
Y_array = np.array(Y)
# Train the model
self._area_model = RandomForestRegressor(random_state=0)
self._area_model.fit(X_array, Y_array)