Source code for deepcave.evaluators.mo_lpi

# Copyright 2021-2024 The DeepCAVE Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#  noqa: D400
"""
# MOLPI

This module provides utilities to calculate the multi-objective local parameter importance (MO-LPI).

## Classes
    - MOLPI: This class calculates the multi-objective local parameter importance (LPI).
"""

from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from ConfigSpace import Configuration
from ConfigSpace.c_util import change_hp_value
from ConfigSpace.util import impute_inactive_values

from deepcave.evaluators.epm.fanova_forest import FanovaForest
from deepcave.evaluators.lpi import LPI
from deepcave.runs import AbstractRun
from deepcave.runs.objective import Objective
from deepcave.utils.multi_objective_importance import get_weightings


# https://github.com/automl/ParameterImportance/blob/f4950593ee627093fc30c0847acc5d8bf63ef84b/pimp/evaluator/local_parameter_importance.py#L27
class MOLPI(LPI):
    """
    Calculate the multi-objective local parameter importance (LPI).

    Overrides LPI to train the random forest with an arbitrary weighting of the objectives
    (multi-objective case).

    Properties
    ----------
    run : AbstractRun
        The AbstractRun to get the importance from.
    cs : ConfigurationSpace
        The configuration space of the run.
    hp_names : List[str]
        The names of the Hyperparameters.
    variances : Dict[Any, list]
        The overall variances per tree.
    importances : dict
        The importances of the Hyperparameters.
    continuous_neighbors : int
        The number of neighbors chosen for continuous Hyperparameters.
    incumbent : Configuration
        The incumbent of the run.
    default : Configuration
        A configuration containing Hyperparameters with default values.
    incumbent_array : numpy.ndarray
        The internal vector representation of the incumbent.
    seed : int
        The seed. If not provided it will be random.
    rs : RandomState
        A random state with a given seed value.
    """

    def __init__(self, run: AbstractRun):
        super().__init__(run)
        self.importances: Optional[pd.DataFrame] = None
    def calculate(
        self,
        objectives: Optional[Union[Objective, List[Objective]]] = None,
        budget: Optional[Union[int, float]] = None,
        continous_neighbors: int = 500,
        n_trees: int = 10,
        seed: int = 0,
    ) -> None:
        """
        Prepare the data and train a RandomForest model.

        Parameters
        ----------
        objectives : Optional[Union[Objective, List[Objective]]], optional
            Considered objectives. By default, None. If None, all objectives are considered.
        budget : Optional[Union[int, float]], optional
            Considered budget. By default, None. If None, the highest budget is chosen.
        continous_neighbors : int, optional
            How many neighbors should be chosen for continuous hyperparameters (HPs).
            By default, 500.
        n_trees : int, optional
            The number of trees for the fanova forest. Default is 10.
        seed : int, optional
            The seed. By default, 0.
        """
        if objectives is None:
            objectives = self.run.get_objectives()

        if budget is None:
            budget = self.run.get_highest_budget()

        # Set variables
        self.continous_neighbors = continous_neighbors
        self.default = self.cs.get_default_configuration()
        self.seed = seed
        self.rs = np.random.RandomState(seed)

        # Get data
        df = self.run.get_encoded_data(
            objectives=objectives,
            budget=budget,
            specific=True,
            include_combined_cost=True,
            include_config_ids=True,
        )

        # Normalize objectives
        assert isinstance(objectives, list)
        objectives_normed = list()
        for obj in objectives:
            normed = obj.name + "_normed"
            df[normed] = (df[obj.name] - df[obj.name].min()) / (
                df[obj.name].max() - df[obj.name].min()
            )
            if obj.optimize == "upper":
                df[normed] = 1 - df[normed]
            objectives_normed.append(normed)
        df = df.dropna(subset=objectives_normed)
        X = df[self.hp_names].to_numpy()
        df_all = pd.DataFrame([])
        weightings = get_weightings(objectives_normed, df)

        # Calculate importance for each weighting generated from the Pareto-efficient points
        for w in weightings:
            Y = sum(
                df[obj] * weighting for obj, weighting in zip(objectives_normed, w)
            ).to_numpy()

            # Use the same forest as for fANOVA
            self._model = FanovaForest(self.cs, n_trees=n_trees, seed=seed)
            self._model.train(X, Y)
            incumbent_cfg_id = np.argmin(
                sum(df[obj] * weighting for obj, weighting in zip(objectives_normed, w))
            )
            self.incumbent = self.run.get_config(df.iloc[incumbent_cfg_id]["config_id"])
            self.incumbent_array = self.incumbent.get_array()
            importances = self.calc_one_weighting()
            df_res = pd.DataFrame(importances).loc[0:1].T.reset_index()
            df_res["weight"] = w[0]
            df_all = pd.concat([df_all, df_res])

        self.importances = df_all.rename(
            columns={0: "importance", 1: "variance", "index": "hp_name"}
        ).reset_index(drop=True)
        # No negative values
        self.importances = self.importances.map(
            lambda x: max(x, 0) if not isinstance(x, str) else x
        )
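    # A minimal numeric sketch of the weighting step above (illustrative values only, not part
    # of the original implementation): with two normalized objectives and a weighting
    # w = (0.7, 0.3) drawn from the Pareto-efficient points, each configuration's scalar
    # target Y is the weighted sum of its normalized objective values, and the incumbent for
    # that weighting is the configuration with the smallest sum.
    #
    #   config A: 0.7 * 0.2 + 0.3 * 0.8 = 0.38  <- incumbent for this weighting
    #   config B: 0.7 * 0.5 + 0.3 * 0.2 = 0.41
    #
    # Repeating this for every weighting yields one LPI result per weighting; these are
    # concatenated into `self.importances`, with the first weight stored in the "weight" column.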
    def calc_one_weighting(self) -> Dict[str, Tuple[float, float]]:
        """
        Prepare the data after a model has been trained for one weighting.

        Returns
        -------
        imp_var_dict : Dict[str, Tuple[float, float]]
            Dictionary of importances and variances.
        """
        # Get neighborhood sampled on an unit-hypercube.
        neighborhood = self._get_neighborhood()

        # The delta performance is needed from the default configuration and the incumbent
        def_perf, def_var = self._predict_mean_var(self.default)
        inc_perf, inc_var = self._predict_mean_var(self.incumbent)
        delta = def_perf - inc_perf

        # These are used for plotting and hold the predictions for each neighbor of each
        # parameter. That means performances holds the mean, variances the variance of the
        # forest.
        performances: Dict[str, List[np.ndarray]] = {}
        variances: Dict[str, List[np.ndarray]] = {}

        # These are used for importance and hold the corresponding importance/variance over
        # neighbors. Only important if NOT quantifying importance via performance-variance
        # across neighbors.
        # Nested list of values per tree in random forest.
        predictions: Dict[str, List[List[np.ndarray]]] = {}

        # Iterate over parameters
        for hp_idx, hp_name in enumerate(self.incumbent.keys()):
            if hp_name not in neighborhood:
                continue

            performances[hp_name] = []
            variances[hp_name] = []
            predictions[hp_name] = []
            incumbent_added = False
            incumbent_idx = 0

            # Iterate over neighbors
            for unit_neighbor, neighbor in zip(
                neighborhood[hp_name][0], neighborhood[hp_name][1]
            ):
                if not incumbent_added:
                    # Detect incumbent
                    if unit_neighbor > self.incumbent_array[hp_idx]:
                        performances[hp_name].append(inc_perf)
                        variances[hp_name].append(inc_var)
                        incumbent_added = True
                    else:
                        incumbent_idx += 1

                # Create the neighbor-Configuration object
                new_array = self.incumbent_array.copy()
                new_array = change_hp_value(
                    self.cs, new_array, hp_name, unit_neighbor, self.cs.index_of[hp_name]
                )
                new_config = impute_inactive_values(Configuration(self.cs, vector=new_array))

                # Get the leaf values
                x = np.array(new_config.get_array())
                leaf_values = self._model.get_leaf_values(x)

                # And the prediction/performance/variance
                predictions[hp_name].append([np.mean(tree_pred) for tree_pred in leaf_values])
                performances[hp_name].append(np.mean(predictions[hp_name][-1]))
                variances[hp_name].append(np.var(predictions[hp_name][-1]))

            if len(neighborhood[hp_name][0]) > 0:
                neighborhood[hp_name][0] = np.insert(
                    neighborhood[hp_name][0], incumbent_idx, self.incumbent_array[hp_idx]
                )
                neighborhood[hp_name][1] = np.insert(
                    neighborhood[hp_name][1], incumbent_idx, self.incumbent[hp_name]
                )
            else:
                neighborhood[hp_name][0] = np.array(self.incumbent_array[hp_idx])
                neighborhood[hp_name][1] = [self.incumbent[hp_name]]

            if not incumbent_added:
                performances[hp_name].append(inc_perf)
                variances[hp_name].append(inc_var)

        # Avoid division by zero
        if delta == 0:
            delta = 1

        # Creating actual importance value (by normalizing over sum of vars)
        num_trees = len(list(predictions.values())[0][0])
        hp_names = list(performances.keys())

        overall_var_per_tree = {}
        for hp_name in hp_names:
            hp_variances = []
            for tree_idx in range(num_trees):
                variance = np.var([neighbor[tree_idx] for neighbor in predictions[hp_name]])
                hp_variances += [variance]

            overall_var_per_tree[hp_name] = hp_variances

        # Sum up variances per tree across parameters
        sum_var_per_tree = [
            sum([overall_var_per_tree[hp_name][tree_idx] for hp_name in hp_names])
            for tree_idx in range(num_trees)
        ]

        # Normalize
        overall_var_per_tree = {
            p: [
                t / sum_var_per_tree[idx] if sum_var_per_tree[idx] != 0.0 else np.nan
                for idx, t in enumerate(trees)
            ]
            for p, trees in overall_var_per_tree.items()
        }

        imp_var_dict = {
            k: (np.mean(overall_var_per_tree[k]), np.var(overall_var_per_tree[k]))
            for k in overall_var_per_tree
        }
        return imp_var_dict
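    # A minimal numeric sketch of the normalization above (hypothetical values, assuming a
    # forest with two trees and two hyperparameters): per tree, each hyperparameter's variance
    # over its neighbors is divided by the sum of variances across all hyperparameters in that
    # tree, so the per-tree importances sum to 1.
    #
    #   tree 0: var(lr) = 0.09, var(batch_size) = 0.01 -> lr: 0.90, batch_size: 0.10
    #   tree 1: var(lr) = 0.06, var(batch_size) = 0.02 -> lr: 0.75, batch_size: 0.25
    #
    # The reported importance is the mean over trees (lr: 0.825, batch_size: 0.175) and the
    # reported variance is the variance of these per-tree values.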
    def get_importances_(self, hp_names: List[str]) -> str:
        """
        Return the importance scores for the passed Hyperparameter names.

        Parameters
        ----------
        hp_names : List[str]
            Selected Hyperparameter names to get the importance scores from. If empty, all
            Hyperparameters of the configuration space are used.

        Returns
        -------
        str
            A JSON string with the Hyperparameter names and the corresponding importance
            scores and variances.

        Raises
        ------
        RuntimeError
            If the importance scores have not been calculated yet.
        """
        if self.importances is None:
            raise RuntimeError("Importance scores must be calculated first.")
        if hp_names:
            return self.importances.loc[self.importances["hp_name"].isin(hp_names)].to_json()
        else:
            return self.importances.to_json()
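# A hedged usage sketch (not part of the module): the converter class, the from_path call, and
# the path below are assumptions for illustration; the exact way to load a run depends on the
# optimizer and the DeepCAVE version in use.
#
#   from deepcave.runs.converters.deepcave import DeepCAVERun  # assumed converter
#
#   run = DeepCAVERun.from_path(...)  # hypothetical path to a recorded run
#   evaluator = MOLPI(run)
#   evaluator.calculate(n_trees=10, seed=0)  # all objectives, highest budget by default
#   importances_json = evaluator.get_importances_(hp_names=evaluator.hp_names)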