import itertools
import os
import tempfile
from collections import OrderedDict

import numpy as np
from ConfigSpace.configuration_space import Configuration, ConfigurationSpace
from ConfigSpace.hyperparameters import CategoricalHyperparameter
from ConfigSpace.read_and_write import json as pcs_json
from ConfigSpace.read_and_write import pcs_new
from smac.runhistory.runhistory import RunHistory
from smac.scenario.scenario import Scenario
from smac.stats.stats import Stats
from smac.tae import StatusType
from import TrajLogger

from cave.reader.conversion.base_converter import BaseConverter
from cave.utils.helpers import get_folder_basenames

[docs]class HpBandSter2SMAC(BaseConverter): """ Converts data from `hpbandster <>`_ at least up to the 1.0 release into SMAC3 data. """
[docs] def convert(self, folders, ta_exec_dirs=None, output_dir=None, converted_dest='converted_input_data'): try: from hpbandster.core.result import Result as HPBResult from hpbandster.core.result import logged_results_to_HBS_result except ImportError as e: raise ImportError("To analyze BOHB-data, please install hpbandster (e.g. `pip install hpbandster`)") self.logger.debug("Converting BOHB-data to SMAC3-data. Called with: folders=%s, ta_exec_dirs=%s, output_dir=%s," " converted_dest=%s", str(folders), str(ta_exec_dirs), str(output_dir), str(converted_dest)) # Using temporary files for the intermediate smac-result-like format if no output_dir specified if not output_dir: output_dir = tempfile.mkdtemp() self.logger.debug("Temporary directory for intermediate SMAC3-results: %s", output_dir) if ta_exec_dirs is None or len(ta_exec_dirs) == 0: ta_exec_dirs = ['.'] if len(ta_exec_dirs) != len(folders): ta_exec_dirs = [ta_exec_dirs[0] for _ in folders] # Get a list with alternative interpretations of the configspace-file # (if it's a .pcs-file, for .json-files it's a length-one-list) cs_interpretations = self.load_configspace(folders[0]) self.logger.debug("Loading with %d configspace alternative options...", len(cs_interpretations))"Assuming BOHB treats target algorithms as deterministic (and does not re-evaluate)") ##################### # Actual conversion # ##################### folder_basenames = get_folder_basenames(folders) result = OrderedDict() for f, f_base, ta_exec_dir in zip(folders, folder_basenames, ta_exec_dirs): # Those are the parallel runs converted_folder_path = os.path.join(output_dir, converted_dest, f_base) self.logger.debug("Processing folder=%s, f_base=%s, ta_exec_dir=%s. Saving to %s.", f, f_base, ta_exec_dir, converted_folder_path) if not os.path.exists(converted_folder_path): self.logger.debug("%s doesn't exist. Creating...", converted_folder_path) os.makedirs(converted_folder_path) # Original hpbandster-formatted result-object hp_result = logged_results_to_HBS_result(f) result[f] = self.hpbandster2smac(f, hp_result, cs_interpretations, converted_folder_path) return result
[docs] def load_configspace(self, folder): """ Will try to load the configspace. cs_options will be a list containing all possible combinations of interpretation for Categoricals. If this issue will be fixed, we can drop this procedure. Parameters ---------- folder: str path to folder in which to look for configspace Returns ------- cs_options: list[ConfigurationSpace] list with possible interpretations for config-space-file. Only contains multiple items if file-format is pcs. """ cs_options = [] cs_fn_json = os.path.join(folder, 'configspace.json') cs_fn_pcs = os.path.join(folder, 'configspace.pcs') if os.path.exists(cs_fn_json): with open(cs_fn_json, 'r') as fh: cs_options = [] self.logger.debug("Detected and loaded \"%s\". No alternative interpretations necessary", cs_fn_json) elif os.path.exists(cs_fn_pcs): with open(cs_fn_pcs, 'r') as fh: cs = # Create alternative interpretations categoricals = [hp for hp in cs.get_hyperparameters() if isinstance(hp, CategoricalHyperparameter)] non_categoricals = [hp for hp in cs.get_hyperparameters() if not isinstance(hp, CategoricalHyperparameter)] def _get_interpretations(choices): """ Generate different interpretations for critical categorical hyperparameters that are not seamlessly supported by pcs-format.""" result = [] if set(choices) == {"True", "False"}: result.append([True, False]) if all([c.isdigit() for c in choices]): result.append([int(c) for c in choices]) result.append(choices) return result choices_per_cat = [_get_interpretations(hp.choices) for hp in categoricals] combinations = itertools.product(*choices_per_cat) self.logger.debug(combinations) for combi in combinations: bcs = ConfigurationSpace() for hp in non_categoricals: bcs.add_hyperparameter(hp) for name, choices in zip([ for hp in categoricals], combi): bcs.add_hyperparameter(CategoricalHyperparameter(name, choices)) bcs.add_conditions(cs.get_conditions()) cs_options.append(bcs) self.logger.debug("Sampled %d interpretations of \"%s\"", len(cs_options), cs_fn_pcs) else: raise ValueError("Missing pcs-file at '%s.[pcs|json]'!" % os.path.join(folder, 'configspace')) return cs_options
[docs] def _get_config(self, config_id, id2config, cs): config = Configuration(cs, id2config[config_id]['config']) try: model_based_pick = id2config[config_id]['config_info']['model_based_pick'] config.origin = 'Model based pick' if model_based_pick else 'Random' except KeyError: self.logger.debug("No origin for config (id %s)!" % str(config_id), exc_info=True) return config
[docs] def hpbandster2smac(self, folder, result, cs_options, output_dir: str): """Reading hpbandster-result-object and creating RunHistory and trajectory... Parameters ---------- folder: str (path) original folder result: hpbandster.core.result.Result bohb's result-object cs_options: list[ConfigurationSpace] the configuration spaces. in the best case it's a single element, but for pcs-format we need to guess through a list of possible configspaces output_dir_base: str the output-dir to save the smac-runs to Returns ------- converted: dict{ 'new_path' : path_to_converted_input, 'hp_bandster_result' : result_in_hpbandster_format, 'config_space' : config_space, 'runhistory' : runhistory, 'validated_runhistory' : validated_runhistory, 'scenario' : scenario, 'trajectory' : trajectory, } """ self.logger.debug("Budgets for '%s': %s" % (folder, str(result.HB_config['budgets']))) ########################## # 1. Create runhistory # ########################## id2config_mapping = result.get_id2config_mapping() skipped = {'None' : 0, 'NaN' : 0} rh = RunHistory() for run in result.get_all_runs(): # Load config... config = None while config is None: if len(cs_options) == 0: self.logger.debug("None of the alternatives worked...") raise ValueError("Your configspace seems to be corrupt. If you use floats (or mix up ints, bools " "and strings) as categoricals, please consider using the .json-format, as the " ".pcs-format cannot recover the type of categoricals. Otherwise please report " "this to (and attach the debug.log)") try: config = self._get_config(run.config_id, id2config_mapping, cs_options[0]) except ValueError as err: self.logger.debug("Loading config failed. Trying %d alternatives" % len(cs_options) - 1, exc_info=1) cs_options = cs_options[1:] # remove the failing cs-version # Filter corrupted loss-values (ignore them) if run.loss is None: skipped['None'] += 1 continue if np.isnan(run.loss): skipped['NaN'] += 1 continue rh.add(config=config, cost=run.loss, time=run.time_stamps['finished'] - run.time_stamps['started'], status=StatusType.SUCCESS, budget=run.budget, seed=0, additional_info={'info' :, 'timestamps': run.time_stamps}) self.logger.debug("Skipped %d None- and %d NaN-loss-values in BOHB-result", skipped['None'], skipped['NaN']) ########################## # 2. Create all else # ########################## scenario = Scenario({'run_obj': 'quality', 'cs': cs_options[0], 'output_dir': output_dir, 'deterministic': True, # At the time of writing, BOHB is always treating ta's as deterministic }) scenario.output_dir_for_this_run = output_dir scenario.write() with open(os.path.join(output_dir, 'configspace.json'), 'w') as fh: fh.write(pcs_json.write(cs_options[0])) rh.save_json(fn=os.path.join(output_dir, 'runhistory.json')) trajectory = self.get_trajectory(result, output_dir, scenario, rh) return {'new_path': output_dir, 'hpbandster_result': result, 'config_space': cs_options[0], 'runhistory': rh, 'validated_runhistory': None, 'scenario': scenario, 'trajectory': trajectory, }
[docs] def get_trajectory(self, result, output_path, scenario, rh): """ Use hpbandster's averaging. """ cs = scenario.cs if not output_path: output_path = tempfile.mkdtemp() traj_logger = TrajLogger(output_path, Stats(scenario)) total_traj_dict = [] traj_dict = result.get_incumbent_trajectory() id2config_mapping = result.get_id2config_mapping() failed_entries = [] for config_id, time, budget, loss in zip(traj_dict['config_ids'], traj_dict['times_finished'], traj_dict['budgets'], traj_dict['losses']): incumbent = self._get_config(config_id, id2config_mapping, cs) try: incumbent_id = rh.config_ids[incumbent] except KeyError as err: self.logger.debug(err) failed_entries.append((config_id, incumbent)) self.logger.debug("Could not load configuration id %d (%s)", config_id, str(incumbent)) continue total_traj_dict.append({'config_id' : incumbent_id, 'time_finished' : time, 'budget' : budget, 'loss' : loss}) if len(failed_entries) > 0: self.logger.warning("Failed to load %d (of %d total) entries from trajectory", len(failed_entries), len(failed_entries) + len(total_traj_dict)) last_loss = np.inf for element in sorted(total_traj_dict, key=lambda x: x['time_finished']): incumbent_id = element["config_id"] incumbent = rh.ids_config[incumbent_id] time = element["time_finished"] loss = element["loss"] budget = element["budget"] if loss > last_loss: continue ta_runs = -1 ta_time_used = -1 wallclock_time = time train_perf = loss # add to trajectory, imitate `add_entry` method of SMAC's traj_logger traj_logger.trajectory.append({ 'cpu_time': ta_time_used, 'total_cpu_time': None, "wallclock_time": wallclock_time, "evaluations": ta_runs, "cost": train_perf, "incumbent": incumbent, "budget": budget }) traj_logger._add_in_alljson_format(train_perf, incumbent_id, incumbent, budget, ta_time_used, wallclock_time, ) return traj_logger.trajectory