DEHB#

DEHBBase(cs=None, f=None, dimensions=None, mutation_factor=None, crossover_prob=None, strategy=None, min_fidelity=None, max_fidelity=None, eta=None, min_clip=None, max_clip=None, seed=None, boundary_fix_type='random', max_age=np.inf, resume=False, **kwargs) #

Source code in src/dehb/optimizers/dehb.py
def __init__(self, cs=None, f=None, dimensions=None, mutation_factor=None,
             crossover_prob=None, strategy=None, min_fidelity=None,
             max_fidelity=None, eta=None, min_clip=None, max_clip=None, seed=None,
             boundary_fix_type="random", max_age=np.inf, resume=False, **kwargs):
    # Check for deprecated parameters
    if "max_budget" in kwargs or "min_budget" in kwargs:
        raise TypeError("Parameters min_budget and max_budget have been deprecated since " \
                        "v0.1.0. Please use the new parameters min_fidelity and max_fidelity " \
                        "or downgrade to a version prior to v0.1.0")
    if seed is None:
        seed = int(np.random.default_rng().integers(0, 2**32 - 1))
    elif isinstance(seed, np.random.Generator):
        seed = int(seed.integers(0, 2**32 - 1))

    assert isinstance(seed, int)
    self._original_seed = seed
    self.rng = np.random.default_rng(self._original_seed)

    # Miscellaneous
    self._setup_logger(resume, kwargs)
    self.config_repository = ConfigRepository()

    # Benchmark related variables
    self.cs = cs
    self.use_configspace = isinstance(self.cs, ConfigSpace.ConfigurationSpace)
    if self.use_configspace:
        self.cs.seed(self._original_seed)
        self.dimensions = len(self.cs.get_hyperparameters())
    elif dimensions is None or not isinstance(dimensions, (int, np.integer)):
        raise ValueError("Need to specify `dimensions` as an int when `cs` is not "
                         "available/specified!")
    else:
        self.dimensions = dimensions
    self.f = f

    # DE related variables
    self.mutation_factor = mutation_factor
    self.crossover_prob = crossover_prob
    self.strategy = strategy
    self.fix_type = boundary_fix_type
    self.max_age = max_age
    self.de_params = {
        "mutation_factor": self.mutation_factor,
        "crossover_prob": self.crossover_prob,
        "strategy": self.strategy,
        "configspace": self.use_configspace,
        "boundary_fix_type": self.fix_type,
        "max_age": self.max_age,
        "cs": self.cs,
        "dimensions": self.dimensions,
        "f": f,
    }

    # Hyperband related variables
    self.min_fidelity = min_fidelity
    self.max_fidelity = max_fidelity
    if self.max_fidelity <= self.min_fidelity:
        self.logger.error("Only (Max Fidelity > Min Fidelity) is supported for DEHB.")
        if self.max_fidelity == self.min_fidelity:
            self.logger.error(
                "If you have a fixed fidelity, " \
                "you can instead run DE. For more information checkout: " \
                "https://automl.github.io/DEHB/references/de")
        raise AssertionError()
    self.eta = eta
    self.min_clip = min_clip
    self.max_clip = max_clip

    # Precomputing fidelity spacing and number of configurations for HB iterations
    self._pre_compute_fidelity_spacing()

    # Updating DE parameter list
    self.de_params.update({"output_path": self.output_path})

    # Global trackers
    self.population = None
    self.fitness = None
    self.inc_score = np.inf
    self.inc_config = None
    self.history = []

get_incumbents() #

Retrieve current incumbent configuration and score.

RETURNS DESCRIPTION
Tuple[Union[dict, Configuration], float]

Tuple containing incumbent configuration and score.

Source code in src/dehb/optimizers/dehb.py
def get_incumbents(self) -> Tuple[Union[dict, ConfigSpace.Configuration], float]:
    """Retrieve current incumbent configuration and score.

    Returns:
        Tuple containing incumbent configuration and score.
    """
    if self.use_configspace:
        return self.vector_to_configspace(self.inc_config), self.inc_score
    return self.inc_config, self.inc_score

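A minimal usage sketch; `opt` is assumed to be a DEHB instance that has already evaluated at least one configuration:

inc_config, inc_score = opt.get_incumbents()
print(f"Incumbent score: {inc_score}")
print(f"Incumbent configuration: {inc_config}")
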
DEHB(cs=None, f=None, dimensions=None, mutation_factor=0.5, crossover_prob=0.5, strategy='rand1_bin', min_fidelity=None, max_fidelity=None, eta=3, min_clip=None, max_clip=None, seed=None, configspace=True, boundary_fix_type='random', max_age=np.inf, n_workers=None, client=None, async_strategy='immediate', save_freq='incumbent', resume=False, **kwargs) #

Bases: DEHBBase

Source code in src/dehb/optimizers/dehb.py
def __init__(self, cs=None, f=None, dimensions=None, mutation_factor=0.5,
             crossover_prob=0.5, strategy="rand1_bin", min_fidelity=None,
             max_fidelity=None, eta=3, min_clip=None, max_clip=None, seed=None,
             configspace=True, boundary_fix_type="random", max_age=np.inf, n_workers=None,
             client=None, async_strategy="immediate", save_freq="incumbent", resume=False,
             **kwargs):
    super().__init__(cs=cs, f=f, dimensions=dimensions, mutation_factor=mutation_factor,
                     crossover_prob=crossover_prob, strategy=strategy, min_fidelity=min_fidelity,
                     max_fidelity=max_fidelity, eta=eta, min_clip=min_clip, max_clip=max_clip, 
                     seed=seed, configspace=configspace, boundary_fix_type=boundary_fix_type,
                     max_age=max_age, resume=resume, **kwargs)
    self.de_params.update({"async_strategy": async_strategy})
    self.iteration_counter = -1
    self.de = {}
    self._max_pop_size = None
    self.active_brackets = []  # list of SHBracketManager objects
    self.traj = []
    self.runtime = []
    self.history = []
    self._ask_counter = 0
    self._tell_counter = 0
    self.start = None
    if save_freq not in ["incumbent", "step", "end"] and save_freq is not None:
        self.logger.warning(f"Save frequency {save_freq} unknown. Resorting to using 'end'.")
        save_freq = "end"
    self.save_freq = "end" if save_freq is None else save_freq

    # Dask variables
    if n_workers is None and client is None:
        raise ValueError("Need to specify either 'n_workers'(>0) or 'client' (a Dask client)!")
    if client is not None and isinstance(client, Client):
        self.client = client
        self.n_workers = len(client.ncores())
    else:
        self.n_workers = n_workers
        if self.n_workers > 1:
            self.client = Client(
                n_workers=self.n_workers, processes=True, threads_per_worker=1, scheduler_port=0
            )  # port 0 makes Dask select a random free port
        else:
            self.client = None
    self.futures = []
    self.shared_data = None

    # Initializing DE subpopulations
    self._get_pop_sizes()
    self._init_subpop()
    self.config_repository.initial_configs = self.config_repository.configs.copy()

    # Misc.
    self.available_gpus = None
    self.gpu_usage = None
    self.single_node_with_gpus = None

    self._time_budget_exhausted = False
    self._runtime_budget_timer = None

    # Setup logging and potentially reload state
    if resume:
        self.logger.info("Loading checkpoint...")
        success = self._load_checkpoint(self.output_path)
        if not success:
            self.logger.error("Checkpoint could not be loaded. " \
                              "Please refer to the prior warning in order to " \
                              "identifiy the problem.")
            raise AttributeError("Checkpoint could not be loaded. Check the logs" \
                                 "for more information")
    elif (self.output_path / "dehb_state.json").exists():
        self.logger.warning("A checkpoint already exists, " \
                            "results could potentially be overwritten.")

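A hedged construction sketch, assuming a single-parameter ConfigSpace search space and a user-defined objective; the hyperparameter, fidelity range and output_path shown here are illustrative only, not taken from the library's examples:

import ConfigSpace
from dehb import DEHB

# Illustrative search space with a single float hyperparameter
cs = ConfigSpace.ConfigurationSpace()
cs.add_hyperparameter(ConfigSpace.UniformFloatHyperparameter("x", lower=-5.0, upper=5.0))

def objective(config, fidelity):
    # "fitness" is the value DEHB minimizes; "cost" reports the resources spent computing it
    return {"fitness": (config["x"] - 1.0) ** 2, "cost": fidelity}

opt = DEHB(
    f=objective,
    cs=cs,
    min_fidelity=1,
    max_fidelity=27,
    eta=3,
    n_workers=1,                  # alternatively, pass an existing Dask client via `client`
    output_path="./dehb_output",  # illustrative path for logs and checkpoints
)
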
__getstate__() #

Allows the object to be pickled while having the Dask client as a class attribute.

Source code in src/dehb/optimizers/dehb.py
def __getstate__(self):
    """Allows the object to picklable while having Dask client as a class attribute."""
    d = dict(self.__dict__)
    d["client"] = None  # hack to allow Dask client to be a class attribute
    d["logger"] = None  # hack to allow logger object to be a class attribute
    d["_runtime_budget_timer"] = None # hack to allow timer object to be a class attribute
    return d

__del__() #

Ensures a clean kill of the Dask client and frees up a port.

Source code in src/dehb/optimizers/dehb.py
def __del__(self):
    """Ensures a clean kill of the Dask client and frees up a port."""
    if hasattr(self, "client") and isinstance(self, Client):
        self.client.close()

vector_to_configspace(config) #

Converts numpy representation to Configuration.

PARAMETER DESCRIPTION
config

Configuration to convert.

TYPE: array

RETURNS DESCRIPTION
Configuration

ConfigSpace.Configuration: Converted configuration

Source code in src/dehb/optimizers/dehb.py
def vector_to_configspace(self, config: np.array) -> ConfigSpace.Configuration:
    """Converts numpy representation to `Configuration`.

    Args:
        config (np.array): Configuration to convert.

    Returns:
        ConfigSpace.Configuration: Converted configuration
    """
    assert hasattr(self, "de")
    assert len(self.fidelities) > 0
    return self.de[self.fidelities[0]].vector_to_configspace(config)

configspace_to_vector(config) #

Converts Configuration to numpy array.

PARAMETER DESCRIPTION
config

Configuration to convert

TYPE: Configuration

RETURNS DESCRIPTION
array

np.array: Converted configuration

Source code in src/dehb/optimizers/dehb.py
def configspace_to_vector(self, config: ConfigSpace.Configuration) -> np.array:
    """Converts `Configuration` to numpy array.

    Args:
        config (ConfigSpace.Configuration): Configuration to convert

    Returns:
        np.array: Converted configuration
    """
    assert hasattr(self, "de")
    assert len(self.fidelities) > 0
    return self.de[self.fidelities[0]].configspace_to_vector(config)

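A small round-trip sketch, reusing the illustrative `opt` and `cs` objects from the construction example above:

sampled = cs.sample_configuration()           # ConfigSpace.Configuration
vector = opt.configspace_to_vector(sampled)   # numpy vector used internally by DE
restored = opt.vector_to_configspace(vector)  # back to a ConfigSpace.Configuration
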
ask(n_configs=1) #

Get the next configuration to run from the optimizer.

The retrieved configuration can then be evaluated by the user. After evaluation use tell to report the results back to the optimizer. For more information, please refer to the description of tell.

PARAMETER DESCRIPTION
n_configs

Number of configs to ask for. Defaults to 1.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
Union[dict, List[dict]]

dict or list of dict: Job info(s) of next configuration to evaluate.

Source code in src/dehb/optimizers/dehb.py
def ask(self, n_configs: int=1) -> Union[dict, List[dict]]:
    """Get the next configuration to run from the optimizer.

    The retrieved configuration can then be evaluated by the user.
    After evaluation use `tell` to report the results back to the optimizer.
    For more information, please refer to the description of `tell`.

    Args:
        n_configs (int, optional): Number of configs to ask for. Defaults to 1.

    Returns:
        dict or list of dict: Job info(s) of next configuration to evaluate.
    """
    jobs = []
    if n_configs == 1:
        jobs = self._get_next_job()
        self._ask_counter += 1
    else:
        for _ in range(n_configs):
            jobs.append(self._get_next_job())
            self._ask_counter += 1

    return jobs

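A short sketch of requesting jobs, reusing the illustrative `opt` from above; the returned job-info dict contains, among other fields, the `config` and `fidelity` entries later consumed by `tell`:

job_info = opt.ask()            # single job-info dict
print(job_info["fidelity"], job_info["config"])

batch = opt.ask(n_configs=3)    # list of three job-info dicts
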
save() #

Saves the current incumbent, history and state to disk.

Source code in src/dehb/optimizers/dehb.py
def save(self):
    """Saves the current incumbent, history and state to disk."""
    self.logger.info("Saving state to disk...")
    if self._time_budget_exhausted:
        self.logger.info("Runtime budget exhausted. Resorting to only saving overtime history.")
        self._save_history(name="overtime_history.parquet.gzip")
    else:
        self._save_incumbent()
        self._save_history()
        self._save_state()

tell(job_info, result, replay=False) #

Feed a result back to the optimizer.

In order to correctly interpret the results, the job_info dict retrieved by ask has to be given. Moreover, the result dict has to contain the keys fitness and cost. fitness represents the objective you are trying to optimize, e.g. the validation loss. cost represents the computational cost of producing the result, e.g. the wallclock time for training and validating a neural network to achieve the validation loss specified in fitness. It is also possible to add the field info to the result in order to store additional, user-specific information.

User-specific information info

Please note that we only support types that are serializable by pandas. If non-serializable types are used, DEHB will not be able to save the history. To be on the safe side, please use built-in Python types.

PARAMETER DESCRIPTION
job_info

Job info returned by ask().

TYPE: dict

result

Result dictionary with mandatory keys fitness and cost.

TYPE: dict

Source code in src/dehb/optimizers/dehb.py
def tell(self, job_info: dict, result: dict, replay: bool=False) -> None:
    """Feed a result back to the optimizer.

    In order to correctly interpret the results, the `job_info` dict, retrieved by `ask`,
    has to be given. Moreover, the `result` dict has to contain the keys `fitness` and `cost`.
    `fitness` represents the objective you are trying to optimize, e.g. the validation loss.
    `cost` represents the computational cost of producing the result, e.g. the wallclock time
    for training and validating a neural network to achieve the validation loss specified in
    `fitness`. It is also possible to add the field `info` to the `result` in order to store
    additional, user-specific information.

    !!! note "User-specific information `info`"

        Please note that we only support types that are serializable by `pandas`. If
        non-serializable types are used, DEHB will not be able to save the history.
        To be on the safe side, please use built-in Python types.

    Args:
        job_info (dict): Job info returned by ask().
        result (dict): Result dictionary with mandatory keys `fitness` and `cost`.
    """
    if replay:
        # Get job_info container from ask and update fields
        job_info_container = self.ask()
        # Update according to given history
        job_info_container["fidelity"] = job_info["fidelity"]
        job_info_container["config"] = job_info["config"]
        job_info_container["config_id"] = job_info["config_id"]

        # Update entry in ConfigRepository
        self.config_repository.configs[job_info["config_id"]].config = job_info["config"]
        # Replace job_info with container to make sure all fields are given
        job_info = job_info_container

    if self._tell_counter >= self._ask_counter:
        raise NotImplementedError("Called tell() more often than ask(). "
                                  "Warmstarting with tell is not supported.")
    self._tell_counter += 1
    # Update bracket information
    fitness, cost = float(result["fitness"]), float(result["cost"])
    info = result["info"] if "info" in result else {}
    fidelity, parent_id = job_info["fidelity"], job_info["parent_id"]
    config, config_id = job_info["config"], job_info["config_id"]
    bracket_id = job_info["bracket_id"]
    for bracket in self.active_brackets:
        if bracket.bracket_id == bracket_id:
            # bracket job complete
            bracket.complete_job(fidelity)  # IMPORTANT to perform synchronous SH

    self.config_repository.tell_result(config_id, fidelity, fitness, cost, info)

    # get hypercube representation from config repo
    if self.use_configspace:
        config = self.config_repository.get(config_id)

    # carry out DE selection
    if fitness <= self.de[fidelity].fitness[parent_id]:
        self.de[fidelity].population[parent_id] = config
        self.de[fidelity].population_ids[parent_id] = config_id
        self.de[fidelity].fitness[parent_id] = fitness
    # updating incumbents
    inc_changed = False
    if self.de[fidelity].fitness[parent_id] < self.inc_score:
        self._update_incumbents(
            config=self.de[fidelity].population[parent_id],
            score=self.de[fidelity].fitness[parent_id],
            info=info,
        )
        inc_changed = True
    # book-keeping
    self._update_trackers(
        traj=self.inc_score, runtime=cost, history=(
            config_id, config.tolist(), float(fitness), float(cost), float(fidelity), info,
        ),
    )

    if self.save_freq == "step" or (self.save_freq == "incumbent" and inc_changed) and not replay:
        self.save()

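A hedged ask-and-tell loop, reusing the illustrative `opt` and `objective` from the construction sketch; the result dict carries the mandatory `fitness` and `cost` keys plus an optional, pandas-serializable `info` field:

for _ in range(20):
    job_info = opt.ask()
    config, fidelity = job_info["config"], job_info["fidelity"]
    result = objective(config, fidelity)   # returns {"fitness": ..., "cost": ...}
    result["info"] = {"seed": 42}          # optional user-specific metadata
    opt.tell(job_info, result)

best_config, best_score = opt.get_incumbents()
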
run(fevals=None, brackets=None, total_cost=None, single_node_with_gpus=False, **kwargs) #

Main interface to run optimization by DEHB.

This function waits on workers and, if a worker is free, asks for a configuration and a fidelity to evaluate it on, then submits the job to the worker. In each loop, it checks whether a job is complete, fetches the results, and carries out the necessary processing asynchronously to the worker computations.

The duration of the DEHB run can be controlled by specifying one of 3 parameters. If more than one are specified, DEHB selects only one in the priority order (high to low):
1) Number of function evaluations (fevals)
2) Number of Successive Halving brackets run under Hyperband (brackets)
3) Total computational cost (in seconds) aggregated by all function evaluations (total_cost)

Using tell under the hood.

Please note that run uses tell under the hood; therefore, please have a look at the documentation of tell for more information, e.g. about the result format.

Adjusting verbosity

The verbosity of DEHB logs can be adjusted by adding the log_level parameter to DEHB's initialization. As we use loguru, the logging levels can be found on their website.

PARAMETER DESCRIPTION
fevals

Number of function evaluations to run. Defaults to None.

TYPE: int DEFAULT: None

brackets

Number of brackets to run. Defaults to None.

TYPE: int DEFAULT: None

total_cost

Wallclock budget in seconds. Defaults to None.

TYPE: int DEFAULT: None

single_node_with_gpus

Workers get assigned different GPUs. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Tuple[array, array, array]

Trajectory, runtime and optimization history.

Source code in src/dehb/optimizers/dehb.py
@logger.catch
def run(self, fevals=None, brackets=None, total_cost=None, single_node_with_gpus=False,
        **kwargs) -> Tuple[np.array, np.array, np.array]:
    """Main interface to run optimization by DEHB.

    This function waits on workers and, if a worker is free, asks for a configuration and a
    fidelity to evaluate it on, then submits the job to the worker. In each loop, it checks
    whether a job is complete, fetches the results, and carries out the necessary processing
    asynchronously to the worker computations.

    The duration of the DEHB run can be controlled by specifying one of 3 parameters. If more
    than one are specified, DEHB selects only one in the priority order (high to low): <br>
    1) Number of function evaluations (fevals) <br>
    2) Number of Successive Halving brackets run under Hyperband (brackets) <br>
    3) Total computational cost (in seconds) aggregated by all function evaluations (total_cost)

    !!! note "Using `tell` under the hood."

        Please note that `run` uses `tell` under the hood; therefore, please have a
        look at the documentation of `tell` for more information, e.g. about the result format.

    !!! note "Adjusting verbosity"

        The verbosity of DEHB logs can be adjusted by adding the `log_level` parameter to DEHB's
        initialization. As we use loguru, the logging levels can be found on [their website](https://loguru.readthedocs.io/en/stable/api/logger.html#levels).

    Args:
        fevals (int, optional): Number of function evaluations to run. Defaults to None.
        brackets (int, optional): Number of brackets to run. Defaults to None.
        total_cost (int, optional): Wallclock budget in seconds. Defaults to None.
        single_node_with_gpus (bool): Workers get assigned different GPUs. Defaults to False.

    Returns:
        Trajectory, runtime and optimization history.
    """
    # Warn if users use old state saving frequencies
    if "save_history" in kwargs or "save_intermediate" in kwargs or "name" in kwargs:
        logger.warning("The run parameters 'save_history', 'save_intermediate' and 'name' are "\
                       "deprecated, since the changes in v0.1.1. Please use the 'saving_freq' "\
                       "parameter in the constructor to adjust when to save DEHBs state " \
                       "(including history). Please use the 'output_path' parameter to adjust "\
                       "where the state and logs should be saved.")
        raise TypeError("Used deprecated parameters 'save_history', 'save_intermediate' " \
                        "and/or 'name'. Please check the logs for more information.")
    if "verbose" in kwargs:
        logger.warning("The run parameters 'verbose' is deprecated since the changes in v0.1.2. "\
                       "Please use the 'log_level' parameter when initializing DEHB.")
        raise TypeError("Used deprecated parameter 'verbose'. "\
                        "Please check the logs for more information.")
    # check if run has already been called before
    if self.start is not None:
        logger.warning("DEHB has already been run. Calling 'run' twice could lead to unintended"
                       + " behavior. Please restart DEHB with an increased compute budget"
                       + " instead of calling 'run' twice.")
        self._time_budget_exhausted = False

    # checks if a Dask client exists
    if len(kwargs) > 0 and self.n_workers > 1 and isinstance(self.client, Client):
        # broadcasts all additional data passed as **kwargs to all client workers
        # this reduces overload in the client-worker communication by not having to
        # serialize the redundant data used by all workers for every job
        self.shared_data = self.client.scatter(kwargs, broadcast=True)

    # allows each worker to be mapped to a different GPU when running on a single node
    # where all available GPUs are accessible
    self.single_node_with_gpus = single_node_with_gpus
    if self.single_node_with_gpus:
        self._distribute_gpus()

    self.start = time.time()
    self.logger.info("\nLogging at {} for optimization starting at {}\n".format(
        Path.cwd() / self.log_filename,
        time.strftime("%x %X %Z", time.localtime(self.start)),
    ))

    delimiters = [fevals, brackets, total_cost]
    delim_sum = sum(x is not None for x in delimiters)
    if delim_sum == 0:
        raise ValueError(
            "Need one of 'fevals', 'brackets' or 'total_cost' as budget for DEHB to run."
        )
    fevals, brackets = self._adjust_budgets(fevals, brackets)
    # Set alarm for specified runtime budget
    if total_cost is not None:
        self._runtime_budget_timer = Timer(total_cost, self._timeout_handler)
        self._runtime_budget_timer.start()
    while True:
        if self._is_run_budget_exhausted(fevals, brackets):
            break
        if self._is_worker_available():
            next_bracket_id = self._get_next_bracket(only_id=True)
            if brackets is not None and next_bracket_id >= brackets:
                # ignore submission and only collect results
                # when brackets are chosen as run budget, an extra bracket is created
                # since iteration_counter is incremented in ask() and then checked
                # in _is_run_budget_exhausted(), therefore, need to skip suggestions
                # coming from the extra allocated bracket
                # _is_run_budget_exhausted() will not return True until all the lower brackets
                # have finished computation and returned its results
                pass
            else:
                if self.n_workers > 1 or isinstance(self.client, Client):
                    self.logger.debug("{}/{} worker(s) available.".format(
                        self._get_worker_count() - len(self.futures), self._get_worker_count(),
                    ))
                # Ask for new job_info
                job_info = self.ask()
                # Submit job_info to a worker for execution
                self._submit_job(job_info, **kwargs)
                self._log_runtime(fevals, brackets, total_cost)
                self._log_job_submission(job_info)
                self._log_debug()
        self._fetch_results_from_workers()
        self._clean_inactive_brackets()
    # end of while
    time_taken = time.time() - self.start
    self.logger.info("End of optimisation! Total duration: {}; Total fevals: {}\n".format(
        time_taken, len(self.traj),
    ))
    self.logger.info(f"Incumbent score: {self.inc_score}")
    self.logger.info("Incumbent config: ")
    if self.use_configspace:
        config = self.vector_to_configspace(self.inc_config)
        for k, v in config.get_dictionary().items():
            self.logger.info(f"{k}: {v}")
    else:
        self.logger.info(f"{self.inc_config}")

    self.save()
    # cancel timer
    if self._runtime_budget_timer:
        self._runtime_budget_timer.cancel()
    # reset waiting jobs of active bracket to allow for continuation
    if len(self.active_brackets) > 0:
        for active_bracket in self.active_brackets:
            active_bracket.reset_waiting_jobs()
    return np.array(self.traj), np.array(self.runtime), np.array(self.history, dtype=object)
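A minimal end-to-end sketch using run, reusing the illustrative `opt` from the construction example; only one of the three budget arguments is needed:

traj, runtime, history = opt.run(fevals=50)   # alternatively brackets=... or total_cost=...
print(f"Performed {len(traj)} function evaluations")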