import copy
import os
import json
from hpbandster.core.base_iteration import Datum
class Run(object):
    """
    Not a proper class, more a 'struct' to bundle important
    information about a particular run.
    """

    def __init__(self, config_id, budget, loss, info, time_stamps, error_logs):
        self.config_id = config_id
        self.budget = budget
        self.error_logs = error_logs
        self.loss = loss
        self.info = info
        self.time_stamps = time_stamps

    def __repr__(self):
        return(
            "config_id: %s\t" % (self.config_id,) +
            "budget: %f\t" % self.budget +
            "loss: %s\n" % self.loss +
            "time_stamps: {submitted} (submitted), {started} (started), {finished} (finished)\n".format(**self.time_stamps) +
            "info: %s\n" % self.info
        )

    def __getitem__(self, k):
        """
        in case somebody wants to use it like a dictionary
        """
        return(getattr(self, k))
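
# Because of __getitem__, a Run can be read like a dict as well as via
# attributes; a minimal sketch with made-up values:
#
#   r = Run((0, 0, 0), 9.0, 0.05, {}, {'submitted': 0.0, 'started': 0.1, 'finished': 1.2}, None)
#   assert r['loss'] == r.loss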

class json_result_logger(object):
    def __init__(self, directory, overwrite=False):
        """
        convenience logger for 'semi-live' results

        Logger that writes job results into two files (configs.json and
        results.json). Both files contain one proper json object per line.

        This version opens and closes the files for each result.
        This might be very slow if individual runs are fast and the
        filesystem is rather slow (e.g. an NFS).

        Parameters
        ----------
        directory: string
            the directory where the two files 'configs.json' and
            'results.json' are stored
        overwrite: bool
            In case the files already exist, this flag controls the
            behavior:

            * True: The existing files are overwritten. Potential risk
              of deleting previous results.
            * False: A FileExistsError is raised and the files are not
              modified.
        """
        os.makedirs(directory, exist_ok=True)

        self.config_fn = os.path.join(directory, 'configs.json')
        self.results_fn = os.path.join(directory, 'results.json')

        try:
            with open(self.config_fn, 'x'):
                pass
        except FileExistsError:
            if overwrite:
                with open(self.config_fn, 'w'):
                    pass
            else:
                raise FileExistsError('The file %s already exists.' % self.config_fn)

        try:
            with open(self.results_fn, 'x'):
                pass
        except FileExistsError:
            if overwrite:
                with open(self.results_fn, 'w'):
                    pass
            else:
                raise FileExistsError('The file %s already exists.' % self.results_fn)

        self.config_ids = set()
    def new_config(self, config_id, config, config_info):
        if config_id not in self.config_ids:
            self.config_ids.add(config_id)
            with open(self.config_fn, 'a') as fh:
                fh.write(json.dumps([config_id, config, config_info]))
                fh.write('\n')

    def __call__(self, job):
        if job.id not in self.config_ids:
            # should never happen! TODO: log warning here!
            self.config_ids.add(job.id)
            with open(self.config_fn, 'a') as fh:
                fh.write(json.dumps([job.id, job.kwargs['config'], {}]))
                fh.write('\n')
        with open(self.results_fn, 'a') as fh:
            fh.write(json.dumps([job.id, job.kwargs['budget'], job.timestamps, job.result, job.exception]))
            fh.write("\n")
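
# Usage sketch: hand the logger to an optimizer so every finished job is
# appended to configs.json/results.json. The optimizer call shown is only an
# illustration of the interface, not part of this module:
#
#   result_logger = json_result_logger(directory='./logs', overwrite=True)
#   # e.g. BOHB(configspace=cs, run_id='0', result_logger=result_logger, ...)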

def logged_results_to_HBS_result(directory):
    """
    function to import logged 'live' results and return a HB_result object

    You can load live run results with this function and the returned
    HB_result object gives you access to the results the same way
    a finished run would.

    Parameters
    ----------
    directory: str
        the directory containing the results.json and configs.json files

    Returns
    -------
    hpbandster.core.result.Result
        a Result object with the same interface as one returned by an
        actual (finished) run
    """
    data = {}
    time_ref = float('inf')
    budget_set = set()

    with open(os.path.join(directory, 'configs.json')) as fh:
        for line in fh:
            line = json.loads(line)

            if len(line) == 3:
                config_id, config, config_info = line
            if len(line) == 2:
                config_id, config = line
                config_info = 'N/A'

            data[tuple(config_id)] = Datum(config=config, config_info=config_info)

    with open(os.path.join(directory, 'results.json')) as fh:
        for line in fh:
            config_id, budget, time_stamps, result, exception = json.loads(line)

            id = tuple(config_id)

            data[id].time_stamps[budget] = time_stamps
            data[id].results[budget] = result
            data[id].exceptions[budget] = exception

            budget_set.add(budget)
            time_ref = min(time_ref, time_stamps['submitted'])

    # infer the hyperband configuration from the data
    budget_list = sorted(budget_set)

    HB_config = {
        'eta':         None if len(budget_list) < 2 else budget_list[1] / budget_list[0],
        'min_budget':  min(budget_set),
        'max_budget':  max(budget_set),
        'budgets':     budget_list,
        'max_SH_iter': len(budget_set),
        'time_ref':    time_ref
    }
    return(Result([data], HB_config))
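
# Usage sketch (the directory is illustrative):
#
#   res = logged_results_to_HBS_result('./logs')
#   inc_id = res.get_incumbent_id()


def extract_HBS_learning_curves(runs):
    """
    function to get the hyperband learning curves

    This is an example function showing the interface to use with the
    HB_result.get_learning_curves method. It is referenced below as the
    default lc_extractor but was missing from this listing; this is a
    minimal reconstruction from its documented interface.

    Parameters
    ----------
    runs: list of HB_result.Run objects
        the performed runs for a single configuration

    Returns
    -------
    list of learning curves: list of lists of tuples
        An individual learning curve is a list of (budget, loss) pairs;
        this function must return a list of these.
    """
    sr = sorted(runs, key=lambda r: r.budget)
    lc = list(filter(lambda t: t[1] is not None, [(r.budget, r.loss) for r in sr]))
    return([lc, ])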

class Result(object):
    """
    Object returned by the HB_master.run function

    This class offers a simple API to access the information from
    a Hyperband run.
    """

    def __init__(self, HB_iteration_data, HB_config):
        self.data = HB_iteration_data
        self.HB_config = HB_config
        self._merge_results()

    def __getitem__(self, k):
        return(self.data[k])

    def get_incumbent_id(self):
        """
        Find the config_id of the incumbent.

        The incumbent here is the configuration with the smallest loss
        among all runs on the maximum budget! If no run finishes on the
        maximum budget, None is returned!
        """
        tmp_list = []
        for k, v in self.data.items():
            try:
                # only things run on the max budget are considered
                res = v.results[self.HB_config['max_budget']]
                if res is not None:
                    tmp_list.append((res['loss'], k))
            except KeyError:
                # this config was never run on the max budget
                pass

        if len(tmp_list) > 0:
            return(min(tmp_list)[1])
        return(None)
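
    # Usage sketch (assumes `res` is a Result instance):
    #
    #   inc_id = res.get_incumbent_id()
    #   if inc_id is not None:
    #       inc_config = res.get_id2config_mapping()[inc_id]['config']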
    def get_incumbent_trajectory(self, all_budgets=True, bigger_is_better=True, non_decreasing_budget=True):
        """
        Returns the best configurations over time

        Parameters
        ----------
        all_budgets: bool
            If set to True, all runs (even those not on the largest
            budget) can become the incumbent.
            Otherwise, only full budget runs are considered.
        bigger_is_better: bool
            flag whether an evaluation on a larger budget is always
            considered better. If True, the incumbent might increase
            for the first evaluations on a bigger budget.
        non_decreasing_budget: bool
            flag whether the budget of a new incumbent should be at
            least as big as the one of the current incumbent.

        Returns
        -------
        dict:
            dictionary with all the config IDs, the times the runs
            finished, their respective budgets, and corresponding losses
        """
        all_runs = self.get_all_runs(only_largest_budget=not all_budgets)

        if not all_budgets:
            all_runs = list(filter(lambda r: r.budget == self.HB_config['max_budget'], all_runs))

        all_runs.sort(key=lambda r: r.time_stamps['finished'])

        return_dict = {
            'config_ids':     [],
            'times_finished': [],
            'budgets':        [],
            'losses':         [],
        }

        current_incumbent = float('inf')
        incumbent_budget = self.HB_config['min_budget']

        for r in all_runs:
            if r.loss is None:
                continue

            new_incumbent = False

            if bigger_is_better and r.budget > incumbent_budget:
                new_incumbent = True

            if r.loss < current_incumbent:
                new_incumbent = True

            if non_decreasing_budget and r.budget < incumbent_budget:
                new_incumbent = False

            if new_incumbent:
                current_incumbent = r.loss
                incumbent_budget = r.budget

                return_dict['config_ids'].append(r.config_id)
                return_dict['times_finished'].append(r.time_stamps['finished'])
                return_dict['budgets'].append(r.budget)
                return_dict['losses'].append(r.loss)

        # if the last run did not improve the incumbent, repeat the current
        # incumbent at the time of the last run so the trajectory extends to
        # the end of the optimization (guard against an empty trajectory)
        if len(return_dict['losses']) > 0 and current_incumbent != r.loss:
            r = all_runs[-1]

            return_dict['config_ids'].append(return_dict['config_ids'][-1])
            return_dict['times_finished'].append(r.time_stamps['finished'])
            return_dict['budgets'].append(return_dict['budgets'][-1])
            return_dict['losses'].append(return_dict['losses'][-1])

        return(return_dict)
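
    # Usage sketch: the returned dict is convenient for plotting; the
    # matplotlib call is only an illustration:
    #
    #   traj = res.get_incumbent_trajectory()
    #   # plt.step(traj['times_finished'], traj['losses'], where='post')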
    def get_runs_by_id(self, config_id):
        """
        returns a list of runs for a given config id

        The runs are sorted by ascending budget, so '-1' will give
        the longest run for this config.
        """
        d = self.data[config_id]

        runs = []
        for b in d.results.keys():
            err_logs = d.exceptions.get(b, None)

            if d.results[b] is None:
                r = Run(config_id, b, None, None, d.time_stamps[b], err_logs)
            else:
                r = Run(config_id, b, d.results[b]['loss'], d.results[b]['info'], d.time_stamps[b], err_logs)
            runs.append(r)

        runs.sort(key=lambda r: r.budget)
        return(runs)
    def get_learning_curves(self, lc_extractor=extract_HBS_learning_curves, config_ids=None):
        """
        extracts all learning curves from all run configurations

        Parameters
        ----------
        lc_extractor: callable
            a function to return a list of learning_curves.
            defaults to hpbandster.core.result.extract_HBS_learning_curves
        config_ids: list of valid config ids
            if only a subset of the config ids is wanted

        Returns
        -------
        dict
            a dictionary with the config_ids as keys and the
            learning curves as values
        """
        config_ids = self.data.keys() if config_ids is None else config_ids

        lc_dict = {}
        for id in config_ids:
            runs = self.get_runs_by_id(id)
            lc_dict[id] = lc_extractor(runs)

        return(lc_dict)
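
    # Usage sketch: with the default extractor each value is a list of
    # curves, and each curve a list of (budget, loss) pairs (values below
    # are made up):
    #
    #   lcs = res.get_learning_curves()
    #   # lcs[(0, 0, 0)] -> [[(1.0, 0.9), (3.0, 0.5), (9.0, 0.3)]]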
    def get_all_runs(self, only_largest_budget=False):
        """
        returns all runs performed

        Parameters
        ----------
        only_largest_budget: boolean
            if True, only the largest budget for each configuration
            is returned. This makes sense if the runs are continued
            across budgets and the info field contains the information
            you care about. If False, all runs of a configuration
            are returned.
        """
        all_runs = []

        for k in self.data.keys():
            runs = self.get_runs_by_id(k)

            if len(runs) > 0:
                if only_largest_budget:
                    all_runs.append(runs[-1])
                else:
                    all_runs.extend(runs)

        return(all_runs)
    def get_id2config_mapping(self):
        """
        returns a dict where the keys are the config_ids and the values
        are the actual configurations
        """
        new_dict = {}
        for k, v in self.data.items():
            new_dict[k] = {}
            new_dict[k]['config'] = copy.deepcopy(v.config)
            try:
                new_dict[k]['config_info'] = copy.deepcopy(v.config_info)
            except AttributeError:
                # older logs may not contain any config info
                pass
        return(new_dict)
    def _merge_results(self):
        """
        hidden function to merge the list of results into one
        dictionary and 'normalize' the time stamps
        """
        new_dict = {}
        for it in self.data:
            new_dict.update(it)

        # shift all time stamps so they are relative to the reference time
        for k, v in new_dict.items():
            for kk, vv in v.time_stamps.items():
                for kkk, vvv in vv.items():
                    new_dict[k].time_stamps[kk][kkk] = vvv - self.HB_config['time_ref']

        self.data = new_dict

    def num_iterations(self):
        """
        returns the number of HyperBand iterations in this run
        """
        return(max([k[0] for k in self.data.keys()]) + 1)
    def get_fANOVA_data(self, config_space, budgets=None, loss_fn=lambda r: r.loss, failed_loss=None):
        """
        returns the data in a format suitable for fANOVA analysis:
        an array of hyperparameter vectors X, the corresponding losses y,
        and the (potentially augmented) configuration space
        """
        import numpy as np
        import ConfigSpace as CS

        id2conf = self.get_id2config_mapping()

        if budgets is None:
            budgets = self.HB_config['budgets']

        if len(budgets) > 1:
            # treat the budget as an additional, log-scaled hyperparameter
            config_space.add_hyperparameter(CS.UniformFloatHyperparameter('budget', min(budgets), max(budgets), log=True))

        hp_names = config_space.get_hyperparameter_names()
        hps = config_space.get_hyperparameters()
        needs_transform = list(map(lambda h: isinstance(h, CS.CategoricalHyperparameter), hps))

        all_runs = self.get_all_runs(only_largest_budget=False)
        all_runs = list(filter(lambda r: r.budget in budgets, all_runs))

        X = []
        y = []

        for r in all_runs:
            if r.loss is None:
                if failed_loss is None:
                    continue
                else:
                    y.append(failed_loss)
            else:
                y.append(loss_fn(r))

            config = id2conf[r.config_id]['config']
            if len(budgets) > 1:
                config['budget'] = r.budget

            config = CS.Configuration(config_space, config)

            x = []
            for (name, hp, transform) in zip(hp_names, hps, needs_transform):
                if transform:
                    # categorical values are mapped to their internal representation
                    x.append(hp._inverse_transform(config[name]))
                else:
                    x.append(config[name])
            X.append(x)

        return(np.array(X), np.array(y), config_space)
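
    # Usage sketch: the returned arrays plug into the separate fanova
    # package; its API here is an assumption, not part of this module:
    #
    #   X, y, cs = res.get_fANOVA_data(config_space)
    #   # f = fanova.fANOVA(X, y, config_space=cs)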
    def get_pandas_dataframe(self, budgets=None, loss_fn=lambda r: r.loss):
        """
        returns two pandas DataFrames: one with the configurations (df_X)
        and one with the corresponding losses (df_y)
        """
        import pandas as pd

        id2conf = self.get_id2config_mapping()

        if budgets is None:
            budgets = self.HB_config['budgets']

        all_runs = self.get_all_runs(only_largest_budget=False)
        all_runs = list(filter(lambda r: r.budget in budgets, all_runs))

        all_configs = []
        all_losses = []

        for r in all_runs:
            if r.loss is None:
                continue
            config = id2conf[r.config_id]['config']
            if len(budgets) > 1:
                config['budget'] = r.budget

            all_configs.append(config)
            all_losses.append({'loss': r.loss})

        df_X = pd.DataFrame(all_configs)
        df_y = pd.DataFrame(all_losses)

        return(df_X, df_y)
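
    # Usage sketch: join features and losses for quick inspection:
    #
    #   df_X, df_y = res.get_pandas_dataframe()
    #   # df_X.join(df_y).sort_values('loss').head()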