Source code for hpbandster.core.master

import os
import threading
import time
import math
import pdb
import copy
import logging

import numpy as np


from hpbandster.core.dispatcher import Dispatcher
from hpbandster.core.result import Result
from hpbandster.core.base_iteration import WarmStartIteration




[docs]class Master(object): def __init__(self, run_id, config_generator, working_directory='.', ping_interval=60, nameserver='127.0.0.1', nameserver_port=None, host=None, shutdown_workers=True, job_queue_sizes=(-1,0), dynamic_queue_size=True, logger=None, result_logger=None, previous_result = None, ): """The Master class is responsible for the book keeping and to decide what to run next. Optimizers are instantiations of Master, that handle the important steps of deciding what configurations to run on what budget when. Parameters ---------- run_id : string A unique identifier of that Hyperband run. Use, for example, the cluster's JobID when running multiple concurrent runs to separate them config_generator: hpbandster.config_generators object An object that can generate new configurations and registers results of executed runs working_directory: string The top level working directory accessible to all compute nodes(shared filesystem). eta : float In each iteration, a complete run of sequential halving is executed. In it, after evaluating each configuration on the same subset size, only a fraction of 1/eta of them 'advances' to the next round. Must be greater or equal to 2. min_budget : float The smallest budget to consider. Needs to be positive! max_budget : float the largest budget to consider. Needs to be larger than min_budget! The budgets will be geometrically distributed :math:`\sim \eta^k` for :math:`k\in [0, 1, ... , num\_subsets - 1]`. ping_interval: int number of seconds between pings to discover new nodes. Default is 60 seconds. nameserver: str address of the Pyro4 nameserver nameserver_port: int port of Pyro4 nameserver host: str ip (or name that resolves to that) of the network interface to use shutdown_workers: bool flag to control whether the workers are shutdown after the computation is done job_queue_size: tuple of ints min and max size of the job queue. During the run, when the number of jobs in the queue reaches the min value, it will be filled up to the max size. Default: (0,1) dynamic_queue_size: bool Whether or not to change the queue size based on the number of workers available. If true (default), the job_queue_sizes are relative to the current number of workers. logger: logging.logger like object the logger to output some (more or less meaningful) information result_logger: hpbandster.api.results.util.json_result_logger object a result logger that writes live results to disk previous_result: hpbandster.core.result.Result object previous run to warmstart the run """ self.working_directory = working_directory os.makedirs(self.working_directory, exist_ok=True) if logger is None: self.logger = logging.getLogger('hpbandster') else: self.logger = logger self.result_logger = result_logger self.config_generator = config_generator self.time_ref = None self.iterations = [] self.jobs = [] self.num_running_jobs = 0 self.job_queue_sizes = job_queue_sizes self.user_job_queue_sizes = job_queue_sizes self.dynamic_queue_size = dynamic_queue_size if job_queue_sizes[0] >= job_queue_sizes[1]: raise ValueError("The queue size range needs to be (min, max) with min<max!") if previous_result is None: self.warmstart_iteration = [] else: self.warmstart_iteration = [WarmStartIteration(previous_result, self.config_generator)] # condition to synchronize the job_callback and the queue self.thread_cond = threading.Condition() self.config = { 'time_ref' : self.time_ref } self.dispatcher = Dispatcher( self.job_callback, queue_callback=self.adjust_queue_size, run_id=run_id, ping_interval=ping_interval, nameserver=nameserver, nameserver_port=nameserver_port, host=host) self.dispatcher_thread = threading.Thread(target=self.dispatcher.run) self.dispatcher_thread.start()
[docs] def shutdown(self, shutdown_workers=False): self.logger.debug('HBMASTER: shutdown initiated, shutdown_workers = %s'%(str(shutdown_workers))) self.dispatcher.shutdown(shutdown_workers) self.dispatcher_thread.join()
[docs] def wait_for_workers(self, min_n_workers=1): """ helper function to hold execution until some workers are active Parameters ---------- min_n_workers: int minimum number of workers present before the run starts """ self.logger.debug('wait_for_workers trying to get the condition') with self.thread_cond: while (self.dispatcher.number_of_workers() < min_n_workers): self.logger.debug('HBMASTER: only %i worker(s) available, waiting for at least %i.'%(self.dispatcher.number_of_workers(), min_n_workers)) self.thread_cond.wait(1) self.dispatcher.trigger_discover_worker() self.logger.debug('Enough workers to start this run!')
[docs] def get_next_iteration(self, iteration, iteration_kwargs): """ instantiates the next iteration Overwrite this to change the iterations for different optimizers Parameters ---------- iteration: int the index of the iteration to be instantiated iteration_kwargs: dict additional kwargs for the iteration class Returns ------- HB_iteration: a valid HB iteration object """ raise NotImplementedError('implement get_next_iteration for %s'%(type(self).__name__))
[docs] def run(self, n_iterations=1, min_n_workers=1, iteration_kwargs = {},): """ run n_iterations of SuccessiveHalving Parameters ---------- n_iterations: int number of iterations to be performed in this run min_n_workers: int minimum number of workers before starting the run """ self.wait_for_workers(min_n_workers) iteration_kwargs.update({'result_logger': self.result_logger}) if self.time_ref is None: self.time_ref = time.time() self.config['time_ref'] = self.time_ref self.logger.info('HBMASTER: starting run at %s'%(str(self.time_ref))) self.thread_cond.acquire() while True: self._queue_wait() next_run = None # find a new run to schedule for i in self.active_iterations(): next_run = self.iterations[i].get_next_run() if not next_run is None: break if not next_run is None: self.logger.debug('HBMASTER: schedule new run for iteration %i'%i) self._submit_job(*next_run) continue else: if n_iterations > 0: #we might be able to start the next iteration self.iterations.append(self.get_next_iteration(len(self.iterations), iteration_kwargs)) n_iterations -= 1 continue # at this point there is no imediate run that can be scheduled, # so wait for some job to finish if there are active iterations if self.active_iterations(): self.thread_cond.wait() else: break self.thread_cond.release() for i in self.warmstart_iteration: i.fix_timestamps(self.time_ref) ws_data = [i.data for i in self.warmstart_iteration] return Result([copy.deepcopy(i.data) for i in self.iterations] + ws_data, self.config)
[docs] def adjust_queue_size(self, number_of_workers=None): self.logger.debug('HBMASTER: number of workers changed to %s'%str(number_of_workers)) with self.thread_cond: self.logger.debug('adjust_queue_size: lock accquired') if self.dynamic_queue_size: nw = self.dispatcher.number_of_workers() if number_of_workers is None else number_of_workers self.job_queue_sizes = (self.user_job_queue_sizes[0] + nw, self.user_job_queue_sizes[1] + nw) self.logger.info('HBMASTER: adjusted queue size to %s'%str(self.job_queue_sizes)) self.thread_cond.notify_all()
[docs] def job_callback(self, job): """ method to be called when a job has finished this will do some book keeping and call the user defined new_result_callback if one was specified """ self.logger.debug('job_callback for %s started'%str(job.id)) with self.thread_cond: self.logger.debug('job_callback for %s got condition'%str(job.id)) self.num_running_jobs -= 1 if not self.result_logger is None: self.result_logger(job) self.iterations[job.id[0]].register_result(job) self.config_generator.new_result(job) if self.num_running_jobs <= self.job_queue_sizes[0]: self.logger.debug("HBMASTER: Trying to run another job!") self.thread_cond.notify() self.logger.debug('job_callback for %s finished'%str(job.id))
def _queue_wait(self): """ helper function to wait for the queue to not overflow/underload it """ if self.num_running_jobs >= self.job_queue_sizes[1]: while(self.num_running_jobs > self.job_queue_sizes[0]): self.logger.debug('HBMASTER: running jobs: %i, queue sizes: %s -> wait'%(self.num_running_jobs, str(self.job_queue_sizes))) self.thread_cond.wait() def _submit_job(self, config_id, config, budget): """ hidden function to submit a new job to the dispatcher This function handles the actual submission in a (hopefully) thread save way """ self.logger.debug('HBMASTER: trying submitting job %s to dispatcher'%str(config_id)) with self.thread_cond: self.logger.debug('HBMASTER: submitting job %s to dispatcher'%str(config_id)) self.dispatcher.submit_job(config_id, config=config, budget=budget, working_directory=self.working_directory) self.num_running_jobs += 1 #shouldn't the next line be executed while holding the condition? self.logger.debug("HBMASTER: job %s submitted to dispatcher"%str(config_id))
[docs] def active_iterations(self): """ function to find active (not marked as finished) iterations Returns ------- list: all active iteration objects (empty if there are none) """ l = list(filter(lambda idx: not self.iterations[idx].is_finished, range(len(self.iterations)))) return(l)
def __del__(self): pass