
DEHB#

DEHBBase(cs=None, f=None, dimensions=None, mutation_factor=None, crossover_prob=None, strategy=None, min_fidelity=None, max_fidelity=None, eta=None, min_clip=None, max_clip=None, seed=None, boundary_fix_type='random', max_age=np.inf, **kwargs) #

Source code in src/dehb/optimizers/dehb.py
def __init__(self, cs=None, f=None, dimensions=None, mutation_factor=None,
             crossover_prob=None, strategy=None, min_fidelity=None,
             max_fidelity=None, eta=None, min_clip=None, max_clip=None, seed=None,
             boundary_fix_type='random', max_age=np.inf, **kwargs):
    # Rng
    self.rng = np.random.default_rng(seed)

    # Miscellaneous
    self._setup_logger(kwargs)
    self.config_repository = ConfigRepository()

    # Benchmark related variables
    self.cs = cs
    self.use_configspace = isinstance(self.cs, ConfigSpace.ConfigurationSpace)
    if self.use_configspace:
        self.dimensions = len(self.cs.get_hyperparameters())
    elif dimensions is None or not isinstance(dimensions, (int, np.integer)):
        # without a ConfigurationSpace, the dimensionality of the search space must be given
        raise ValueError("Need to specify `dimensions` as an int when `cs` is not available/specified!")
    else:
        self.dimensions = dimensions
    self.f = f

    # DE related variables
    self.mutation_factor = mutation_factor
    self.crossover_prob = crossover_prob
    self.strategy = strategy
    self.fix_type = boundary_fix_type
    self.max_age = max_age
    self.de_params = {
        "mutation_factor": self.mutation_factor,
        "crossover_prob": self.crossover_prob,
        "strategy": self.strategy,
        "configspace": self.use_configspace,
        "boundary_fix_type": self.fix_type,
        "max_age": self.max_age,
        "cs": self.cs,
        "dimensions": self.dimensions,
        "rng": self.rng,
        "f": f,
    }

    # Hyperband related variables
    self.min_fidelity = min_fidelity
    self.max_fidelity = max_fidelity
    if self.max_fidelity <= self.min_fidelity:
        self.logger.error("Only (Max Fidelity > Min Fidelity) is supported for DEHB.")
        if self.max_fidelity == self.min_fidelity:
            self.logger.error(
                "If you have a fixed fidelity, " \
                "you can instead run DE. For more information checkout: " \
                "https://automl.github.io/DEHB/references/de")
        raise AssertionError()
    self.eta = eta
    self.min_clip = min_clip
    self.max_clip = max_clip

    # Precomputing fidelity spacing and number of configurations for HB iterations
    self._pre_compute_fidelity_spacing()

    # Updating DE parameter list
    self.de_params.update({"output_path": self.output_path})

    # Global trackers
    self.population = None
    self.fitness = None
    self.inc_score = np.inf
    self.inc_config = None
    self.history = []

get_next_iteration(iteration) #

Computes the Successive Halving spacing.

Given the iteration index, computes the fidelity spacing to be used and the number of configurations to be used for the SH iterations.

Parameters#

iteration : int
    Iteration index
clip : int, {1, 2, 3, ..., None}
    If not None, clips the minimum number of configurations to 'clip'

Returns#

ns : array
fidelities : array

Source code in src/dehb/optimizers/dehb.py
def get_next_iteration(self, iteration):
    """Computes the Successive Halving spacing.

    Given the iteration index, computes the fidelity spacing to be used and
    the number of configurations to be used for the SH iterations.

    Parameters
    ----------
    iteration : int
        Iteration index
    clip : int, {1, 2, 3, ..., None}
        If not None, clips the minimum number of configurations to 'clip'

    Returns:
    -------
    ns : array
    fidelities : array
    """
    # number of 'SH runs'
    s = self.max_SH_iter - 1 - (iteration % self.max_SH_iter)
    # fidelity spacing for this iteration
    fidelities = self.fidelities[(-s-1):]
    # number of configurations in that bracket
    n0 = int(np.floor((self.max_SH_iter)/(s+1)) * self.eta**s)
    ns = [max(int(n0*(self.eta**(-i))), 1) for i in range(s+1)]
    if self.min_clip is not None and self.max_clip is not None:
        ns = np.clip(ns, a_min=self.min_clip, a_max=self.max_clip)
    elif self.min_clip is not None:
        ns = np.clip(ns, a_min=self.min_clip, a_max=np.max(ns))

    return ns, fidelities
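
For illustration, the snippet below recomputes the bracket layout for a hypothetical setting (eta=3, fidelities from 1 to 27). The geometric fidelity grid and max_SH_iter are the standard Hyperband quantities assumed here, since _pre_compute_fidelity_spacing is not shown on this page.

import numpy as np

# Assumed Hyperband-style precomputation (not shown above): geometric fidelity grid
eta, min_fidelity, max_fidelity = 3, 1, 27
max_SH_iter = int(np.floor(np.log(max_fidelity / min_fidelity) / np.log(eta))) + 1  # 4
fidelities = max_fidelity * np.power(eta, -np.linspace(max_SH_iter - 1, 0, max_SH_iter))  # [1, 3, 9, 27]

for iteration in range(max_SH_iter):
    s = max_SH_iter - 1 - (iteration % max_SH_iter)
    n0 = int(np.floor(max_SH_iter / (s + 1)) * eta**s)
    ns = [max(int(n0 * (eta ** (-i))), 1) for i in range(s + 1)]
    print(f"bracket {iteration}: ns={ns}, fidelities={fidelities[(-s - 1):].tolist()}")
# bracket 0: ns=[27, 9, 3, 1], fidelities=[1.0, 3.0, 9.0, 27.0]
# bracket 1: ns=[9, 3, 1],     fidelities=[3.0, 9.0, 27.0]
# bracket 2: ns=[6, 2],        fidelities=[9.0, 27.0]
# bracket 3: ns=[4],           fidelities=[27.0]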

get_incumbents() #

Returns a tuple of the (incumbent configuration, incumbent score/fitness).

Source code in src/dehb/optimizers/dehb.py
def get_incumbents(self):
    """Returns a tuple of the (incumbent configuration, incumbent score/fitness)."""
    if self.use_configspace:
        return self.vector_to_configspace(self.inc_config), self.inc_score
    return self.inc_config, self.inc_score
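
A minimal usage sketch, assuming `dehb` is a DEHB instance that has already evaluated some configurations (see the `run` example at the end of this page):

# retrieve the best configuration and its score seen so far
inc_config, inc_score = dehb.get_incumbents()
print(f"Incumbent configuration: {inc_config}")
print(f"Incumbent score: {inc_score}")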

DEHB(cs=None, f=None, dimensions=None, mutation_factor=0.5, crossover_prob=0.5, strategy='rand1_bin', min_fidelity=None, max_fidelity=None, eta=3, min_clip=None, max_clip=None, seed=None, configspace=True, boundary_fix_type='random', max_age=np.inf, n_workers=None, client=None, async_strategy='immediate', save_freq='incumbent', resume=False, **kwargs) #

Bases: DEHBBase

Source code in src/dehb/optimizers/dehb.py
def __init__(self, cs=None, f=None, dimensions=None, mutation_factor=0.5,
             crossover_prob=0.5, strategy="rand1_bin", min_fidelity=None,
             max_fidelity=None, eta=3, min_clip=None, max_clip=None, seed=None,
             configspace=True, boundary_fix_type="random", max_age=np.inf, n_workers=None,
             client=None, async_strategy="immediate", save_freq="incumbent", resume=False,
             **kwargs):
    super().__init__(cs=cs, f=f, dimensions=dimensions, mutation_factor=mutation_factor,
                     crossover_prob=crossover_prob, strategy=strategy, min_fidelity=min_fidelity,
                     max_fidelity=max_fidelity, eta=eta, min_clip=min_clip, max_clip=max_clip, 
                     seed=seed, configspace=configspace, boundary_fix_type=boundary_fix_type,
                     max_age=max_age, **kwargs)
    self.de_params.update({"async_strategy": async_strategy})
    self.iteration_counter = -1
    self.de = {}
    self._max_pop_size = None
    self.active_brackets = []  # list of SHBracketManager objects
    self.traj = []
    self.runtime = []
    self.history = []
    self._ask_counter = 0
    self._tell_counter = 0
    self.start = None
    if save_freq not in ["incumbent", "step", "end"] and save_freq is not None:
        self.logger.warning(f"Save frequency {save_freq} unknown. Resorting to using 'end'.")
        save_freq = "end"
    self.save_freq = "end" if save_freq is None else save_freq

    # Dask variables
    if n_workers is None and client is None:
        raise ValueError("Need to specify either 'n_workers'(>0) or 'client' (a Dask client)!")
    if client is not None and isinstance(client, Client):
        self.client = client
        self.n_workers = len(client.ncores())
    else:
        self.n_workers = n_workers
        if self.n_workers > 1:
            self.client = Client(
                n_workers=self.n_workers, processes=True, threads_per_worker=1, scheduler_port=0
            )  # port 0 makes Dask select a random free port
        else:
            self.client = None
    self.futures = []
    self.shared_data = None

    # Initializing DE subpopulations
    self._get_pop_sizes()
    self._init_subpop()

    # Misc.
    self.available_gpus = None
    self.gpu_usage = None
    self.single_node_with_gpus = None

    # Setup logging and potentially reload state
    if resume:
        self.logger.info("Loading checkpoint...")
        success = self._load_checkpoint(self.output_path)
        if not success:
            self.logger.error("Checkpoint could not be loaded. " \
                              "Please refer to the prior warning in order to " \
                              "identifiy the problem.")
            raise AttributeError("Checkpoint could not be loaded. Check the logs" \
                                 "for more information")
    elif (self.output_path / "dehb_state.json").exists():
        self.logger.warning("A checkpoint already exists, " \
                            "results could potentially be overwritten.")

    # Save initial random state
    self.random_state = self.rng.bit_generator.state
    if self.use_configspace:
        self.cs_random_state = self.cs.random.get_state()
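
A minimal construction sketch. The search space and objective function are made-up examples; the result dict keys (`fitness`, `cost`) follow the `tell` documentation below, and `output_path` is assumed to be accepted via `**kwargs` as in the released DEHB package.

import time
import ConfigSpace
from dehb import DEHB

# Toy 1D search space (illustrative only)
cs = ConfigSpace.ConfigurationSpace(seed=123)
cs.add_hyperparameter(ConfigSpace.UniformFloatHyperparameter("x", lower=-5, upper=5))

def objective(config, fidelity, **kwargs):
    start = time.time()
    # toy loss: quadratic bowl, slightly worse proxy at low fidelity
    fitness = (config["x"] - 2) ** 2 + 1.0 / fidelity
    return {"fitness": fitness, "cost": time.time() - start}

dehb = DEHB(
    cs=cs,
    f=objective,
    min_fidelity=1,
    max_fidelity=27,
    eta=3,
    seed=123,
    n_workers=1,                # synchronous, no Dask scheduling overhead
    output_path="./dehb_logs",  # assumed **kwargs option for logs/checkpoints
)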

__getstate__() #

Allows the object to be pickled while having the Dask client as a class attribute.

Source code in src/dehb/optimizers/dehb.py
def __getstate__(self):
    """Allows the object to picklable while having Dask client as a class attribute."""
    d = dict(self.__dict__)
    d["client"] = None  # hack to allow Dask client to be a class attribute
    d["logger"] = None  # hack to allow logger object to be a class attribute
    return d

__del__() #

Ensures a clean kill of the Dask client and frees up a port.

Source code in src/dehb/optimizers/dehb.py
def __del__(self):
    """Ensures a clean kill of the Dask client and frees up a port."""
    if hasattr(self, "client") and isinstance(self, Client):
        self.client.close()

distribute_gpus() #

Function to create a GPU usage tracker dict.

The idea is to extract the exact GPU device IDs available. During job submission, each submitted job is given a preference of a GPU device ID based on the GPU device with the least number of active running jobs. On retrieval of the result, this gpu usage dict is updated for the device ID that the finished job was mapped to.

Source code in src/dehb/optimizers/dehb.py
def distribute_gpus(self):
    """Function to create a GPU usage tracker dict.

    The idea is to extract the exact GPU device IDs available. During job submission, each
    submitted job is given a preference of a GPU device ID based on the GPU device with the
    least number of active running jobs. On retrieval of the result, this gpu usage dict is
    updated for the device ID that the finished job was mapped to.
    """
    try:
        available_gpus = os.environ["CUDA_VISIBLE_DEVICES"]
        available_gpus = available_gpus.strip().split(",")
        self.available_gpus = [int(_id) for _id in available_gpus]
    except KeyError as e:
        print("Unable to find valid GPU devices. "
              f"Environment variable {str(e)} not visible!")
        self.available_gpus = []
    self.gpu_usage = dict()
    for _id in self.available_gpus:
        self.gpu_usage[_id] = 0
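
A short sketch of how this ties into a run on a single multi-GPU node: exposing the device IDs via CUDA_VISIBLE_DEVICES and enabling single_node_with_gpus makes run() call distribute_gpus() and assign each submitted job the least-loaded device. The device IDs below are hypothetical.

import os

# Make two (hypothetical) GPUs visible to DEHB's per-job device assignment
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"

# `dehb` as constructed above; run() calls distribute_gpus() when this flag is set
traj, runtime, history = dehb.run(fevals=20, single_node_with_gpus=True)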

clean_inactive_brackets() #

Removes brackets from the active list once they are done, as communicated by the Bracket Manager.

Source code in src/dehb/optimizers/dehb.py
def clean_inactive_brackets(self):
    """Removes brackets from the active list if it is done as communicated by Bracket Manager."""
    if len(self.active_brackets) == 0:
        return
    self.active_brackets = [
        bracket for bracket in self.active_brackets if not bracket.is_bracket_done()
    ]
    return

is_worker_available(verbose=False) #

Checks if at least one worker is available to run a job.

Source code in src/dehb/optimizers/dehb.py
def is_worker_available(self, verbose=False):
    """Checks if at least one worker is available to run a job."""
    if self.n_workers == 1 or self.client is None or not isinstance(self.client, Client):
        # in the synchronous case, one worker is always available
        return True
    # checks the absolute number of workers mapped to the client scheduler
    # client.ncores() should return a dict with the keys as unique addresses to these workers
    # treating the number of available workers in this manner
    workers = self._get_worker_count()  # len(self.client.ncores())
    if len(self.futures) >= workers:
        # pause/wait if the number of active jobs is greater than or equal to the allocated workers
        return False
    return True

ask(n_configs=1) #

Get the next configuration to run from the optimizer.

The retrieved configuration can then be evaluated by the user. After evaluation use tell to report the results back to the optimizer. For more information, please refer to the description of tell.

Parameters#

n_configs : int, default: 1
    Number of configs to ask for.

Returns#

dict or list of dict
    Job info(s) of the next configuration(s) to evaluate.

Source code in src/dehb/optimizers/dehb.py
def ask(self, n_configs: int=1):
    """Get the next configuration to run from the optimizer.

    The retrieved configuration can then be evaluated by the user.
    After evaluation use `tell` to report the results back to the optimizer.
    For more information, please refer to the description of `tell`.

    Args:
        n_configs (int, optional): Number of configs to ask for. Defaults to 1.

    Returns:
        dict or list of dict: Job info(s) of next configuration to evaluate.
    """
    jobs = []
    if n_configs == 1:
        jobs = self._get_next_job()
        self._ask_counter += 1
    else:
        for _ in range(n_configs):
            jobs.append(self._get_next_job())
            self._ask_counter += 1
    # Save random state after ask
    self.random_state = self.rng.bit_generator.state
    if self.use_configspace:
        self.cs_random_state = self.cs.random.get_state()
    return jobs
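
A quick look at what a single ask() returns. The keys shown are the ones tell reads back (see its source below); this is a sketch, not an exhaustive list of fields. `dehb` is the instance from the construction example above.

job_info = dehb.ask()
print(job_info["config"])      # configuration to evaluate
print(job_info["config_id"])   # ID used by the internal ConfigRepository
print(job_info["fidelity"])    # fidelity to evaluate the configuration at
print(job_info["bracket_id"])  # Successive Halving bracket the job belongs to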

submit_job(job_info, **kwargs) #

Asks a free worker to run the objective function on config and fidelity.

Source code in src/dehb/optimizers/dehb.py
def submit_job(self, job_info, **kwargs):
    """Asks a free worker to run the objective function on config and fidelity."""
    job_info["kwargs"] = self.shared_data if self.shared_data is not None else kwargs
    # submit to Dask client
    if self.n_workers > 1 or isinstance(self.client, Client):
        if self.single_node_with_gpus:
            # managing GPU allocation for the job to be submitted
            job_info.update({"gpu_devices": self._get_gpu_id_with_low_load()})
        self.futures.append(
            self.client.submit(self._f_objective, job_info)
        )
    else:
        # skipping scheduling to Dask worker to avoid added overheads in the synchronous case
        self.futures.append(self._f_objective(job_info))

tell(job_info, result, replay=False) #

Feed a result back to the optimizer.

In order to correctly interpret the results, the job_info dict, retrieved by ask, has to be given. Moreover, the result dict has to contain the keys fitness and cost. It is also possible to add the field info to the result in order to store additional, user-specific information.

Parameters#

job_info : dict
    Job info returned by ask().
result : dict
    Result dictionary with mandatory keys fitness and cost.

Source code in src/dehb/optimizers/dehb.py
def tell(self, job_info: dict, result: dict, replay: bool=False):
    """Feed a result back to the optimizer.

    In order to correctly interpret the results, the `job_info` dict, retrieved by `ask`,
    has to be given. Moreover, the `result` dict has to contain the keys `fitness` and `cost`.
    It is also possible to add the field `info` to the `result` in order to store additional,
    user-specific information.

    Args:
        job_info (dict): Job info returned by ask().
        result (dict): Result dictionary with mandatory keys `fitness` and `cost`.
    """
    if replay:
        # Get job_info container from ask and update fields
        job_info_container = self.ask()
        # Update according to given history
        job_info_container["fidelity"] = job_info["fidelity"]
        job_info_container["config"] = job_info["config"]
        job_info_container["config_id"] = job_info["config_id"]

        # Update entry in ConfigRepository
        self.config_repository.configs[job_info["config_id"]].config = job_info["config"]
        # Replace job_info with container to make sure all fields are given
        job_info = job_info_container

    if self._tell_counter >= self._ask_counter:
        raise NotImplementedError("Called tell() more often than ask(). \
                                  Warmstarting with tell is not supported. ")
    self._tell_counter += 1
    # Update bracket information
    fitness, cost = result["fitness"], result["cost"]
    info = result["info"] if "info" in result else dict()
    fidelity, parent_id = job_info["fidelity"], job_info["parent_id"]
    config, config_id = job_info["config"], job_info["config_id"]
    bracket_id = job_info["bracket_id"]
    for bracket in self.active_brackets:
        if bracket.bracket_id == bracket_id:
            # bracket job complete
            bracket.complete_job(fidelity)  # IMPORTANT to perform synchronous SH

    self.config_repository.tell_result(config_id, fidelity, fitness, cost, info)

    # get hypercube representation from config repo
    if self.use_configspace:
        config = self.config_repository.get(config_id)

    # carry out DE selection
    if fitness <= self.de[fidelity].fitness[parent_id]:
        self.de[fidelity].population[parent_id] = config
        self.de[fidelity].population_ids[parent_id] = config_id
        self.de[fidelity].fitness[parent_id] = fitness
    # updating incumbents
    if self.de[fidelity].fitness[parent_id] < self.inc_score:
        self._update_incumbents(
            config=self.de[fidelity].population[parent_id],
            score=self.de[fidelity].fitness[parent_id],
            info=info,
        )
        if self.save_freq == "incumbent" and not replay:
            self.save()
    # book-keeping
    self._update_trackers(
        traj=self.inc_score, runtime=cost, history=(
            config_id, config.tolist(), float(fitness), float(cost), float(fidelity), info,
        ),
    )

    if self.save_freq == "step" and not replay:
        self.save()
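
A manual ask/tell loop, sketched with the `dehb` and `objective` objects from the construction example above:

for _ in range(20):
    job_info = dehb.ask()
    # evaluate the suggested configuration at the suggested fidelity
    result = objective(job_info["config"], job_info["fidelity"])
    # `result` must contain "fitness" and "cost"; an optional "info" dict is stored as well
    dehb.tell(job_info, result)

best_config, best_score = dehb.get_incumbents()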

run(fevals=None, brackets=None, total_cost=None, single_node_with_gpus=False, verbose=False, debug=False, **kwargs) #

Main interface to run optimization by DEHB.

This function waits on workers and, whenever a worker is free, asks for a configuration and a fidelity to evaluate and submits it to the worker. In each loop, it checks whether a job has completed, fetches the results, and carries out the necessary processing asynchronously to the worker computations.

The duration of the DEHB run can be controlled by specifying one of three parameters. If more than one is specified, DEHB uses only one, in priority order (high to low):

1. Number of function evaluations (fevals)
2. Number of Successive Halving brackets run under Hyperband (brackets)
3. Total computational cost (in seconds) aggregated over all function evaluations (total_cost)

Source code in src/dehb/optimizers/dehb.py
@logger.catch
def run(self, fevals=None, brackets=None, total_cost=None, single_node_with_gpus=False,
        verbose=False, debug=False, **kwargs):
    """Main interface to run optimization by DEHB.

    This function waits on workers and, whenever a worker is free, asks for a configuration
    and a fidelity to evaluate and submits it to the worker. In each loop, it checks whether
    a job has completed, fetches the results, and carries out the necessary processing
    asynchronously to the worker computations.

    The duration of the DEHB run can be controlled by specifying one of 3 parameters. If more
    than one are specified, DEHB selects only one in the priority order (high to low):
    1) Number of function evaluations (fevals)
    2) Number of Successive Halving brackets run under Hyperband (brackets)
    3) Total computational cost (in seconds) aggregated by all function evaluations (total_cost)
    """
    # check if run has already been called before
    if self.start is not None:
        logger.warning("DEHB has already been run. Calling 'run' twice could lead to unintended"
                       + " behavior. Please restart DEHB with an increased compute budget"
                       + " instead of calling 'run' twice.")

    # checks if a Dask client exists
    if len(kwargs) > 0 and self.n_workers > 1 and isinstance(self.client, Client):
        # broadcasts all additional data passed as **kwargs to all client workers
        # this reduces overload in the client-worker communication by not having to
        # serialize the redundant data used by all workers for every job
        self.shared_data = self.client.scatter(kwargs, broadcast=True)

    # allows each worker to be mapped to a different GPU when running on a single node
    # where all available GPUs are accessible
    self.single_node_with_gpus = single_node_with_gpus
    if self.single_node_with_gpus:
        self.distribute_gpus()

    self.start = time.time()
    fevals, brackets = self._adjust_budgets(fevals, brackets)
    if verbose:
        print("\nLogging at {} for optimization starting at {}\n".format(
            Path.cwd() / self.log_filename,
            time.strftime("%x %X %Z", time.localtime(self.start)),
        ))
    if debug:
        logger.configure(handlers=[{"sink": sys.stdout}])
    while True:
        if self._is_run_budget_exhausted(fevals, brackets, total_cost):
            break
        if self.is_worker_available():
            next_bracket_id = self._get_next_bracket(only_id=True)
            if brackets is not None and next_bracket_id >= brackets:
                # ignore submission and only collect results
                # when brackets are chosen as run budget, an extra bracket is created
                # since iteration_counter is incremented in ask() and then checked
                # in _is_run_budget_exhausted(), therefore, need to skip suggestions
                # coming from the extra allocated bracket
                # _is_run_budget_exhausted() will not return True until all the lower brackets
                # have finished computation and returned its results
                pass
            else:
                if self.n_workers > 1 or isinstance(self.client, Client):
                    self.logger.debug("{}/{} worker(s) available.".format(
                        self._get_worker_count() - len(self.futures), self._get_worker_count(),
                    ))
                # Ask for new job_info
                job_info = self.ask()
                # Submit job_info to a worker for execution
                self.submit_job(job_info, **kwargs)
                if verbose:
                    fidelity = job_info["fidelity"]
                    config_id = job_info["config_id"]
                    self._verbosity_runtime(fevals, brackets, total_cost)
                    self.logger.info(
                        "Evaluating configuration {} with fidelity {} under "
                        "bracket ID {}".format(config_id, fidelity, job_info["bracket_id"]),
                    )
                    self.logger.info(
                        f"Best score seen/Incumbent score: {self.inc_score}",
                    )
                self._verbosity_debug()
        self._fetch_results_from_workers()
        self.clean_inactive_brackets()
    # end of while

    if verbose and len(self.futures) > 0:
        self.logger.info(
            "DEHB optimisation over! Waiting to collect results from workers running..."
        )
    while len(self.futures) > 0:
        self._fetch_results_from_workers()
        time.sleep(0.05)  # waiting 50ms

    if verbose:
        time_taken = time.time() - self.start
        self.logger.info("End of optimisation! Total duration: {}; Total fevals: {}\n".format(
            time_taken, len(self.traj),
        ))
        self.logger.info(f"Incumbent score: {self.inc_score}")
        self.logger.info("Incumbent config: ")
        if self.use_configspace:
            config = self.vector_to_configspace(self.inc_config)
            for k, v in config.get_dictionary().items():
                self.logger.info(f"{k}: {v}")
        else:
            self.logger.info(f"{self.inc_config}")
    self.save()
    # reset waiting jobs of any still-active brackets to allow for continuation
    for active_bracket in self.active_brackets:
        active_bracket.reset_waiting_jobs()
    self.active_brackets = []
    return np.array(self.traj), np.array(self.runtime), np.array(self.history, dtype=object)
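
A usage sketch of the three budget options (`dehb` as constructed earlier); only the highest-priority option given is used:

traj, runtime, history = dehb.run(fevals=100, verbose=True)   # 1) number of function evaluations
# traj, runtime, history = dehb.run(brackets=5)               # 2) number of SH brackets
# traj, runtime, history = dehb.run(total_cost=3600)          # 3) aggregated cost in seconds

# traj: incumbent score after each evaluation; runtime: per-evaluation cost;
# history: (config_id, config, fitness, cost, fidelity, info) tuples
print(len(traj), history[-1])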