
PriorBand

neps.optimizers.multi_fidelity_prior.priorband #

PriorBand #

PriorBand(
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    sampling_policy: Any = EnsemblePolicy,
    promotion_policy: Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = True,
    sample_default_at_target: bool = True,
    prior_weight_type: str = "geometric",
    inc_sample_type: str = "mutation",
    inc_mutation_rate: float = 0.5,
    inc_mutation_std: float = 0.25,
    inc_style: str = "dynamic",
    model_based: bool = False,
    modelling_type: str = "joint",
    initial_design_size: int = None,
    model_policy: Any = ModelPolicy,
    surrogate_model: str | Any = "gp",
    domain_se_kernel: str = None,
    hp_kernels: list = None,
    surrogate_model_args: dict = None,
    acquisition: str | BaseAcquisition = "EI",
    log_prior_weighted: bool = False,
    acquisition_sampler: (
        str | AcquisitionSampler
    ) = "random",
)

Bases: MFBOBase, HyperbandCustomDefault, PriorBandBase

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    sampling_policy: typing.Any = EnsemblePolicy,
    promotion_policy: typing.Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = True,
    sample_default_at_target: bool = True,
    prior_weight_type: str = "geometric",  # could also be {"linear", "50-50"}
    inc_sample_type: str = "mutation",  # or {"crossover", "gaussian", "hypersphere"}
    inc_mutation_rate: float = 0.5,
    inc_mutation_std: float = 0.25,
    inc_style: str = "dynamic",  # could also be {"decay", "constant"}
    # arguments for model
    model_based: bool = False,  # crucial argument to set to allow model-search
    modelling_type: str = "joint",  # could also be {"rung"}
    initial_design_size: int = None,
    model_policy: typing.Any = ModelPolicy,
    surrogate_model: str | typing.Any = "gp",
    domain_se_kernel: str = None,
    hp_kernels: list = None,
    surrogate_model_args: dict = None,
    acquisition: str | BaseAcquisition = "EI",
    log_prior_weighted: bool = False,
    acquisition_sampler: str | AcquisitionSampler = "random",
):
    super().__init__(
        pipeline_space=pipeline_space,
        budget=budget,
        eta=eta,
        initial_design_type=initial_design_type,
        sampling_policy=sampling_policy,
        promotion_policy=promotion_policy,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
        prior_confidence=prior_confidence,
        random_interleave_prob=random_interleave_prob,
        sample_default_first=sample_default_first,
        sample_default_at_target=sample_default_at_target,
    )
    self.prior_weight_type = prior_weight_type
    self.inc_sample_type = inc_sample_type
    self.inc_mutation_rate = inc_mutation_rate
    self.inc_mutation_std = inc_mutation_std
    self.sampling_policy = sampling_policy(
        pipeline_space=pipeline_space, inc_type=self.inc_sample_type
    )
    # determines the kind of trade-off between incumbent and prior weightage
    self.inc_style = inc_style  # used by PriorBandBase
    self.sampling_args = {
        "inc": None,
        "weights": {
            "prior": 1,  # begin with only prior sampling
            "inc": 0,
            "random": 0,
        },
    }

    bo_args = dict(
        surrogate_model=surrogate_model,
        domain_se_kernel=domain_se_kernel,
        hp_kernels=hp_kernels,
        surrogate_model_args=surrogate_model_args,
        acquisition=acquisition,
        log_prior_weighted=log_prior_weighted,
        acquisition_sampler=acquisition_sampler,
    )
    self.model_based = model_based
    self.modelling_type = modelling_type
    self.initial_design_size = initial_design_size
    # counting non-fidelity dimensions in search space
    ndims = sum(
        1
        for _, hp in self.pipeline_space.hyperparameters.items()
        if not hp.is_fidelity
    )
    n_min = ndims + 1
    self.init_size = n_min + 1  # in BOHB: init_design >= N_min + 2
    if self.modelling_type == "joint" and self.initial_design_size is not None:
        self.init_size = self.initial_design_size
    self.model_policy = model_policy(pipeline_space, **bo_args)

    for _, sh in self.sh_brackets.items():
        sh.sampling_policy = self.sampling_policy
        sh.sampling_args = self.sampling_args
        sh.model_policy = self.model_policy
        sh.sample_new_config = self.sample_new_config

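A minimal usage sketch: PriorBand is typically selected through neps.run rather than constructed directly. The pipeline function, the parameter names, and the "priorband" searcher key below are assumptions for illustration and may differ between NePS versions.

import neps

def run_pipeline(learning_rate: float, epochs: int) -> float:
    # train a model for `epochs` epochs and return the validation loss to minimize
    return (learning_rate - 1e-3) ** 2 / epochs

pipeline_space = {
    # the prior is expressed via `default` and `default_confidence`
    "learning_rate": neps.FloatParameter(
        lower=1e-5, upper=1e-1, log=True, default=1e-3, default_confidence="medium"
    ),
    # the fidelity dimension PriorBand uses for its rungs
    "epochs": neps.IntegerParameter(lower=1, upper=27, is_fidelity=True),
}

neps.run(
    run_pipeline=run_pipeline,
    pipeline_space=pipeline_space,
    root_directory="priorband_example",
    max_evaluations_total=50,
    searcher="priorband",  # assumed key for selecting this optimizer
)
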
calc_sampling_args #

calc_sampling_args(rung) -> dict

Sets the weights for each of the sampling techniques.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def calc_sampling_args(self, rung) -> dict:
    """Sets the weights for each of the sampling techniques."""
    if self.prior_weight_type == "geometric":
        _w_random = 1
        # scales weight of prior by eta raised to the current rung level
        # at the base rung thus w_prior = w_random
        # at the max rung r, w_prior = eta^r * w_random
        _w_prior = (self.eta**rung) * _w_random
    elif self.prior_weight_type == "linear":
        _w_random = 1
        w_prior_min_rung = 1 * _w_random
        w_prior_max_rung = self.eta * _w_random
        num_rungs = len(self.rung_map)
        # linearly increasing prior weight such that
        # at base rung, w_prior = w_random
        # at max rung, w_prior = self.eta * w_random
        _w_prior = np.linspace(
            start=w_prior_min_rung,
            stop=w_prior_max_rung,
            endpoint=True,
            num=num_rungs,
        )[rung]
    elif self.prior_weight_type == "50-50":
        _w_random = 1
        _w_prior = 1
    else:
        raise ValueError(f"{self.prior_weight_type} not in {{'linear', 'geometric'}}")

    # normalizing weights of random and prior sampling
    w_prior = _w_prior / (_w_prior + _w_random)
    w_random = _w_random / (_w_prior + _w_random)
    # calculating ratio of prior and incumbent weights
    _w_prior, _w_inc = self.prior_to_incumbent_ratio()
    # scaling back such that w_random + w_prior + w_inc = 1
    w_inc = _w_inc * w_prior
    w_prior = _w_prior * w_prior

    sampling_args = {
        "prior": w_prior,
        "inc": w_inc,
        "random": w_random,
    }
    return sampling_args
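
To make the weighting above concrete, here is a minimal standalone sketch (not NePS code; the function name and arguments are assumptions for illustration) of the geometric schedule followed by the prior/incumbent split:

def sketch_sampling_weights(eta: int, rung: int, w_prior_frac: float, w_inc_frac: float) -> dict:
    # geometric schedule: prior weight grows as eta**rung relative to random sampling
    w_random = 1.0
    w_prior = float(eta**rung)
    total = w_prior + w_random
    w_prior, w_random = w_prior / total, w_random / total
    # w_prior_frac and w_inc_frac are assumed to come from prior_to_incumbent_ratio()
    # and to sum to 1, so the three returned weights also sum to 1
    return {
        "prior": w_prior_frac * w_prior,
        "inc": w_inc_frac * w_prior,
        "random": w_random,
    }

# e.g. eta=3, rung=2 with an equal prior/incumbent split:
# sketch_sampling_weights(3, 2, 0.5, 0.5) -> {"prior": 0.45, "inc": 0.45, "random": 0.1}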

clear_old_brackets #

clear_old_brackets()

Enforces reset at each new bracket.

The _get_rungs_state() function creates the rung_promotions dict mapping, which is used by the promotion policies to determine the next step: promotion or sample. To simulate the reset of rungs as in vanilla HB, the algorithm is viewed as a series of SH brackets, where the sequence of SH brackets comprising HB is repeated. This is done by iterating over the closed loop of possible SH brackets (self.sh_brackets). The oldest active, incomplete SH bracket is searched for to choose the next evaluation. If all brackets are either finished or waiting, a new SH bracket is started, corresponding to the SH bracket under HB as registered by current_SH_bracket.

Source code in neps/optimizers/multi_fidelity/hyperband.py
def clear_old_brackets(self):
    """Enforces reset at each new bracket.

    The _get_rungs_state() function creates the `rung_promotions` dict mapping,
    which is used by the promotion policies to determine the next step: promotion
    or sample. To simulate the reset of rungs as in vanilla HB, the algorithm is
    viewed as a series of SH brackets, where the sequence of SH brackets comprising
    HB is repeated. This is done by iterating over the closed loop of possible SH
    brackets (self.sh_brackets). The oldest active, incomplete SH bracket is
    searched for to choose the next evaluation. If all brackets are either finished
    or waiting, a new SH bracket is started, corresponding to the SH bracket under
    HB as registered by `current_SH_bracket`.
    """
    n_sh_brackets = len(self.sh_brackets)
    # iterates over the different SH brackets
    self.current_sh_bracket = 0  # indexing from range(0, n_sh_brackets)
    start = 0
    _min_rung = self.sh_brackets[self.current_sh_bracket].min_rung
    end = self.sh_brackets[self.current_sh_bracket].config_map[_min_rung]

    if self.sample_default_first and self.sample_default_at_target:
        start += 1
        end += 1

    # stores the base rung size for each SH bracket in HB
    base_rung_sizes = []  # sorted(self.config_map.values(), reverse=True)
    for bracket in self.sh_brackets.values():
        base_rung_sizes.append(sorted(bracket.config_map.values(), reverse=True)[0])
    while end <= len(self.observed_configs):
        # subsetting only this SH bracket from the history
        sh_bracket = self.sh_brackets[self.current_sh_bracket]
        sh_bracket.clean_rung_information()
        # for the SH bracket in start-end, calculate total SH budget used, from the
        # correct SH bracket object to make the right budget calculations

        bracket_budget_used = sh_bracket._calc_budget_used_in_bracket(
            deepcopy(self.observed_configs.rung.values[start:end])
        )
        # if budget used is less than the total SH budget then still an active bracket
        current_bracket_full_budget = sum(sh_bracket.full_rung_trace)
        if bracket_budget_used < current_bracket_full_budget:
            # updating rung information of the current bracket

            sh_bracket._get_rungs_state(self.observed_configs.iloc[start:end])
            # extra call to use the updated rung member info to find promotions
            # SyncPromotion signals a wait if a rung is full but has
            # incomplete/pending evaluations, or signals to start a new SH bracket
            sh_bracket._handle_promotions()
            promotion_count = 0
            for _, promotions in sh_bracket.rung_promotions.items():
                promotion_count += len(promotions)
            # if no promotion candidates are returned, then the current bracket
            # is active and waiting
            if promotion_count:
                # returns the oldest active bracket if a promotion found which is the
                # current SH bracket at this scope
                return
            # if no promotions, ensure an empty state explicitly to disable bracket
            sh_bracket.clean_rung_information()
        start = end
        # updating pointer to the next SH bracket in HB
        self.current_sh_bracket = (self.current_sh_bracket + 1) % n_sh_brackets
        end = start + base_rung_sizes[self.current_sh_bracket]
    # reaches here if all old brackets are either waiting or finished

    # updates rung info with the latest active, incomplete bracket
    sh_bracket = self.sh_brackets[self.current_sh_bracket]

    sh_bracket._get_rungs_state(self.observed_configs.iloc[start:end])
    sh_bracket._handle_promotions()

find_1nn_distance_from_incumbent #

find_1nn_distance_from_incumbent(incumbent)

Finds the distance to the nearest neighbour.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_1nn_distance_from_incumbent(self, incumbent):
    """Finds the distance to the nearest neighbour."""
    distances = self.find_all_distances_from_incumbent(incumbent)
    distance = min(distances)
    return distance

find_all_distances_from_incumbent #

find_all_distances_from_incumbent(incumbent)

Finds the distances from the incumbent to all other observed configurations.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_all_distances_from_incumbent(self, incumbent):
    """Finds the distance to the nearest neighbour."""
    dist = lambda x: compute_config_dist(incumbent, x)
    # computing distance of incumbent from all seen points in history
    distances = [dist(config) for config in self.observed_configs.config]
    # ensuring the distances exclude 0 or the distance from itself
    distances = [d for d in distances if d > 0]
    return distances

find_incumbent #

find_incumbent(rung: int = None) -> SearchSpace

Find the best performing configuration seen so far.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_incumbent(self, rung: int = None) -> SearchSpace:
    """Find the best performing configuration seen so far."""
    rungs = self.observed_configs.rung.values
    idxs = self.observed_configs.index.values
    if rung is not None:
        # enters this scope if the `rung` argument is passed and not None
        if rung not in rungs:
            self.logger.warn(f"{rung} not in {np.unique(rungs)}")
        # filtering by rung based on argument passed
        idxs = self.observed_configs.rung.values == rung
        # checking width of current rung
        if len(idxs) < self.eta:
            self.logger.warn(
                f"Selecting incumbent from a rung with width less than {self.eta}"
            )
    # extracting the incumbent configuration
    if len(idxs):
        # finding the config with the lowest recorded performance
        _perfs = self.observed_configs.loc[idxs].perf.values
        inc_idx = np.nanargmin([np.nan if t is None else t for t in _perfs])
        inc = self.observed_configs.loc[idxs].iloc[inc_idx].config
    else:
        # THIS block should not ever execute, but for runtime anomalies, if no
        # incumbent can be extracted, the prior is treated as the incumbent
        inc = self.pipeline_space.sample_default_configuration()
        self.logger.warn(
            "Treating the prior as the incumbent. "
            "Please check if this should not happen."
        )
    return inc

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

The method that decides which configuration to suggest for the next evaluation.

Returns: tuple[SearchSpace, str, str | None], containing the sampled configuration, its ID, and the ID of the configuration it was promoted from (None if newly sampled).
Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    self.set_sampling_weights_and_inc(rung=self.current_sh_bracket)

    for _, sh in self.sh_brackets.items():
        sh.sampling_args = self.sampling_args
    return super().get_config_and_ids()

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Calls result.utils.get_learning_curve() and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_activate_inc #

is_activate_inc() -> bool

Function to check optimization state to allow/disallow incumbent sampling.

This function checks whether the total resources used by the finished evaluations amount to at least the budget of one full SH bracket.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def is_activate_inc(self) -> bool:
    """Function to check optimization state to allow/disallow incumbent sampling.

    This function checks whether the total resources used by the finished
    evaluations amount to at least the budget of one full SH bracket.
    """
    activate_inc = False

    # calculate total resource cost required for the first SH bracket in HB
    if hasattr(self, "sh_brackets") and len(self.sh_brackets) > 1:
        # for HB or AsyncHB which invokes multiple SH brackets
        bracket = self.sh_brackets[self.min_rung]
    else:
        # for SH or ASHA which do not invoke multiple SH brackets
        bracket = self
    # calculating the total resources spent in the first SH bracket, taking into
    # account the continuations, that is, the resources spent on a promoted config is
    # not fidelity[rung] but (fidelity[rung] - fidelity[rung - 1])
    continuation_resources = bracket.rung_map[bracket.min_rung]
    resources = bracket.config_map[bracket.min_rung] * continuation_resources
    for r in range(1, len(bracket.rung_map)):
        rung = sorted(list(bracket.rung_map.keys()), reverse=False)[r]
        continuation_resources = bracket.rung_map[rung] - bracket.rung_map[rung - 1]
        resources += bracket.config_map[rung] * continuation_resources

    # find resources spent so far for all finished evaluations
    resources_used = calc_total_resources_spent(self.observed_configs, self.rung_map)

    if resources_used >= resources and len(
        self.rung_histories[self.max_rung]["config"]
    ):
        # activate incumbent-based sampling if the total resources used are at least
        # equivalent to one SH bracket's resource usage; additionally, for the
        # asynchronous case with a large number of workers, the check enforces that
        # at least one configuration has been evaluated at the highest fidelity
        activate_inc = True
    return activate_inc
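
The continuation-aware accounting can be reproduced in isolation. The snippet below uses assumed rung_map and config_map values for a single eta=3 SH bracket; it only illustrates how the first bracket's total resource budget is computed:

rung_map = {0: 1, 1: 3, 2: 9}    # rung -> fidelity value (assumed example)
config_map = {0: 9, 1: 3, 2: 1}  # rung -> number of configs evaluated at that rung

# every config in the base rung pays the full base fidelity
resources = config_map[0] * rung_map[0]
for rung in sorted(rung_map)[1:]:
    # a promoted config only pays the extra fidelity beyond its previous rung
    resources += config_map[rung] * (rung_map[rung] - rung_map[rung - 1])

print(resources)  # 9*1 + 3*(3-1) + 1*(9-3) = 21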

is_init_phase #

is_init_phase() -> bool

Returns True if in the warmstart phase and False if in the model-based search phase.

Source code in neps/optimizers/multi_fidelity/mf_bo.py
def is_init_phase(self) -> bool:
    """Returns True is in the warmstart phase and False under model-based search."""
    if self.modelling_type == "rung":
        # build a model per rung or per fidelity
        # in this case, the initial design checks if `init_size` number of
        # configurations have finished at a rung or not and the highest such rung is
        # chosen for model building at the current iteration
        if self._active_rung() is None:
            return True
    elif self.modelling_type == "joint":
        # builds a model across all fidelities with the fidelity as a dimension
        # in this case, calculate the total number of function evaluations spent
        # and in vanilla BO fashion use that to compare with the initial design size
        resources = calc_total_resources_spent(self.observed_configs, self.rung_map)
        resources /= self.max_budget
        if resources < self.init_size:
            return True
    else:
        raise ValueError("Choice of modelling_type not in {{'rung', 'joint'}}")
    return False
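
A short worked example (assumed numbers) for the "joint" branch above: the resources spent are converted into full-budget-equivalent evaluations and compared with the initial design size.

max_budget = 27        # highest fidelity value (assumed)
init_size = 8          # required number of full-budget-equivalent evaluations
resources_spent = 189  # total fidelity units consumed so far (assumed)

# 189 / 27 = 7.0 full evaluations, which is below init_size, so still in init phase
print(resources_spent / max_budget < init_size)  # True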

is_promotable #

is_promotable() -> int | None

Returns an int if a rung can be promoted, else a None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # iterates starting from the highest fidelity promotable to the lowest fidelity
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote

prior_to_incumbent_ratio #

prior_to_incumbent_ratio() -> tuple[float, float]

Calculates the normalized weight distribution between prior and incumbent.

Sum of the weights should be 1.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def prior_to_incumbent_ratio(self) -> tuple[float, float]:
    """Calculates the normalized weight distribution between prior and incumbent.

    Sum of the weights should be 1.
    """
    if self.inc_style == "constant":
        return self._prior_to_incumbent_ratio_constant()
    elif self.inc_style == "decay":
        resources = calc_total_resources_spent(self.observed_configs, self.rung_map)
        return self._prior_to_incumbent_ratio_decay(
            resources, self.eta, self.min_budget, self.max_budget
        )
    elif self.inc_style == "dynamic":
        return self._prior_to_incumbent_ratio_dynamic(self.max_rung)
    else:
        raise ValueError(f"Invalid option {self.inc_style}")

sample_new_config #

sample_new_config(rung: int = None, **kwargs)

Samples configuration from policies or random.

Source code in neps/optimizers/multi_fidelity/mf_bo.py
def sample_new_config(
    self,
    rung: int = None,
    **kwargs,
):
    """Samples configuration from policies or random."""
    if self.model_based and not self.is_init_phase():
        incumbent = None
        if self.modelling_type == "rung":
            # `rung` should not be None when not in init phase
            active_max_rung = self._active_rung()
            fidelity = None
            active_max_fidelity = self.rung_map[active_max_rung]
        elif self.modelling_type == "joint":
            fidelity = self.rung_map[rung]
            active_max_fidelity = None
            # IMPORTANT step for correct 2-step acquisition
            incumbent = min(self.rung_histories[rung]["perf"])
        else:
            fidelity = active_max_fidelity = None
        assert (
            (fidelity is None and active_max_fidelity is not None)
            or (active_max_fidelity is None and fidelity is not None)
            or (active_max_fidelity is not None and fidelity is not None)
        ), "Either condition needs to be not None!"
        config = self.model_policy.sample(
            active_max_fidelity=active_max_fidelity,
            fidelity=fidelity,
            incumbent=incumbent,
            **self.sampling_args,
        )
    elif self.sampling_policy is not None:
        config = self.sampling_policy.sample(**self.sampling_args)
    else:
        config = self.pipeline_space.sample(
            patience=self.patience,
            user_priors=self.use_priors,
            ignore_fidelity=True,
        )
    return config

PriorBandBase #

Class that defines essential properties needed by PriorBand.

Designed to work with SuccessiveHalvingBase as the topmost parent class.

calc_sampling_args #

calc_sampling_args(rung) -> dict

Sets the weights for each of the sampling techniques.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def calc_sampling_args(self, rung) -> dict:
    """Sets the weights for each of the sampling techniques."""
    if self.prior_weight_type == "geometric":
        _w_random = 1
        # scales weight of prior by eta raised to the current rung level
        # at the base rung thus w_prior = w_random
        # at the max rung r, w_prior = eta^r * w_random
        _w_prior = (self.eta**rung) * _w_random
    elif self.prior_weight_type == "linear":
        _w_random = 1
        w_prior_min_rung = 1 * _w_random
        w_prior_max_rung = self.eta * _w_random
        num_rungs = len(self.rung_map)
        # linearly increasing prior weight such that
        # at base rung, w_prior = w_random
        # at max rung, w_prior = self.eta * w_random
        _w_prior = np.linspace(
            start=w_prior_min_rung,
            stop=w_prior_max_rung,
            endpoint=True,
            num=num_rungs,
        )[rung]
    elif self.prior_weight_type == "50-50":
        _w_random = 1
        _w_prior = 1
    else:
        raise ValueError(f"{self.prior_weight_type} not in {{'linear', 'geometric'}}")

    # normalizing weights of random and prior sampling
    w_prior = _w_prior / (_w_prior + _w_random)
    w_random = _w_random / (_w_prior + _w_random)
    # calculating ratio of prior and incumbent weights
    _w_prior, _w_inc = self.prior_to_incumbent_ratio()
    # scaling back such that w_random + w_prior + w_inc = 1
    w_inc = _w_inc * w_prior
    w_prior = _w_prior * w_prior

    sampling_args = {
        "prior": w_prior,
        "inc": w_inc,
        "random": w_random,
    }
    return sampling_args

find_1nn_distance_from_incumbent #

find_1nn_distance_from_incumbent(incumbent)

Finds the distance to the nearest neighbour.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_1nn_distance_from_incumbent(self, incumbent):
    """Finds the distance to the nearest neighbour."""
    distances = self.find_all_distances_from_incumbent(incumbent)
    distance = min(distances)
    return distance

find_all_distances_from_incumbent #

find_all_distances_from_incumbent(incumbent)

Finds the distances from the incumbent to all other observed configurations.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_all_distances_from_incumbent(self, incumbent):
    """Finds the distance to the nearest neighbour."""
    dist = lambda x: compute_config_dist(incumbent, x)
    # computing distance of incumbent from all seen points in history
    distances = [dist(config) for config in self.observed_configs.config]
    # ensuring the distances exclude 0 or the distance from itself
    distances = [d for d in distances if d > 0]
    return distances

find_incumbent #

find_incumbent(rung: int = None) -> SearchSpace

Find the best performing configuration seen so far.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_incumbent(self, rung: int = None) -> SearchSpace:
    """Find the best performing configuration seen so far."""
    rungs = self.observed_configs.rung.values
    idxs = self.observed_configs.index.values
    if rung is not None:
        # enters this scope if the `rung` argument is passed and not None
        if rung not in rungs:
            self.logger.warn(f"{rung} not in {np.unique(rungs)}")
        # filtering by rung based on argument passed
        idxs = self.observed_configs.rung.values == rung
        # checking width of current rung
        if len(idxs) < self.eta:
            self.logger.warn(
                f"Selecting incumbent from a rung with width less than {self.eta}"
            )
    # extracting the incumbent configuration
    if len(idxs):
        # finding the config with the lowest recorded performance
        _perfs = self.observed_configs.loc[idxs].perf.values
        inc_idx = np.nanargmin([np.nan if t is None else t for t in _perfs])
        inc = self.observed_configs.loc[idxs].iloc[inc_idx].config
    else:
        # THIS block should not ever execute, but for runtime anomalies, if no
        # incumbent can be extracted, the prior is treated as the incumbent
        inc = self.pipeline_space.sample_default_configuration()
        self.logger.warn(
            "Treating the prior as the incumbent. "
            "Please check if this should not happen."
        )
    return inc

is_activate_inc #

is_activate_inc() -> bool

Function to check optimization state to allow/disallow incumbent sampling.

This function checks whether the total resources used by the finished evaluations amount to at least the budget of one full SH bracket.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def is_activate_inc(self) -> bool:
    """Function to check optimization state to allow/disallow incumbent sampling.

    This function checks whether the total resources used by the finished
    evaluations amount to at least the budget of one full SH bracket.
    """
    activate_inc = False

    # calculate total resource cost required for the first SH bracket in HB
    if hasattr(self, "sh_brackets") and len(self.sh_brackets) > 1:
        # for HB or AsyncHB which invokes multiple SH brackets
        bracket = self.sh_brackets[self.min_rung]
    else:
        # for SH or ASHA which do not invoke multiple SH brackets
        bracket = self
    # calculating the total resources spent in the first SH bracket, taking into
    # account the continuations, that is, the resources spent on a promoted config is
    # not fidelity[rung] but (fidelity[rung] - fidelity[rung - 1])
    continuation_resources = bracket.rung_map[bracket.min_rung]
    resources = bracket.config_map[bracket.min_rung] * continuation_resources
    for r in range(1, len(bracket.rung_map)):
        rung = sorted(list(bracket.rung_map.keys()), reverse=False)[r]
        continuation_resources = bracket.rung_map[rung] - bracket.rung_map[rung - 1]
        resources += bracket.config_map[rung] * continuation_resources

    # find resources spent so far for all finished evaluations
    resources_used = calc_total_resources_spent(self.observed_configs, self.rung_map)

    if resources_used >= resources and len(
        self.rung_histories[self.max_rung]["config"]
    ):
        # activate incumbent-based sampling if the total resources used are at least
        # equivalent to one SH bracket's resource usage; additionally, for the
        # asynchronous case with a large number of workers, the check enforces that
        # at least one configuration has been evaluated at the highest fidelity
        activate_inc = True
    return activate_inc

prior_to_incumbent_ratio #

prior_to_incumbent_ratio() -> tuple[float, float]

Calculates the normalized weight distribution between prior and incumbent.

Sum of the weights should be 1.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def prior_to_incumbent_ratio(self) -> tuple[float, float]:
    """Calculates the normalized weight distribution between prior and incumbent.

    Sum of the weights should be 1.
    """
    if self.inc_style == "constant":
        return self._prior_to_incumbent_ratio_constant()
    elif self.inc_style == "decay":
        resources = calc_total_resources_spent(self.observed_configs, self.rung_map)
        return self._prior_to_incumbent_ratio_decay(
            resources, self.eta, self.min_budget, self.max_budget
        )
    elif self.inc_style == "dynamic":
        return self._prior_to_incumbent_ratio_dynamic(self.max_rung)
    else:
        raise ValueError(f"Invalid option {self.inc_style}")

PriorBandNoIncToPrior #

PriorBandNoIncToPrior(
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    sampling_policy: Any = EnsemblePolicy,
    promotion_policy: Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = True,
    sample_default_at_target: bool = True,
    prior_weight_type: str = "geometric",
    inc_sample_type: str = "mutation",
    inc_mutation_rate: float = 0.5,
    inc_mutation_std: float = 0.25,
    inc_style: str = "dynamic",
    model_based: bool = False,
    modelling_type: str = "joint",
    initial_design_size: int = None,
    model_policy: Any = ModelPolicy,
    surrogate_model: str | Any = "gp",
    domain_se_kernel: str = None,
    hp_kernels: list = None,
    surrogate_model_args: dict = None,
    acquisition: str | BaseAcquisition = "EI",
    log_prior_weighted: bool = False,
    acquisition_sampler: (
        str | AcquisitionSampler
    ) = "random",
)

Bases: PriorBand

Disables incumbent sampling, replacing it with prior-based sampling.

This is equivalent to running HyperBand with Prior and Random sampling, where their relationship is controlled by the prior_weight_type argument.
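
A hedged construction sketch showing how the prior/random trade-off might be chosen for this variant; `space` is assumed to be a pre-built SearchSpace with a fidelity parameter and a prior, and the argument values are illustrative only.

from neps.optimizers.multi_fidelity_prior.priorband import PriorBandNoIncToPrior

opt = PriorBandNoIncToPrior(
    pipeline_space=space,        # assumed: SearchSpace with a fidelity and defaults
    budget=100,                  # total budget in fidelity units
    eta=3,
    prior_weight_type="linear",  # prior weight grows linearly across rungs
)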

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    sampling_policy: typing.Any = EnsemblePolicy,
    promotion_policy: typing.Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = True,
    sample_default_at_target: bool = True,
    prior_weight_type: str = "geometric",  # could also be {"linear", "50-50"}
    inc_sample_type: str = "mutation",  # or {"crossover", "gaussian", "hypersphere"}
    inc_mutation_rate: float = 0.5,
    inc_mutation_std: float = 0.25,
    inc_style: str = "dynamic",  # could also be {"decay", "constant"}
    # arguments for model
    model_based: bool = False,  # crucial argument to set to allow model-search
    modelling_type: str = "joint",  # could also be {"rung"}
    initial_design_size: int = None,
    model_policy: typing.Any = ModelPolicy,
    surrogate_model: str | typing.Any = "gp",
    domain_se_kernel: str = None,
    hp_kernels: list = None,
    surrogate_model_args: dict = None,
    acquisition: str | BaseAcquisition = "EI",
    log_prior_weighted: bool = False,
    acquisition_sampler: str | AcquisitionSampler = "random",
):
    super().__init__(
        pipeline_space=pipeline_space,
        budget=budget,
        eta=eta,
        initial_design_type=initial_design_type,
        sampling_policy=sampling_policy,
        promotion_policy=promotion_policy,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
        prior_confidence=prior_confidence,
        random_interleave_prob=random_interleave_prob,
        sample_default_first=sample_default_first,
        sample_default_at_target=sample_default_at_target,
    )
    self.prior_weight_type = prior_weight_type
    self.inc_sample_type = inc_sample_type
    self.inc_mutation_rate = inc_mutation_rate
    self.inc_mutation_std = inc_mutation_std
    self.sampling_policy = sampling_policy(
        pipeline_space=pipeline_space, inc_type=self.inc_sample_type
    )
    # determines the kind of trade-off between incumbent and prior weightage
    self.inc_style = inc_style  # used by PriorBandBase
    self.sampling_args = {
        "inc": None,
        "weights": {
            "prior": 1,  # begin with only prior sampling
            "inc": 0,
            "random": 0,
        },
    }

    bo_args = dict(
        surrogate_model=surrogate_model,
        domain_se_kernel=domain_se_kernel,
        hp_kernels=hp_kernels,
        surrogate_model_args=surrogate_model_args,
        acquisition=acquisition,
        log_prior_weighted=log_prior_weighted,
        acquisition_sampler=acquisition_sampler,
    )
    self.model_based = model_based
    self.modelling_type = modelling_type
    self.initial_design_size = initial_design_size
    # counting non-fidelity dimensions in search space
    ndims = sum(
        1
        for _, hp in self.pipeline_space.hyperparameters.items()
        if not hp.is_fidelity
    )
    n_min = ndims + 1
    self.init_size = n_min + 1  # in BOHB: init_design >= N_min + 2
    if self.modelling_type == "joint" and self.initial_design_size is not None:
        self.init_size = self.initial_design_size
    self.model_policy = model_policy(pipeline_space, **bo_args)

    for _, sh in self.sh_brackets.items():
        sh.sampling_policy = self.sampling_policy
        sh.sampling_args = self.sampling_args
        sh.model_policy = self.model_policy
        sh.sample_new_config = self.sample_new_config

calc_sampling_args #

calc_sampling_args(rung) -> dict

Sets the weights for each of the sampling techniques.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def calc_sampling_args(self, rung) -> dict:
    """Sets the weights for each of the sampling techniques."""
    if self.prior_weight_type == "geometric":
        _w_random = 1
        # scales weight of prior by eta raised to the current rung level
        # at the base rung thus w_prior = w_random
        # at the max rung r, w_prior = eta^r * w_random
        _w_prior = (self.eta**rung) * _w_random
    elif self.prior_weight_type == "linear":
        _w_random = 1
        w_prior_min_rung = 1 * _w_random
        w_prior_max_rung = self.eta * _w_random
        num_rungs = len(self.rung_map)
        # linearly increasing prior weight such that
        # at base rung, w_prior = w_random
        # at max rung, w_prior = self.eta * w_random
        _w_prior = np.linspace(
            start=w_prior_min_rung,
            stop=w_prior_max_rung,
            endpoint=True,
            num=num_rungs,
        )[rung]
    elif self.prior_weight_type == "50-50":
        _w_random = 1
        _w_prior = 1
    else:
        raise ValueError(f"{self.prior_weight_type} not in {{'linear', 'geometric'}}")

    # normalizing weights of random and prior sampling
    w_prior = _w_prior / (_w_prior + _w_random)
    w_random = _w_random / (_w_prior + _w_random)
    # calculating ratio of prior and incumbent weights
    _w_prior, _w_inc = self.prior_to_incumbent_ratio()
    # scaling back such that w_random + w_prior + w_inc = 1
    w_inc = _w_inc * w_prior
    w_prior = _w_prior * w_prior

    sampling_args = {
        "prior": w_prior,
        "inc": w_inc,
        "random": w_random,
    }
    return sampling_args

clear_old_brackets #

clear_old_brackets()

Enforces reset at each new bracket.

The _get_rungs_state() function creates the rung_promotions dict mapping, which is used by the promotion policies to determine the next step: promotion or sample. To simulate the reset of rungs as in vanilla HB, the algorithm is viewed as a series of SH brackets, where the sequence of SH brackets comprising HB is repeated. This is done by iterating over the closed loop of possible SH brackets (self.sh_brackets). The oldest active, incomplete SH bracket is searched for to choose the next evaluation. If all brackets are either finished or waiting, a new SH bracket is started, corresponding to the SH bracket under HB as registered by current_SH_bracket.

Source code in neps/optimizers/multi_fidelity/hyperband.py
def clear_old_brackets(self):
    """Enforces reset at each new bracket.

    The _get_rungs_state() function creates the `rung_promotions` dict mapping,
    which is used by the promotion policies to determine the next step: promotion
    or sample. To simulate the reset of rungs as in vanilla HB, the algorithm is
    viewed as a series of SH brackets, where the sequence of SH brackets comprising
    HB is repeated. This is done by iterating over the closed loop of possible SH
    brackets (self.sh_brackets). The oldest active, incomplete SH bracket is
    searched for to choose the next evaluation. If all brackets are either finished
    or waiting, a new SH bracket is started, corresponding to the SH bracket under
    HB as registered by `current_SH_bracket`.
    """
    n_sh_brackets = len(self.sh_brackets)
    # iterates over the different SH brackets
    self.current_sh_bracket = 0  # indexing from range(0, n_sh_brackets)
    start = 0
    _min_rung = self.sh_brackets[self.current_sh_bracket].min_rung
    end = self.sh_brackets[self.current_sh_bracket].config_map[_min_rung]

    if self.sample_default_first and self.sample_default_at_target:
        start += 1
        end += 1

    # stores the base rung size for each SH bracket in HB
    base_rung_sizes = []  # sorted(self.config_map.values(), reverse=True)
    for bracket in self.sh_brackets.values():
        base_rung_sizes.append(sorted(bracket.config_map.values(), reverse=True)[0])
    while end <= len(self.observed_configs):
        # subsetting only this SH bracket from the history
        sh_bracket = self.sh_brackets[self.current_sh_bracket]
        sh_bracket.clean_rung_information()
        # for the SH bracket in start-end, calculate total SH budget used, from the
        # correct SH bracket object to make the right budget calculations

        bracket_budget_used = sh_bracket._calc_budget_used_in_bracket(
            deepcopy(self.observed_configs.rung.values[start:end])
        )
        # if budget used is less than the total SH budget then still an active bracket
        current_bracket_full_budget = sum(sh_bracket.full_rung_trace)
        if bracket_budget_used < current_bracket_full_budget:
            # updating rung information of the current bracket

            sh_bracket._get_rungs_state(self.observed_configs.iloc[start:end])
            # extra call to use the updated rung member info to find promotions
            # SyncPromotion signals a wait if a rung is full but has
            # incomplete/pending evaluations, or signals to start a new SH bracket
            sh_bracket._handle_promotions()
            promotion_count = 0
            for _, promotions in sh_bracket.rung_promotions.items():
                promotion_count += len(promotions)
            # if no promotion candidates are returned, then the current bracket
            # is active and waiting
            if promotion_count:
                # returns the oldest active bracket if a promotion found which is the
                # current SH bracket at this scope
                return
            # if no promotions, ensure an empty state explicitly to disable bracket
            sh_bracket.clean_rung_information()
        start = end
        # updating pointer to the next SH bracket in HB
        self.current_sh_bracket = (self.current_sh_bracket + 1) % n_sh_brackets
        end = start + base_rung_sizes[self.current_sh_bracket]
    # reaches here if all old brackets are either waiting or finished

    # updates rung info with the latest active, incomplete bracket
    sh_bracket = self.sh_brackets[self.current_sh_bracket]

    sh_bracket._get_rungs_state(self.observed_configs.iloc[start:end])
    sh_bracket._handle_promotions()

find_1nn_distance_from_incumbent #

find_1nn_distance_from_incumbent(incumbent)

Finds the distance to the nearest neighbour.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_1nn_distance_from_incumbent(self, incumbent):
    """Finds the distance to the nearest neighbour."""
    distances = self.find_all_distances_from_incumbent(incumbent)
    distance = min(distances)
    return distance

find_all_distances_from_incumbent #

find_all_distances_from_incumbent(incumbent)

Finds the distances from the incumbent to all other observed configurations.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_all_distances_from_incumbent(self, incumbent):
    """Finds the distance to the nearest neighbour."""
    dist = lambda x: compute_config_dist(incumbent, x)
    # computing distance of incumbent from all seen points in history
    distances = [dist(config) for config in self.observed_configs.config]
    # ensuring the distances exclude 0 or the distance from itself
    distances = [d for d in distances if d > 0]
    return distances

find_incumbent #

find_incumbent(rung: int = None) -> SearchSpace

Find the best performing configuration seen so far.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_incumbent(self, rung: int = None) -> SearchSpace:
    """Find the best performing configuration seen so far."""
    rungs = self.observed_configs.rung.values
    idxs = self.observed_configs.index.values
    if rung is not None:
        # enters this scope if the `rung` argument is passed and not None
        if rung not in rungs:
            self.logger.warn(f"{rung} not in {np.unique(rungs)}")
        # filtering by rung based on argument passed
        idxs = self.observed_configs.rung.values == rung
        # checking width of current rung
        if len(idxs) < self.eta:
            self.logger.warn(
                f"Selecting incumbent from a rung with width less than {self.eta}"
            )
    # extracting the incumbent configuration
    if len(idxs):
        # finding the config with the lowest recorded performance
        _perfs = self.observed_configs.loc[idxs].perf.values
        inc_idx = np.nanargmin([np.nan if t is None else t for t in _perfs])
        inc = self.observed_configs.loc[idxs].iloc[inc_idx].config
    else:
        # THIS block should not ever execute, but for runtime anomalies, if no
        # incumbent can be extracted, the prior is treated as the incumbent
        inc = self.pipeline_space.sample_default_configuration()
        self.logger.warn(
            "Treating the prior as the incumbent. "
            "Please check if this should not happen."
        )
    return inc

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

The method that decides which configuration to suggest for the next evaluation.

Returns: tuple[SearchSpace, str, str | None], containing the sampled configuration, its ID, and the ID of the configuration it was promoted from (None if newly sampled).
Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    self.set_sampling_weights_and_inc(rung=self.current_sh_bracket)

    for _, sh in self.sh_brackets.items():
        sh.sampling_args = self.sampling_args
    return super().get_config_and_ids()

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Calls result.utils.get_learning_curve() and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_activate_inc #

is_activate_inc() -> bool

Function to check optimization state to allow/disallow incumbent sampling.

This function checks whether the total resources used by the finished evaluations amount to at least the budget of one full SH bracket.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def is_activate_inc(self) -> bool:
    """Function to check optimization state to allow/disallow incumbent sampling.

    This function checks whether the total resources used by the finished
    evaluations amount to at least the budget of one full SH bracket.
    """
    activate_inc = False

    # calculate total resource cost required for the first SH bracket in HB
    if hasattr(self, "sh_brackets") and len(self.sh_brackets) > 1:
        # for HB or AsyncHB which invokes multiple SH brackets
        bracket = self.sh_brackets[self.min_rung]
    else:
        # for SH or ASHA which do not invoke multiple SH brackets
        bracket = self
    # calculating the total resources spent in the first SH bracket, taking into
    # account the continuations, that is, the resources spent on a promoted config is
    # not fidelity[rung] but (fidelity[rung] - fidelity[rung - 1])
    continuation_resources = bracket.rung_map[bracket.min_rung]
    resources = bracket.config_map[bracket.min_rung] * continuation_resources
    for r in range(1, len(bracket.rung_map)):
        rung = sorted(list(bracket.rung_map.keys()), reverse=False)[r]
        continuation_resources = bracket.rung_map[rung] - bracket.rung_map[rung - 1]
        resources += bracket.config_map[rung] * continuation_resources

    # find resources spent so far for all finished evaluations
    resources_used = calc_total_resources_spent(self.observed_configs, self.rung_map)

    if resources_used >= resources and len(
        self.rung_histories[self.max_rung]["config"]
    ):
        # activate incumbent-based sampling if the total resources used are at least
        # equivalent to one SH bracket's resource usage; additionally, for the
        # asynchronous case with a large number of workers, the check enforces that
        # at least one configuration has been evaluated at the highest fidelity
        activate_inc = True
    return activate_inc

is_init_phase #

is_init_phase() -> bool

Returns True if in the warmstart phase and False if in the model-based search phase.

Source code in neps/optimizers/multi_fidelity/mf_bo.py
def is_init_phase(self) -> bool:
    """Returns True is in the warmstart phase and False under model-based search."""
    if self.modelling_type == "rung":
        # build a model per rung or per fidelity
        # in this case, the initial design checks if `init_size` number of
        # configurations have finished at a rung or not and the highest such rung is
        # chosen for model building at the current iteration
        if self._active_rung() is None:
            return True
    elif self.modelling_type == "joint":
        # builds a model across all fidelities with the fidelity as a dimension
        # in this case, calculate the total number of function evaluations spent
        # and in vanilla BO fashion use that to compare with the initial design size
        resources = calc_total_resources_spent(self.observed_configs, self.rung_map)
        resources /= self.max_budget
        if resources < self.init_size:
            return True
    else:
        raise ValueError("Choice of modelling_type not in {{'rung', 'joint'}}")
    return False

is_promotable #

is_promotable() -> int | None

Returns an int if a rung can be promoted, else a None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # iterates starting from the highest fidelity promotable to the lowest fidelity
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote

prior_to_incumbent_ratio #

prior_to_incumbent_ratio() -> tuple[float, float]

Calculates the normalized weight distribution between prior and incumbent.

Sum of the weights should be 1.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def prior_to_incumbent_ratio(self) -> tuple[float, float]:
    """Calculates the normalized weight distribution between prior and incumbent.

    Sum of the weights should be 1.
    """
    if self.inc_style == "constant":
        return self._prior_to_incumbent_ratio_constant()
    elif self.inc_style == "decay":
        resources = calc_total_resources_spent(self.observed_configs, self.rung_map)
        return self._prior_to_incumbent_ratio_decay(
            resources, self.eta, self.min_budget, self.max_budget
        )
    elif self.inc_style == "dynamic":
        return self._prior_to_incumbent_ratio_dynamic(self.max_rung)
    else:
        raise ValueError(f"Invalid option {self.inc_style}")

sample_new_config #

sample_new_config(rung: int = None, **kwargs)

Samples configuration from policies or random.

Source code in neps/optimizers/multi_fidelity/mf_bo.py
def sample_new_config(
    self,
    rung: int = None,
    **kwargs,
):
    """Samples configuration from policies or random."""
    if self.model_based and not self.is_init_phase():
        incumbent = None
        if self.modelling_type == "rung":
            # `rung` should not be None when not in init phase
            active_max_rung = self._active_rung()
            fidelity = None
            active_max_fidelity = self.rung_map[active_max_rung]
        elif self.modelling_type == "joint":
            fidelity = self.rung_map[rung]
            active_max_fidelity = None
            # IMPORTANT step for correct 2-step acquisition
            incumbent = min(self.rung_histories[rung]["perf"])
        else:
            fidelity = active_max_fidelity = None
        assert (
            (fidelity is None and active_max_fidelity is not None)
            or (active_max_fidelity is None and fidelity is not None)
            or (active_max_fidelity is not None and fidelity is not None)
        ), "Either condition needs to be not None!"
        config = self.model_policy.sample(
            active_max_fidelity=active_max_fidelity,
            fidelity=fidelity,
            incumbent=incumbent,
            **self.sampling_args,
        )
    elif self.sampling_policy is not None:
        config = self.sampling_policy.sample(**self.sampling_args)
    else:
        config = self.pipeline_space.sample(
            patience=self.patience,
            user_priors=self.use_priors,
            ignore_fidelity=True,
        )
    return config

PriorBandNoPriorToInc #

PriorBandNoPriorToInc(**kwargs)

Bases: PriorBand

Disables prior-based sampling, replacing it with incumbent-based sampling.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def __init__(self, **kwargs):
    super().__init__(**kwargs)
    # cannot use prior in this version
    self.pipeline_space.has_prior = False

calc_sampling_args #

calc_sampling_args(rung) -> dict

Sets the weights for each of the sampling techniques.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def calc_sampling_args(self, rung) -> dict:
    """Sets the weights for each of the sampling techniques."""
    if self.prior_weight_type == "geometric":
        _w_random = 1
        # scales weight of prior by eta raised to the current rung level
        # at the base rung thus w_prior = w_random
        # at the max rung r, w_prior = eta^r * w_random
        _w_prior = (self.eta**rung) * _w_random
    elif self.prior_weight_type == "linear":
        _w_random = 1
        w_prior_min_rung = 1 * _w_random
        w_prior_max_rung = self.eta * _w_random
        num_rungs = len(self.rung_map)
        # linearly increasing prior weight such that
        # at base rung, w_prior = w_random
        # at max rung, w_prior = self.eta * w_random
        _w_prior = np.linspace(
            start=w_prior_min_rung,
            stop=w_prior_max_rung,
            endpoint=True,
            num=num_rungs,
        )[rung]
    elif self.prior_weight_type == "50-50":
        _w_random = 1
        _w_prior = 1
    else:
        raise ValueError(f"{self.prior_weight_type} not in {{'linear', 'geometric'}}")

    # normalizing weights of random and prior sampling
    w_prior = _w_prior / (_w_prior + _w_random)
    w_random = _w_random / (_w_prior + _w_random)
    # calculating ratio of prior and incumbent weights
    _w_prior, _w_inc = self.prior_to_incumbent_ratio()
    # scaling back such that w_random + w_prior + w_inc = 1
    w_inc = _w_inc * w_prior
    w_prior = _w_prior * w_prior

    sampling_args = {
        "prior": w_prior,
        "inc": w_inc,
        "random": w_random,
    }
    return sampling_args
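
For intuition, here is a minimal standalone sketch of the geometric weighting; the eta value and the prior/incumbent split below are chosen purely for illustration (the class itself derives the split via prior_to_incumbent_ratio()):

def geometric_sampling_weights(rung: int, eta: int = 3, prior_inc_split=(0.67, 0.33)) -> dict:
    # geometric scaling: prior weight grows by a factor of eta per rung, random stays at 1
    _w_random = 1
    _w_prior = (eta**rung) * _w_random

    # normalize prior vs. random so the two sum to 1
    w_prior = _w_prior / (_w_prior + _w_random)
    w_random = _w_random / (_w_prior + _w_random)

    # split the prior mass between prior and incumbent (fixed illustrative split here)
    p, i = prior_inc_split
    return {"prior": p * w_prior, "inc": i * w_prior, "random": w_random}

print(geometric_sampling_weights(rung=0))  # prior+inc = 0.5, random = 0.5
print(geometric_sampling_weights(rung=2))  # prior+inc = 0.9, random = 0.1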

clear_old_brackets #

clear_old_brackets()

Enforces reset at each new bracket.

The _get_rungs_state() function creates the rung_promotions dict mapping, which the promotion policies use to determine the next step: promotion or sample. To simulate the rung resets of vanilla HB, the algorithm is viewed as a series of SH brackets, where the set of SH brackets comprising HB is repeated. This is done by iterating over the closed loop of possible SH brackets (self.sh_brackets). The oldest active, incomplete SH bracket is searched for to choose the next evaluation. If all brackets are either over or waiting, a new SH bracket is started, corresponding to the SH bracket under HB registered by current_SH_bracket.

Source code in neps/optimizers/multi_fidelity/hyperband.py
def clear_old_brackets(self):
    """Enforces reset at each new bracket.

    The _get_rungs_state() function creates the `rung_promotions` dict mapping, which
    the promotion policies use to determine the next step: promotion or sample.
    To simulate the rung resets of vanilla HB, the algorithm is viewed as a series
    of SH brackets, where the set of SH brackets comprising HB is repeated. This is
    done by iterating over the closed loop of possible SH brackets (self.sh_brackets).
    The oldest active, incomplete SH bracket is searched for to choose the next
    evaluation. If all brackets are either over or waiting, a new SH bracket is
    started, corresponding to the SH bracket under HB registered by
    `current_SH_bracket`.
    """
    n_sh_brackets = len(self.sh_brackets)
    # iterates over the different SH brackets
    self.current_sh_bracket = 0  # indexing from range(0, n_sh_brackets)
    start = 0
    _min_rung = self.sh_brackets[self.current_sh_bracket].min_rung
    end = self.sh_brackets[self.current_sh_bracket].config_map[_min_rung]

    if self.sample_default_first and self.sample_default_at_target:
        start += 1
        end += 1

    # stores the base rung size for each SH bracket in HB
    base_rung_sizes = []  # sorted(self.config_map.values(), reverse=True)
    for bracket in self.sh_brackets.values():
        base_rung_sizes.append(sorted(bracket.config_map.values(), reverse=True)[0])
    while end <= len(self.observed_configs):
        # subsetting only this SH bracket from the history
        sh_bracket = self.sh_brackets[self.current_sh_bracket]
        sh_bracket.clean_rung_information()
        # for the SH bracket in start-end, calculate total SH budget used, from the
        # correct SH bracket object to make the right budget calculations

        bracket_budget_used = sh_bracket._calc_budget_used_in_bracket(
            deepcopy(self.observed_configs.rung.values[start:end])
        )
        # if budget used is less than the total SH budget then still an active bracket
        current_bracket_full_budget = sum(sh_bracket.full_rung_trace)
        if bracket_budget_used < current_bracket_full_budget:
            # updating rung information of the current bracket

            sh_bracket._get_rungs_state(self.observed_configs.iloc[start:end])
            # extra call to use the updated rung member info to find promotions
            # SyncPromotion signals a wait if a rung is full but has
            # incomplete/pending evaluations, and signals to start a new SH bracket
            sh_bracket._handle_promotions()
            promotion_count = 0
            for _, promotions in sh_bracket.rung_promotions.items():
                promotion_count += len(promotions)
            # if no promotion candidates are returned, then the current bracket
            # is active and waiting
            if promotion_count:
                # a promotion was found, so return: the oldest active bracket is the
                # current SH bracket at this scope
                return
            # if no promotions, ensure an empty state explicitly to disable bracket
            sh_bracket.clean_rung_information()
        start = end
        # updating pointer to the next SH bracket in HB
        self.current_sh_bracket = (self.current_sh_bracket + 1) % n_sh_brackets
        end = start + base_rung_sizes[self.current_sh_bracket]
    # reaches here if all old brackets are either waiting or finished

    # updates rung info with the latest active, incomplete bracket
    sh_bracket = self.sh_brackets[self.current_sh_bracket]

    sh_bracket._get_rungs_state(self.observed_configs.iloc[start:end])
    sh_bracket._handle_promotions()
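
A tiny sketch of the windowing idea behind the loop above: the evaluation history is cut into consecutive slices, one per SH bracket, whose lengths are the base-rung sizes of the brackets (the sizes and history length below are hypothetical):

base_rung_sizes = [9, 6, 4]  # illustrative base-rung widths of the SH brackets in HB
history_len = 15             # number of configurations sampled so far

start, bracket_idx = 0, 0
windows = []
while start < history_len:
    size = base_rung_sizes[bracket_idx % len(base_rung_sizes)]
    end = min(start + size, history_len)
    windows.append((bracket_idx % len(base_rung_sizes), start, end))
    start, bracket_idx = end, bracket_idx + 1

print(windows)  # [(0, 0, 9), (1, 9, 15)] -> the second SH bracket is still filling up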

find_1nn_distance_from_incumbent #

find_1nn_distance_from_incumbent(incumbent)

Finds the distance to the nearest neighbour.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_1nn_distance_from_incumbent(self, incumbent):
    """Finds the distance to the nearest neighbour."""
    distances = self.find_all_distances_from_incumbent(incumbent)
    distance = min(distances)
    return distance

find_all_distances_from_incumbent #

find_all_distances_from_incumbent(incumbent)

Finds the distances from the incumbent to all other configurations seen so far.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_all_distances_from_incumbent(self, incumbent):
    """Finds the distance to the nearest neighbour."""
    dist = lambda x: compute_config_dist(incumbent, x)
    # computing distance of incumbent from all seen points in history
    distances = [dist(config) for config in self.observed_configs.config]
    # ensuring the distances exclude 0 or the distance from itself
    distances = [d for d in distances if d > 0]
    return distances
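
As a rough usage sketch, the same pattern with a stand-in Euclidean distance over plain dicts of numeric hyperparameters; NePS' compute_config_dist operates on SearchSpace objects, so the helper below is purely illustrative:

import numpy as np

def distances_from_incumbent(incumbent: dict, history: list[dict]) -> list[float]:
    keys = sorted(incumbent)
    inc_vec = np.array([incumbent[k] for k in keys], dtype=float)
    dists = []
    for config in history:
        vec = np.array([config[k] for k in keys], dtype=float)
        dists.append(float(np.linalg.norm(inc_vec - vec)))
    # exclude zero distances, i.e. the incumbent itself (mirrors the method above)
    return [d for d in dists if d > 0]

history = [{"lr": 1e-3, "width": 64}, {"lr": 1e-2, "width": 64}, {"lr": 1e-3, "width": 128}]
print(min(distances_from_incumbent(history[0], history)))  # the 1-NN distance, 0.009 here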

find_incumbent #

find_incumbent(rung: int = None) -> SearchSpace

Find the best performing configuration seen so far.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_incumbent(self, rung: int = None) -> SearchSpace:
    """Find the best performing configuration seen so far."""
    rungs = self.observed_configs.rung.values
    idxs = self.observed_configs.index.values
    if rung is not None:
        # enters this scope only if the `rung` argument is passed and not None
        if rung not in rungs:
            self.logger.warn(f"{rung} not in {np.unique(rungs)}")
        # filtering by rung based on the argument passed
        idxs = self.observed_configs.rung.values == rung
        # checking the width of the current rung
        if idxs.sum() < self.eta:
            self.logger.warn(
                f"Selecting incumbent from a rung with width less than {self.eta}"
            )
    # extracting the incumbent configuration
    if len(idxs):
        # finding the config with the lowest recorded performance
        _perfs = self.observed_configs.loc[idxs].perf.values
        inc_idx = np.nanargmin([np.nan if t is None else t for t in _perfs])
        inc = self.observed_configs.loc[idxs].iloc[inc_idx].config
    else:
        # this block should never execute; as a safeguard against runtime anomalies,
        # if no incumbent can be extracted, the prior is treated as the incumbent
        inc = self.pipeline_space.sample_default_configuration()
        self.logger.warn(
            "Treating the prior as the incumbent. "
            "Please check if this should not happen."
        )
    return inc

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

This is the method that decides which point to query next.

Returns: tuple[SearchSpace, str, str | None]
Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    self.set_sampling_weights_and_inc(rung=self.current_sh_bracket)

    for _, sh in self.sh_brackets.items():
        sh.sampling_args = self.sampling_args
    return super().get_config_and_ids()

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Calls _get_learning_curve() and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_activate_inc #

is_activate_inc() -> bool

Function to check optimization state to allow/disallow incumbent sampling.

This function checks whether the total resources used by the finished evaluations sum to at least the budget of one full SH bracket.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def is_activate_inc(self) -> bool:
    """Function to check optimization state to allow/disallow incumbent sampling.

    This function checks whether the total resources used by the finished evaluations
    sum to at least the budget of one full SH bracket.
    """
    activate_inc = False

    # calculate total resource cost required for the first SH bracket in HB
    if hasattr(self, "sh_brackets") and len(self.sh_brackets) > 1:
        # for HB or AsyncHB which invokes multiple SH brackets
        bracket = self.sh_brackets[self.min_rung]
    else:
        # for SH or ASHA which do not invoke multiple SH brackets
        bracket = self
    # calculating the total resources spent in the first SH bracket, taking into
    # account the continuations, that is, the resources spent on a promoted config is
    # not fidelity[rung] but (fidelity[rung] - fidelity[rung - 1])
    continuation_resources = bracket.rung_map[bracket.min_rung]
    resources = bracket.config_map[bracket.min_rung] * continuation_resources
    for r in range(1, len(bracket.rung_map)):
        rung = sorted(list(bracket.rung_map.keys()), reverse=False)[r]
        continuation_resources = bracket.rung_map[rung] - bracket.rung_map[rung - 1]
        resources += bracket.config_map[rung] * continuation_resources

    # find resources spent so far for all finished evaluations
    resources_used = calc_total_resources_spent(self.observed_configs, self.rung_map)

    if resources_used >= resources and len(
        self.rung_histories[self.max_rung]["config"]
    ):
        # activate incumbent-based sampling if the total resources spent are at least
        # equivalent to one SH bracket's resource usage; additionally, for the
        # asynchronous case with a large number of workers, the check enforces that
        # at least one configuration has been evaluated at the highest fidelity
        activate_inc = True
    return activate_inc
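
A worked example of the continuation-aware budget of one SH bracket, using hypothetical rung_map and config_map values (fidelities in epochs):

rung_map = {0: 1, 1: 3, 2: 9}    # fidelity evaluated at each rung
config_map = {0: 9, 1: 3, 2: 1}  # number of configurations run at each rung

def one_sh_bracket_resources(rung_map: dict, config_map: dict) -> float:
    rungs = sorted(rung_map)
    # base rung: every configuration pays the full lowest fidelity
    resources = config_map[rungs[0]] * rung_map[rungs[0]]
    # promoted configurations only pay the difference in fidelity (continuation)
    for prev, rung in zip(rungs, rungs[1:]):
        resources += config_map[rung] * (rung_map[rung] - rung_map[prev])
    return resources

# 9*1 + 3*(3-1) + 1*(9-3) = 21 epochs; incumbent sampling activates once at least this
# much has been spent and one config has finished at the highest fidelity
print(one_sh_bracket_resources(rung_map, config_map))  # 21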

is_init_phase #

is_init_phase() -> bool

Returns True if in the warm-start phase and False under model-based search.

Source code in neps/optimizers/multi_fidelity/mf_bo.py
def is_init_phase(self) -> bool:
    """Returns True is in the warmstart phase and False under model-based search."""
    if self.modelling_type == "rung":
        # build a model per rung or per fidelity
        # in this case, the initial design checks whether `init_size` configurations
        # have finished at a rung; the highest such rung is chosen for model building
        # at the current iteration
        if self._active_rung() is None:
            return True
    elif self.modelling_type == "joint":
        # builds a model across all fidelities with the fidelity as a dimension
        # in this case, calculate the total number of function evaluations spent
        # and, in vanilla BO fashion, compare that with the initial design size
        resources = calc_total_resources_spent(self.observed_configs, self.rung_map)
        resources /= self.max_budget
        if resources < self.init_size:
            return True
    else:
        raise ValueError("Choice of modelling_type not in {{'rung', 'joint'}}")
    return False
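
A small numeric illustration of the "joint" warm-start check with made-up numbers: the resources spent so far are normalized by the maximum fidelity and compared against the initial design size:

spent_fidelities = [1, 1, 1, 3, 9]  # hypothetical fidelity consumed per finished evaluation
max_budget = 9                      # maximum fidelity
init_size = 2                       # initial design size, in units of full evaluations

full_evals = sum(spent_fidelities) / max_budget  # 15 / 9 ≈ 1.67
print(full_evals < init_size)  # True -> still in the warm-start phase, no model used yet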

is_promotable #

is_promotable() -> int | None

Returns an int if a rung can be promoted, else a None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # iterates from the highest promotable fidelity down to the lowest fidelity
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote
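
For illustration, the same top-down lookup on a mocked rung_promotions mapping (config IDs per rung that are ready to move up); the highest rung with candidates wins:

def highest_promotable_rung(rung_promotions: dict, min_rung: int, max_rung: int) -> int | None:
    # scan from the highest promotable rung downwards, mirroring is_promotable()
    for rung in reversed(range(min_rung, max_rung)):
        if rung_promotions.get(rung):
            return rung
    return None

rung_promotions = {0: [4, 7], 1: [], 2: []}  # only the base rung has candidates
print(highest_promotable_rung(rung_promotions, min_rung=0, max_rung=3))  # 0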

prior_to_incumbent_ratio #

prior_to_incumbent_ratio() -> tuple[float, float]

Calculates the normalized weight distribution between prior and incumbent.

Sum of the weights should be 1.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def prior_to_incumbent_ratio(self) -> tuple[float, float]:
    """Calculates the normalized weight distribution between prior and incumbent.

    Sum of the weights should be 1.
    """
    if self.inc_style == "constant":
        return self._prior_to_incumbent_ratio_constant()
    elif self.inc_style == "decay":
        resources = calc_total_resources_spent(self.observed_configs, self.rung_map)
        return self._prior_to_incumbent_ratio_decay(
            resources, self.eta, self.min_budget, self.max_budget
        )
    elif self.inc_style == "dynamic":
        return self._prior_to_incumbent_ratio_dynamic(self.max_rung)
    else:
        raise ValueError(f"Invalid option {self.inc_style}")
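
The _prior_to_incumbent_ratio_* helpers dispatched above are defined elsewhere in the class; as a purely illustrative sketch (not the NePS implementation), a resource-based decay could shift weight from the prior towards the incumbent like this:

def illustrative_decay_ratio(resources_used: float, max_budget: float) -> tuple[float, float]:
    # toy decay: start fully on the prior and shift linearly towards the incumbent
    progress = min(resources_used / max_budget, 1.0)
    w_prior = 1.0 - 0.5 * progress  # never drops below half in this toy version
    return w_prior, 1.0 - w_prior   # the two weights always sum to 1

print(illustrative_decay_ratio(0, 100))    # (1.0, 0.0)
print(illustrative_decay_ratio(100, 100))  # (0.5, 0.5)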

sample_new_config #

sample_new_config(rung: int = None, **kwargs)

Samples configuration from policies or random.

Source code in neps/optimizers/multi_fidelity/mf_bo.py
def sample_new_config(
    self,
    rung: int = None,
    **kwargs,
):
    """Samples configuration from policies or random."""
    if self.model_based and not self.is_init_phase():
        incumbent = None
        if self.modelling_type == "rung":
            # `rung` should not be None when not in init phase
            active_max_rung = self._active_rung()
            fidelity = None
            active_max_fidelity = self.rung_map[active_max_rung]
        elif self.modelling_type == "joint":
            fidelity = self.rung_map[rung]
            active_max_fidelity = None
            # IMPORTANT step for correct 2-step acquisition
            incumbent = min(self.rung_histories[rung]["perf"])
        else:
            fidelity = active_max_fidelity = None
        assert (
            (fidelity is None and active_max_fidelity is not None)
            or (active_max_fidelity is None and fidelity is not None)
            or (active_max_fidelity is not None and fidelity is not None)
        ), "Either condition needs to be not None!"
        config = self.model_policy.sample(
            active_max_fidelity=active_max_fidelity,
            fidelity=fidelity,
            incumbent=incumbent,
            **self.sampling_args,
        )
    elif self.sampling_policy is not None:
        config = self.sampling_policy.sample(**self.sampling_args)
    else:
        config = self.pipeline_space.sample(
            patience=self.patience,
            user_priors=self.use_priors,
            ignore_fidelity=True,
        )
    return config