Source code for hpbandster.core.base_iteration

import sys
import logging

import numpy as np

from hpbandster.core.dispatcher import Job


class Datum(object):
    def __init__(self, config, config_info, results=None, time_stamps=None, exceptions=None, status='QUEUED', budget=0):
        self.config = config
        self.config_info = config_info
        self.results = results if results is not None else {}
        self.time_stamps = time_stamps if time_stamps is not None else {}
        self.exceptions = exceptions if exceptions is not None else {}
        self.status = status
        self.budget = budget

    def __repr__(self):
        return(
            "\nconfig:{}\n".format(self.config) +
            "config_info:\n{}\n".format(self.config_info) +
            "losses:\n" +
            '\t'.join(["{}: {}\t".format(k, v['loss']) for k, v in self.results.items()]) +
            "time stamps: {}".format(self.time_stamps)
        )

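# ---------------------------------------------------------------------------
# Example (not part of the original module): what a Datum looks like over its
# life cycle.  BaseIteration creates it with only a config and a budget, and
# register_result later fills the per-budget results, time stamps and status.
# The config and result values below are made up for illustration.

def _example_datum():
    d = Datum(config={'learning_rate': 1e-3}, config_info={}, budget=1)
    d.status = 'REVIEW'
    d.results[1] = {'loss': 0.42, 'info': {}}
    d.time_stamps[1] = {'submitted': 0.0, 'started': 0.1, 'finished': 2.3}
    return d
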
class BaseIteration(object):
    """
    Base class for various iteration possibilities. This decides what configuration should be run
    on what budget next. Typical choices are e.g. successive halving. Results from runs are
    processed and (depending on the implementation) determine the further development.
    """

    def __init__(self, HPB_iter, num_configs, budgets, config_sampler, logger=None, result_logger=None):
        """
        Parameters
        ----------
        HPB_iter: int
            The current HPBandSter iteration index.
        num_configs: list of ints
            the number of configurations in each stage of SH
        budgets: list of floats
            the budget associated with each stage
        config_sampler: callable
            a function that returns a valid configuration. Its only argument should be the budget
            that this config is first scheduled for. This might be used to pick configurations that
            perform best after this particular budget is exhausted to build a better autoML system.
        logger: a logger
        result_logger: hpbandster.api.results.util.json_result_logger object
            a result logger that writes live results to disk
        """
        self.data = {}  # this holds all the configs and results of this iteration
        self.is_finished = False
        self.HPB_iter = HPB_iter
        self.stage = 0  # internal iteration, but different name for clarity
        self.budgets = budgets
        self.num_configs = num_configs
        self.actual_num_configs = [0] * len(num_configs)
        self.config_sampler = config_sampler
        self.num_running = 0
        self.logger = logger if logger is not None else logging.getLogger('hpbandster')
        self.result_logger = result_logger

    def add_configuration(self, config=None, config_info={}):
        """
        function to add a new configuration to the current iteration

        Parameters
        ----------
        config : valid configuration
            The configuration to add. If None, a configuration is sampled from the config_sampler
        config_info: dict
            Some information about the configuration that will be stored in the results
        """
        if config is None:
            config, config_info = self.config_sampler(self.budgets[self.stage])

        if self.is_finished:
            raise RuntimeError("This HPBandSter iteration is finished, you can't add more configurations!")

        if self.actual_num_configs[self.stage] == self.num_configs[self.stage]:
            raise RuntimeError("Can't add another configuration to stage %i in HPBandSter iteration %i." % (self.stage, self.HPB_iter))

        config_id = (self.HPB_iter, self.stage, self.actual_num_configs[self.stage])

        self.data[config_id] = Datum(config=config, config_info=config_info, budget=self.budgets[self.stage])
        self.actual_num_configs[self.stage] += 1

        if self.result_logger is not None:
            self.result_logger.new_config(config_id, config, config_info)

        return(config_id)

    def register_result(self, job, skip_sanity_checks=False):
        """
        function to register the result of a job

        This function is called from HB_master, don't call this from your script.
        """
        if self.is_finished:
            raise RuntimeError("This HB iteration is finished, you can't register more results!")

        config_id = job.id
        config = job.kwargs['config']
        budget = job.kwargs['budget']
        timestamps = job.timestamps
        result = job.result
        exception = job.exception

        d = self.data[config_id]

        if not skip_sanity_checks:
            assert d.config == config, 'Configurations differ!'
            assert d.status == 'RUNNING', "Configuration wasn't scheduled for a run."
            assert d.budget == budget, 'Budgets differ (%f != %f)!' % (d.budget, budget)

        d.time_stamps[budget] = timestamps
        d.results[budget] = result

        if (job.result is not None) and np.isfinite(result['loss']):
            d.status = 'REVIEW'
        else:
            d.status = 'CRASHED'
            d.exceptions[budget] = exception

        self.num_running -= 1

    def get_next_run(self):
        """
        function to return the next configuration and budget to run.

        This function is called from HB_master, don't call this from your script.

        It returns None if this run of SH is finished or there are pending jobs that need to finish
        to progress to the next stage.

        If there are empty slots to be filled in the current SH stage (which never happens in the
        original SH version), a new configuration will be sampled and scheduled to run next.
        """
        if self.is_finished:
            return(None)

        for k, v in self.data.items():
            if v.status == 'QUEUED':
                assert v.budget == self.budgets[self.stage], 'Configuration budget does not align with current stage!'
                v.status = 'RUNNING'
                self.num_running += 1
                return(k, v.config, v.budget)

        # check if there are still slots to fill in the current stage and return that
        if (self.actual_num_configs[self.stage] < self.num_configs[self.stage]):
            self.add_configuration()
            return(self.get_next_run())

        if self.num_running == 0:
            # at this point a stage is completed
            self.process_results()
            return(self.get_next_run())

        return(None)

    def _advance_to_next_stage(self, config_ids, losses):
        """
        Function that implements the strategy to advance configs within this iteration

        Overload this to implement different strategies, like SuccessiveHalving or
        SuccessiveResampling (a SuccessiveHalving-style sketch is shown in the example
        after this class).

        Parameters
        ----------
        config_ids: list
            all config ids to be considered
        losses: list
            losses of the run on the current budget

        Returns
        -------
        list of bool
            A boolean for each entry in config_ids indicating whether to advance it or not
        """
        raise NotImplementedError('_advance_to_next_stage not implemented for %s' % type(self).__name__)

    def process_results(self):
        """
        function that is called when a stage is completed and needs to be analyzed before
        further computations.

        The code here implements the original SH algorithm by advancing the k-best (lowest loss)
        configurations at the current budget. k is defined by the num_configs list (see __init__)
        and the current stage value.

        For more advanced methods like resampling after each stage, overload this function only.
        """
        self.stage += 1

        # collect all config_ids that need to be compared
        config_ids = list(filter(lambda cid: self.data[cid].status == 'REVIEW', self.data.keys()))

        if (self.stage >= len(self.num_configs)):
            self.finish_up()
            return

        budgets = [self.data[cid].budget for cid in config_ids]
        if len(set(budgets)) > 1:
            raise RuntimeError('Not all configurations have the same budget!')
        budget = self.budgets[self.stage - 1]

        losses = np.array([self.data[cid].results[budget]['loss'] for cid in config_ids])

        advance = self._advance_to_next_stage(config_ids, losses)

        for i, a in enumerate(advance):
            if a:
                self.logger.debug('ITERATION: Advancing config %s to next budget %f' % (config_ids[i], self.budgets[self.stage]))

        for i, cid in enumerate(config_ids):
            if advance[i]:
                self.data[cid].status = 'QUEUED'
                self.data[cid].budget = self.budgets[self.stage]
                self.actual_num_configs[self.stage] += 1
            else:
                self.data[cid].status = 'TERMINATED'

    def finish_up(self):
        self.is_finished = True

        for k, v in self.data.items():
            assert v.status in ['TERMINATED', 'REVIEW', 'CRASHED'], 'Configuration has not finished yet!'
            v.status = 'COMPLETED'

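# ---------------------------------------------------------------------------
# Example (not part of the original module): a minimal sketch of how a subclass
# could implement _advance_to_next_stage in the spirit of SuccessiveHalving,
# plus a small driver loop mimicking what HB_master does with an iteration
# (get_next_run -> evaluate -> register_result).  The class and function names
# here are illustrative and not part of hpbandster's public API.

class ExampleSuccessiveHalving(BaseIteration):

    def _advance_to_next_stage(self, config_ids, losses):
        # double argsort turns losses into ranks (0 = lowest loss); a config
        # advances iff its rank fits into the (already incremented) next stage
        ranks = np.argsort(np.argsort(losses))
        return ranks < self.num_configs[self.stage]


def _example_run_iteration():
    """Illustrative driver: runs one 2-stage iteration on a toy quadratic loss."""
    import random

    def sample_config(budget):
        # stand-in for a real config sampler; must return (config, config_info)
        return {'x': random.uniform(-5, 5)}, {}

    # 4 configurations on budget 1, of which the best 2 advance to budget 3
    iteration = ExampleSuccessiveHalving(
        HPB_iter=0, num_configs=[4, 2], budgets=[1, 3],
        config_sampler=sample_config)

    while True:
        next_run = iteration.get_next_run()
        if next_run is None:
            break
        config_id, config, budget = next_run

        # evaluate the configuration and hand the result back as a Job object
        job = Job(config_id, config=config, budget=budget)
        job.result = {'loss': config['x'] ** 2, 'info': {}}
        iteration.register_result(job)

    return iteration
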
class WarmStartIteration(BaseIteration):
    """
    iteration that imports a previous Result for warm starting
    """

    def __init__(self, Result, config_generator):

        self.is_finished = False
        self.stage = 0

        id2conf = Result.get_id2config_mapping()
        delta_t = -max(map(lambda r: r.time_stamps['finished'], Result.get_all_runs()))

        super().__init__(-1, [len(id2conf)], [None], None)

        for i, id in enumerate(id2conf):
            new_id = self.add_configuration(config=id2conf[id]['config'], config_info=id2conf[id]['config_info'])

            for r in Result.get_runs_by_id(id):
                j = Job(new_id, config=id2conf[id]['config'], budget=r.budget)

                j.result = {'loss': r.loss, 'info': r.info}
                j.error_logs = r.error_logs

                for k, v in r.time_stamps.items():
                    j.timestamps[k] = v + delta_t

                self.register_result(j, skip_sanity_checks=True)

                config_generator.new_result(j, update_model=(i == len(id2conf) - 1))

        # mark as finished, as no more runs should be executed from these runs
        self.is_finished = True

    def fix_timestamps(self, time_ref):
        """
        manipulates internal time stamps such that the last run ends at time 0
        """
        for k, v in self.data.items():
            for kk, vv in v.time_stamps.items():
                for kkk, vvv in vv.items():
                    self.data[k].time_stamps[kk][kkk] += time_ref
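
# ---------------------------------------------------------------------------
# Example (not part of the original module): a rough sketch of warm starting
# from a previously logged run.  It assumes the old run was written to disk by
# hpbandster's json_result_logger and that a BOHB config generator is being
# warm started; `previous_run_dir`, `config_space` and `time_ref` are
# placeholders for whatever your setup actually uses.

def _example_warm_start(previous_run_dir, config_space, time_ref):
    from hpbandster.core.result import logged_results_to_HBS_result
    from hpbandster.optimizers.config_generators.bohb import BOHB as BOHB_CG

    # load the finished run and replay all its results into the config generator
    previous_result = logged_results_to_HBS_result(previous_run_dir)
    config_generator = BOHB_CG(configspace=config_space)
    warm_start_iteration = WarmStartIteration(previous_result, config_generator)

    # shift the imported time stamps so they align with the new run's clock
    warm_start_iteration.fix_timestamps(time_ref)

    return warm_start_iteration, config_generator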