DEHB#

DEHBBase(cs=None, f=None, dimensions=None, mutation_factor=None, crossover_prob=None, strategy=None, min_budget=None, max_budget=None, eta=None, min_clip=None, max_clip=None, boundary_fix_type='random', max_age=np.inf, **kwargs) #

Source code in src/dehb/optimizers/dehb.py
def __init__(self, cs=None, f=None, dimensions=None, mutation_factor=None,
             crossover_prob=None, strategy=None, min_budget=None,
             max_budget=None, eta=None, min_clip=None, max_clip=None,
             boundary_fix_type='random', max_age=np.inf, **kwargs):
    # Miscellaneous
    self._setup_logger(kwargs)

    # Benchmark related variables
    self.cs = cs
    self.configspace = isinstance(self.cs, ConfigSpace.ConfigurationSpace)
    if self.configspace:
        self.dimensions = len(self.cs.get_hyperparameters())
    elif dimensions is None or not isinstance(dimensions, (int, np.integer)):
        assert "Need to specify `dimensions` as an int when `cs` is not available/specified!"
    else:
        self.dimensions = dimensions
    self.f = f

    # DE related variables
    self.mutation_factor = mutation_factor
    self.crossover_prob = crossover_prob
    self.strategy = strategy
    self.fix_type = boundary_fix_type
    self.max_age = max_age
    self.de_params = {
        "mutation_factor": self.mutation_factor,
        "crossover_prob": self.crossover_prob,
        "strategy": self.strategy,
        "configspace": self.configspace,
        "boundary_fix_type": self.fix_type,
        "max_age": self.max_age,
        "cs": self.cs,
        "dimensions": self.dimensions,
        "f": f
    }

    # Hyperband related variables
    self.min_budget = min_budget
    self.max_budget = max_budget
    if self.max_budget <= self.min_budget:
        self.logger.error("Only (Max Budget > Min Budget) is supported for DEHB.")
        if self.max_budget == self.min_budget:
            self.logger.error(
                "If you have a fixed fidelity, " \
                "you can instead run DE. For more information checkout: " \
                "https://automl.github.io/DEHB/references/de")
        raise AssertionError()
    self.eta = eta
    self.min_clip = min_clip
    self.max_clip = max_clip

    # Precomputing budget spacing and number of configurations for HB iterations
    self.max_SH_iter = None
    self.budgets = None
    if self.min_budget is not None and \
       self.max_budget is not None and \
       self.eta is not None:
        self.max_SH_iter = -int(np.log(self.min_budget / self.max_budget) / np.log(self.eta)) + 1
        self.budgets = self.max_budget * np.power(self.eta,
                                                 -np.linspace(start=self.max_SH_iter - 1,
                                                              stop=0, num=self.max_SH_iter))

    # Updating DE parameter list
    self.de_params.update({"output_path": self.output_path})

    # Global trackers
    self.population = None
    self.fitness = None
    self.inc_score = np.inf
    self.inc_config = None
    self.history = []
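
A quick worked example of the budget-spacing precomputation above, with values assumed purely for illustration (min_budget=1, max_budget=27, eta=3):

import numpy as np

min_budget, max_budget, eta = 1, 27, 3
# number of SH iterations per Hyperband bracket
max_SH_iter = -int(np.log(min_budget / max_budget) / np.log(eta)) + 1
# geometric budget ladder between min_budget and max_budget
budgets = max_budget * np.power(
    eta, -np.linspace(start=max_SH_iter - 1, stop=0, num=max_SH_iter)
)
print(max_SH_iter)  # 4
print(budgets)      # [ 1.  3.  9. 27.]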

get_next_iteration(iteration) #

Computes the Successive Halving spacing

Given the iteration index, computes the budget spacing to be used and the number of configurations to be used for the SH iterations.

Parameters#

iteration : int
    Iteration index
clip : int, {1, 2, 3, ..., None}
    If not None, clips the minimum number of configurations to 'clip'

Returns#

ns : array
budgets : array

Source code in src/dehb/optimizers/dehb.py
def get_next_iteration(self, iteration):
    '''Computes the Successive Halving spacing

    Given the iteration index, computes the budget spacing to be used and
    the number of configurations to be used for the SH iterations.

    Parameters
    ----------
    iteration : int
        Iteration index
    clip : int, {1, 2, 3, ..., None}
        If not None, clips the minimum number of configurations to 'clip'

    Returns
    -------
    ns : array
    budgets : array
    '''
    # number of 'SH runs'
    s = self.max_SH_iter - 1 - (iteration % self.max_SH_iter)
    # budget spacing for this iteration
    budgets = self.budgets[(-s-1):]
    # number of configurations in that bracket
    n0 = int(np.floor((self.max_SH_iter)/(s+1)) * self.eta**s)
    ns = [max(int(n0*(self.eta**(-i))), 1) for i in range(s+1)]
    if self.min_clip is not None and self.max_clip is not None:
        ns = np.clip(ns, a_min=self.min_clip, a_max=self.max_clip)
    elif self.min_clip is not None:
        ns = np.clip(ns, a_min=self.min_clip, a_max=np.max(ns))

    return ns, budgets
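
To make the spacing concrete, here is a standalone sketch of the same arithmetic for every bracket of one Hyperband round, assuming min_budget=1, max_budget=27, eta=3 (so max_SH_iter=4) and no clipping:

import numpy as np

eta, max_SH_iter = 3, 4
budgets_full = np.array([1., 3., 9., 27.])

for iteration in range(max_SH_iter):
    s = max_SH_iter - 1 - (iteration % max_SH_iter)
    budgets = budgets_full[(-s - 1):]
    n0 = int(np.floor(max_SH_iter / (s + 1)) * eta**s)
    ns = [max(int(n0 * (eta ** (-i))), 1) for i in range(s + 1)]
    print(iteration, ns, budgets)
# 0 [27, 9, 3, 1] [ 1.  3.  9. 27.]
# 1 [9, 3, 1] [ 3.  9. 27.]
# 2 [6, 2] [ 9. 27.]
# 3 [4] [27.]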

get_incumbents() #

Returns a tuple of the (incumbent configuration, incumbent score/fitness).

Source code in src/dehb/optimizers/dehb.py
def get_incumbents(self):
    """ Returns a tuple of the (incumbent configuration, incumbent score/fitness). """
    if self.configspace:
        return self.vector_to_configspace(self.inc_config), self.inc_score
    return self.inc_config, self.inc_score

DEHB(cs=None, f=None, dimensions=None, mutation_factor=0.5, crossover_prob=0.5, strategy='rand1_bin', min_budget=None, max_budget=None, eta=3, min_clip=None, max_clip=None, configspace=True, boundary_fix_type='random', max_age=np.inf, n_workers=None, client=None, async_strategy='immediate', **kwargs) #

Bases: DEHBBase

Source code in src/dehb/optimizers/dehb.py
def __init__(self, cs=None, f=None, dimensions=None, mutation_factor=0.5,
             crossover_prob=0.5, strategy='rand1_bin', min_budget=None,
             max_budget=None, eta=3, min_clip=None, max_clip=None, configspace=True,
             boundary_fix_type='random', max_age=np.inf, n_workers=None, client=None,
             async_strategy="immediate", **kwargs):
    super().__init__(cs=cs, f=f, dimensions=dimensions, mutation_factor=mutation_factor,
                     crossover_prob=crossover_prob, strategy=strategy, min_budget=min_budget,
                     max_budget=max_budget, eta=eta, min_clip=min_clip, max_clip=max_clip,
                     configspace=configspace, boundary_fix_type=boundary_fix_type,
                     max_age=max_age, **kwargs)
    self.de_params.update({"async_strategy": async_strategy})
    self.iteration_counter = -1
    self.de = {}
    self._max_pop_size = None
    self.active_brackets = []  # list of SHBracketManager objects
    self.traj = []
    self.runtime = []
    self.history = []
    self.start = None

    # Dask variables
    if n_workers is None and client is None:
        raise ValueError("Need to specify either 'n_workers'(>0) or 'client' (a Dask client)!")
    if client is not None and isinstance(client, Client):
        self.client = client
        self.n_workers = len(client.ncores())
    else:
        self.n_workers = n_workers
        if self.n_workers > 1:
            self.client = Client(
                n_workers=self.n_workers, processes=True, threads_per_worker=1, scheduler_port=0
            )  # port 0 makes Dask select a random free port
        else:
            self.client = None
    self.futures = []
    self.shared_data = None

    # Initializing DE subpopulations
    self._get_pop_sizes()
    self._init_subpop()

    # Misc.
    self.available_gpus = None
    self.gpu_usage = None
    self.single_node_with_gpus = None
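
A minimal usage sketch. The objective-function contract (returning a dict with "fitness" and "cost" keys) follows the DEHB documentation; the toy search space and objective below are illustrative assumptions:

import ConfigSpace
from dehb import DEHB

def objective(config, budget, **kwargs):
    # toy objective: quadratic loss; 'cost' mimics the time spent at this budget
    loss = (config["x"] - 0.5) ** 2
    return {"fitness": loss, "cost": budget}

cs = ConfigSpace.ConfigurationSpace()
cs.add_hyperparameter(ConfigSpace.UniformFloatHyperparameter("x", 0, 1))

dehb = DEHB(f=objective, cs=cs, min_budget=1, max_budget=27, eta=3, n_workers=1)
traj, runtime, history = dehb.run(fevals=20)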

__getstate__() #

Allows the object to be pickled while having the Dask client as a class attribute.

Source code in src/dehb/optimizers/dehb.py
def __getstate__(self):
    """ Allows the object to picklable while having Dask client as a class attribute.
    """
    d = dict(self.__dict__)
    d["client"] = None  # hack to allow Dask client to be a class attribute
    d["logger"] = None  # hack to allow logger object to be a class attribute
    return d
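
Because __getstate__ nulls out the client and logger, a DEHB object can be checkpointed with standard pickling. A sketch, assuming `dehb` is an existing DEHB instance with no pending Dask futures:

import pickle

blob = pickle.dumps(dehb)      # works despite the live Dask client
restored = pickle.loads(blob)
# restored.client is None; a new Dask client/logger must be attached before reuse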

__del__() #

Ensures a clean kill of the Dask client and frees up a port.

Source code in src/dehb/optimizers/dehb.py
def __del__(self):
    """ Ensures a clean kill of the Dask client and frees up a port.
    """
    if hasattr(self, "client") and isinstance(self, Client):
        self.client.close()

distribute_gpus() #

Function to create a GPU usage tracker dict.

The idea is to extract the exact GPU device IDs available. During job submission, each submitted job is given a preference of a GPU device ID based on the GPU device with the least number of active running jobs. On retrieval of the result, this gpu usage dict is updated for the device ID that the finished job was mapped to.

Source code in src/dehb/optimizers/dehb.py
def distribute_gpus(self):
    """ Function to create a GPU usage tracker dict.

    The idea is to extract the exact GPU device IDs available. During job submission, each
    submitted job is given a preference of a GPU device ID based on the GPU device with the
    least number of active running jobs. On retrieval of the result, this gpu usage dict is
    updated for the device ID that the finished job was mapped to.
    """
    try:
        available_gpus = os.environ["CUDA_VISIBLE_DEVICES"]
        available_gpus = available_gpus.strip().split(",")
        self.available_gpus = [int(_id) for _id in available_gpus]
    except KeyError as e:
        print("Unable to find valid GPU devices. "
              "Environment variable {} not visible!".format(str(e)))
        self.available_gpus = []
    self.gpu_usage = dict()
    for _id in self.available_gpus:
        self.gpu_usage[_id] = 0
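
The tracker enables a least-loaded device choice at submission time (the actual helper, _get_gpu_id_with_low_load, is referenced in submit_job below); a hypothetical sketch of that selection idea:

import os

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"  # example device list
# after distribute_gpus(): gpu_usage == {0: 0, 1: 0}
gpu_usage = {0: 2, 1: 1}                          # e.g., 2 jobs on GPU 0, 1 job on GPU 1
least_loaded = min(gpu_usage, key=gpu_usage.get)  # -> 1
gpu_usage[least_loaded] += 1                      # register the new job on that device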

clean_inactive_brackets() #

Removes brackets from the active list once they are done, as communicated by the Bracket Manager.

Source code in src/dehb/optimizers/dehb.py
def clean_inactive_brackets(self):
    """ Removes brackets from the active list if it is done as communicated by Bracket Manager
    """
    if len(self.active_brackets) == 0:
        return
    self.active_brackets = [
        # `not` (rather than bitwise `~`) correctly negates a Python bool here
        bracket for bracket in self.active_brackets if not bracket.is_bracket_done()
    ]
    return

is_worker_available(verbose=False) #

Checks if at least one worker is available to run a job

Source code in src/dehb/optimizers/dehb.py
def is_worker_available(self, verbose=False):
    """ Checks if at least one worker is available to run a job
    """
    if self.n_workers == 1 or self.client is None or not isinstance(self.client, Client):
        # in the synchronous case, one worker is always available
        return True
    # checks the absolute number of workers mapped to the client scheduler
    # client.ncores() should return a dict with the keys as unique addresses to these workers
    # treating the number of available workers in this manner
    workers = self._get_worker_count()  # len(self.client.ncores())
    if len(self.futures) >= workers:
        # pause/wait if the number of active jobs is at least the number of allocated workers
        return False
    return True

submit_job(job_info, **kwargs) #

Asks a free worker to run the objective function on config and budget

Source code in src/dehb/optimizers/dehb.py
def submit_job(self, job_info, **kwargs):
    """ Asks a free worker to run the objective function on config and budget
    """
    job_info["kwargs"] = self.shared_data if self.shared_data is not None else kwargs
    # submit to the Dask client
    if self.n_workers > 1 or isinstance(self.client, Client):
        if self.single_node_with_gpus:
            # managing GPU allocation for the job to be submitted
            job_info.update({"gpu_devices": self._get_gpu_id_with_low_load()})
        self.futures.append(
            self.client.submit(self._f_objective, job_info)
        )
    else:
        # skipping scheduling to Dask worker to avoid added overheads in the synchronous case
        self.futures.append(self._f_objective(job_info))

    # pass information of job submission to Bracket Manager
    for bracket in self.active_brackets:
        if bracket.bracket_id == job_info['bracket_id']:
            # registering is IMPORTANT for Bracket Manager to perform SH
            bracket.register_job(job_info['budget'])
            break

run(fevals=None, brackets=None, total_cost=None, single_node_with_gpus=False, verbose=False, debug=False, save_intermediate=True, save_history=True, name=None, **kwargs) #

Main interface to run optimization by DEHB

This function waits on workers and, when a worker is free, asks for a configuration and a budget to evaluate, then submits the job to that worker. In each loop iteration, it checks whether a job is complete, fetches its result, and carries out the necessary processing asynchronously with respect to the worker computations.

The duration of the DEHB run can be controlled by specifying one of 3 parameters. If more than one is specified, DEHB selects only one in the following priority order (high to low):

1) Number of function evaluations (fevals)
2) Number of Successive Halving brackets run under Hyperband (brackets)
3) Total computational cost in seconds, aggregated over all function evaluations (total_cost)
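For example, a run capped by any one of the three controls (a sketch; assumes a constructed `dehb` object as in the constructor example above):

traj, runtime, history = dehb.run(fevals=100)       # stop after 100 function evaluations
traj, runtime, history = dehb.run(brackets=4)       # stop after 4 SH brackets
traj, runtime, history = dehb.run(total_cost=3600)  # stop after ~1 hour of aggregated cost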

Source code in src/dehb/optimizers/dehb.py
@logger.catch
def run(self, fevals=None, brackets=None, total_cost=None, single_node_with_gpus=False,
        verbose=False, debug=False, save_intermediate=True, save_history=True, name=None, **kwargs):
    """ Main interface to run optimization by DEHB

    This function waits on workers and, when a worker is free, asks for a configuration and
    a budget to evaluate, then submits the job to that worker. In each loop iteration, it
    checks whether a job is complete, fetches its result, and carries out the necessary
    processing asynchronously with respect to the worker computations.

    The duration of the DEHB run can be controlled by specifying one of 3 parameters. If more
    than one is specified, DEHB selects only one in the following priority order (high to low):
    1) Number of function evaluations (fevals)
    2) Number of Successive Halving brackets run under Hyperband (brackets)
    3) Total computational cost in seconds, aggregated over all function evaluations (total_cost)
    """
    # checks if a Dask client exists
    if len(kwargs) > 0 and self.n_workers > 1 and isinstance(self.client, Client):
        # broadcasts all additional data passed as **kwargs to all client workers
        # this reduces overhead in the client-worker communication by not having to
        # serialize the redundant data used by all workers for every job
        self.shared_data = self.client.scatter(kwargs, broadcast=True)

    # allows each worker to be mapped to a different GPU when running on a single node
    # where all available GPUs are accessible
    self.single_node_with_gpus = single_node_with_gpus
    if self.single_node_with_gpus:
        self.distribute_gpus()

    self.start = time.time()
    if verbose:
        print("\nLogging at {} for optimization starting at {}\n".format(
            os.path.join(os.getcwd(), self.log_filename),
            time.strftime("%x %X %Z", time.localtime(self.start))
        ))
    if debug:
        logger.configure(handlers=[{"sink": sys.stdout}])
    while True:
        if self._is_run_budget_exhausted(fevals, brackets, total_cost):
            break
        if self.is_worker_available():
            job_info = self._get_next_job()
            if brackets is not None and job_info['bracket_id'] >= brackets:
                # ignore submission and only collect results
                # when brackets are chosen as run budget, an extra bracket is created
                # since iteration_counter is incremented in _get_next_job() and then checked
                # in _is_run_budget_exhausted(), therefore, need to skip suggestions
                # coming from the extra allocated bracket
                # _is_run_budget_exhausted() will not return True until all the lower brackets
                # have finished computation and returned their results
                pass
            else:
                if self.n_workers > 1 or isinstance(self.client, Client):
                    self.logger.debug("{}/{} worker(s) available.".format(
                        self._get_worker_count() - len(self.futures), self._get_worker_count()
                    ))
                # submits job_info to a worker for execution
                self.submit_job(job_info, **kwargs)
                if verbose:
                    budget = job_info['budget']
                    self._verbosity_runtime(fevals, brackets, total_cost)
                    self.logger.info(
                        "Evaluating a configuration with budget {} under "
                        "bracket ID {}".format(budget, job_info['bracket_id'])
                    )
                    self.logger.info(
                        "Best score seen/Incumbent score: {}".format(self.inc_score)
                    )
                self._verbosity_debug()
        self._fetch_results_from_workers()
        if save_intermediate and self.inc_config is not None:
            self._save_incumbent(name)
        if save_history and self.history is not None:
            self._save_history(name)
        self.clean_inactive_brackets()
    # end of while

    if verbose and len(self.futures) > 0:
        self.logger.info(
            "DEHB optimisation over! Waiting to collect results from workers running..."
        )
    while len(self.futures) > 0:
        self._fetch_results_from_workers()
        if save_intermediate and self.inc_config is not None:
            self._save_incumbent(name)
        if save_history and self.history is not None:
            self._save_history(name)
        time.sleep(0.05)  # waiting 50ms

    if verbose:
        time_taken = time.time() - self.start
        self.logger.info("End of optimisation! Total duration: {}; Total fevals: {}\n".format(
            time_taken, len(self.traj)
        ))
        self.logger.info("Incumbent score: {}".format(self.inc_score))
        self.logger.info("Incumbent config: ")
        if self.configspace:
            config = self.vector_to_configspace(self.inc_config)
            for k, v in config.get_dictionary().items():
                self.logger.info("{}: {}".format(k, v))
        else:
            self.logger.info("{}".format(self.inc_config))
    self._save_incumbent(name)
    self._save_history(name)
    return np.array(self.traj), np.array(self.runtime), np.array(self.history, dtype=object)