Source code for deepcave.evaluators.mo_lpi

# Copyright 2021-2024 The DeepCAVE Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#  noqa: D400

"""
This module provides utilities to calculate the multi-objective local parameter importance (LPI).

## Classes
    - MOLPI: This class calculates the multi-objective local parameter importance (LPI).
"""

from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from ConfigSpace import Configuration
from ConfigSpace.c_util import change_hp_value
from ConfigSpace.util import impute_inactive_values

from deepcave.evaluators.epm.fanova_forest import FanovaForest
from deepcave.evaluators.lpi import LPI
from deepcave.runs import AbstractRun
from deepcave.runs.objective import Objective
from deepcave.utils.multi_objective_importance import get_weightings

class MOLPI(LPI):
    """
    Calculate the multi-objective local parameter importance (LPI).

    Override: to train the random forest with an arbitrary weighting of the objectives
    (multi-objective case).

    Properties
    ----------
    run : AbstractRun
        The AbstractRun to get the importance from.
    cs : ConfigurationSpace
        The configuration space of the run.
    hp_names : List[str]
        The names of the Hyperparameters.
    variances : Dict[Any, list]
        The overall variances per tree.
    importances : Optional[pd.DataFrame]
        The importances of the Hyperparameters, one row per Hyperparameter and weighting.
        None until `calculate` has been called.
    continous_neighbors : int
        The number of neighbors chosen for continuous Hyperparameters.
    incumbent : Configuration
        The incumbent of the run for the weighting that was processed last.
    default : Configuration
        A configuration containing Hyperparameters with default values.
    incumbent_array : numpy.ndarray
        The internal vector representation of the incumbent.
    seed : int
        The seed passed to `calculate`.
    rs : RandomState
        A random state with a given seed value.
    """

    def __init__(self, run: AbstractRun):
        super().__init__(run)
        self.importances: Optional[pd.DataFrame] = None

    def calculate(
        self,
        objectives: Optional[Union[Objective, List[Objective]]] = None,
        budget: Optional[Union[int, float]] = None,
        continous_neighbors: int = 500,
        n_trees: int = 10,
        seed: int = 0,
    ) -> None:
        """
        Prepare the data and train a RandomForest model for every objective weighting.

        The result is stored in `self.importances` as a DataFrame with the columns
        "hp_name", "importance", "variance" and "weight".

        Parameters
        ----------
        objectives : Optional[Union[Objective, List[Objective]]], optional
            Considered objectives. By default, None. If None, all objectives are considered.
        budget : Optional[Union[int, float]], optional
            Considered budget. By default, None. If None, the highest budget is chosen.
        continous_neighbors : int, optional
            How many neighbors should be chosen for continuous hyperparameters (HPs).
            By default, 500.
        n_trees : int, optional
            The number of trees for the fanova forest.
            Default is 10.
        seed : int, optional
            The seed used for the random state and the forest. By default, 0.
        """
        # NOTE(review): the expressions marked "reconstructed" below were missing from the
        # extracted source and have been restored from context; confirm against upstream.
        if objectives is None:
            objectives = self.run.get_objectives()  # reconstructed

        if budget is None:
            budget = self.run.get_highest_budget()  # reconstructed

        # Set variables
        self.continous_neighbors = continous_neighbors
        self.default = self.cs.get_default_configuration()
        self.seed = seed
        self.rs = np.random.RandomState(seed)  # reconstructed assignment target

        # Get data
        df = self.run.get_encoded_data(  # reconstructed call target
            objectives=objectives,
            budget=budget,
            specific=True,
            include_combined_cost=True,
            include_config_ids=True,
        )

        # Normalize every objective to [0, 1]; objectives that are maximized are flipped so
        # that smaller is always better.
        assert isinstance(objectives, list)
        objectives_normed = list()
        for obj in objectives:
            obj_name = obj.name  # reconstructed attribute access
            normed = obj_name + "_normed"
            df[normed] = (df[obj_name] - df[obj_name].min()) / (
                df[obj_name].max() - df[obj_name].min()
            )

            if obj.optimize == "upper":
                df[normed] = 1 - df[normed]
            objectives_normed.append(normed)

        # Rows without a defined normalized value for every objective cannot be weighted.
        df = df.dropna(subset=objectives_normed)
        X = df[self.hp_names].to_numpy()
        df_all = pd.DataFrame([])
        weightings = get_weightings(objectives_normed, df)

        # Calculate importance for each weighting generated from the pareto efficient points
        for weighting in weightings:
            # Scalarize the objectives once; the same values are the training targets and
            # decide which configuration is the incumbent for this weighting.
            weighted_cost = sum(
                df[obj] * weight for obj, weight in zip(objectives_normed, weighting)
            )
            Y = weighted_cost.to_numpy()

            # Use same forest as for fanova
            self._model = FanovaForest(self.cs, n_trees=n_trees, seed=seed)
            self._model.train(X, Y)

            # argmin is positional, hence the positional row lookup below.
            incumbent_cfg_id = np.argmin(Y)
            self.incumbent = self.run.get_config(  # reconstructed call target
                df.iloc[incumbent_cfg_id]["config_id"]
            )
            self.incumbent_array = self.incumbent.get_array()

            importances = self.calc_one_weighting()
            # Rows 0 and 1 hold the importance and its variance for each Hyperparameter.
            df_res = pd.DataFrame(importances).loc[0:1].T.reset_index()
            df_res["weight"] = weighting[0]
            df_all = pd.concat([df_all, df_res])

        self.importances = df_all.rename(
            columns={0: "importance", 1: "variance", "index": "hp_name"}
        ).reset_index(drop=True)
        # No negative values; string cells (the Hyperparameter names) are left untouched.
        # NOTE(review): `applymap` is reconstructed; newer pandas versions deprecate it in
        # favor of `DataFrame.map`.
        self.importances = self.importances.applymap(
            lambda x: max(x, 0) if not isinstance(x, str) else x
        )

    def calc_one_weighting(self) -> Dict[str, Tuple[float, float]]:
        """
        Prepare the data after a model has been trained for one weighting.

        Expects `self._model`, `self.incumbent`, `self.incumbent_array` and `self.default`
        to be set, which `calculate` does before calling this method.

        Returns
        -------
        imp_var_dict : Dict[str, Tuple[float, float]]
            Dictionary mapping each Hyperparameter name to the mean and the variance (over
            the trees) of its normalized importance.
        """
        # Get neighborhood sampled on an unit-hypercube.
        neighborhood = self._get_neighborhood()

        # The delta performance is needed from the default configuration and the incumbent
        def_perf, def_var = self._predict_mean_var(self.default)
        inc_perf, inc_var = self._predict_mean_var(self.incumbent)
        # NOTE(review): delta is not read again in this method after the zero-guard below.
        delta = def_perf - inc_perf

        # These are used for plotting and hold the predictions for each neighbor of each
        # parameter. That means performances holds the mean, variances the variance of the
        # forest.
        performances: Dict[str, List[np.ndarray]] = {}
        variances: Dict[str, List[np.ndarray]] = {}
        # These are used for importance and hold the corresponding importance/variance over
        # neighbors. Only import if NOT quantifying importance via performance-variance
        # across neighbors.
        # Nested list of values per tree in random forest.
        predictions: Dict[str, List[List[np.ndarray]]] = {}

        # Iterate over parameters
        for hp_idx, hp_name in enumerate(self.incumbent.keys()):
            if hp_name not in neighborhood:
                continue

            performances[hp_name] = []
            variances[hp_name] = []
            predictions[hp_name] = []
            incumbent_added = False
            incumbent_idx = 0

            # Iterate over neighbors
            for unit_neighbor, neighbor in zip(neighborhood[hp_name][0], neighborhood[hp_name][1]):
                if not incumbent_added:
                    # Detect incumbent: it is slotted in before the first neighbor whose
                    # unit value exceeds the incumbent's value.
                    if unit_neighbor > self.incumbent_array[hp_idx]:
                        performances[hp_name].append(inc_perf)
                        variances[hp_name].append(inc_var)
                        incumbent_added = True
                    else:
                        incumbent_idx += 1

                # Create the neighbor-Configuration object
                new_array = self.incumbent_array.copy()
                new_array = change_hp_value(
                    self.cs, new_array, hp_name, unit_neighbor, self.cs.index_of[hp_name]
                )
                new_config = impute_inactive_values(Configuration(self.cs, vector=new_array))

                # Get the leaf values
                x = np.array(new_config.get_array())
                leaf_values = self._model.get_leaf_values(x)

                # And the prediction/performance/variance
                predictions[hp_name].append([np.mean(tree_pred) for tree_pred in leaf_values])
                performances[hp_name].append(np.mean(predictions[hp_name][-1]))
                variances[hp_name].append(np.var(predictions[hp_name][-1]))

            # Mirror the incumbent's position in the neighborhood itself.
            if len(neighborhood[hp_name][0]) > 0:
                neighborhood[hp_name][0] = np.insert(
                    neighborhood[hp_name][0], incumbent_idx, self.incumbent_array[hp_idx]
                )
                neighborhood[hp_name][1] = np.insert(
                    neighborhood[hp_name][1], incumbent_idx, self.incumbent[hp_name]
                )
            else:
                neighborhood[hp_name][0] = np.array(self.incumbent_array[hp_idx])
                neighborhood[hp_name][1] = [self.incumbent[hp_name]]

            # The incumbent is larger than every neighbor (or there are none): append last.
            if not incumbent_added:
                performances[hp_name].append(inc_perf)
                variances[hp_name].append(inc_var)

        # Avoid division by zero
        if delta == 0:
            delta = 1

        # Creating actual importance value (by normalizing over sum of vars)
        num_trees = len(list(predictions.values())[0][0])
        hp_names = list(performances.keys())

        # Variance over the neighbors, separately for every tree and Hyperparameter.
        overall_var_per_tree = {
            hp_name: [
                np.var([neighbor[tree_idx] for neighbor in predictions[hp_name]])
                for tree_idx in range(num_trees)
            ]
            for hp_name in hp_names
        }

        # Sum up variances per tree across parameters
        sum_var_per_tree = [
            sum(overall_var_per_tree[hp_name][tree_idx] for hp_name in hp_names)
            for tree_idx in range(num_trees)
        ]

        # Normalize; a tree without any variance yields NaN instead of dividing by zero.
        overall_var_per_tree = {
            p: [
                t / sum_var_per_tree[idx] if sum_var_per_tree[idx] != 0.0 else np.nan
                for idx, t in enumerate(trees)
            ]
            for p, trees in overall_var_per_tree.items()
        }
        imp_var_dict = {
            k: (np.mean(overall_var_per_tree[k]), np.var(overall_var_per_tree[k]))
            for k in overall_var_per_tree
        }

        return imp_var_dict

    def get_importances_(self, hp_names: Optional[List[str]] = None) -> str:
        """
        Return the importance scores from the passed Hyperparameter names.

        Parameters
        ----------
        hp_names : Optional[List[str]]
            Selected Hyperparameter names to get the importance scores from. If None or
            empty, the scores of all Hyperparameters are returned.

        Returns
        -------
        str
            JSON string of the importance DataFrame, i.e. the Hyperparameter names with the
            corresponding importance scores, variances and weights.

        Raises
        ------
        RuntimeError
            If the important scores are not calculated.
        """
        if self.importances is None:
            raise RuntimeError("Importance scores must be calculated first.")

        if hp_names:
            selected = self.importances["hp_name"].isin(hp_names)
            return self.importances.loc[selected].to_json()

        return self.importances.to_json()