
Async priorband

neps.optimizers.multi_fidelity_prior.async_priorband #

PriorBandAsha #

PriorBandAsha(
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    early_stopping_rate: int = 0,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    sampling_policy: Any = EnsemblePolicy,
    promotion_policy: Any = AsyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = True,
    sample_default_at_target: bool = True,
    prior_weight_type: str = "geometric",
    inc_sample_type: str = "mutation",
    inc_mutation_rate: float = 0.5,
    inc_mutation_std: float = 0.25,
    inc_style: str = "dynamic",
    model_based: bool = False,
    modelling_type: str = "joint",
    initial_design_size: int = None,
    model_policy: Any = ModelPolicy,
    surrogate_model: str | Any = "gp",
    domain_se_kernel: str = None,
    hp_kernels: list = None,
    surrogate_model_args: dict = None,
    acquisition: str | BaseAcquisition = "EI",
    log_prior_weighted: bool = False,
    acquisition_sampler: (
        str | AcquisitionSampler
    ) = "random",
)

Bases: MFBOBase, PriorBandBase, AsynchronousSuccessiveHalvingWithPriors

Implements a PriorBand on top of ASHA.

Source code in neps/optimizers/multi_fidelity_prior/async_priorband.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    early_stopping_rate: int = 0,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    sampling_policy: typing.Any = EnsemblePolicy,  # key difference to ASHA
    promotion_policy: typing.Any = AsyncPromotionPolicy,  # key difference from SH
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = True,
    sample_default_at_target: bool = True,
    prior_weight_type: str = "geometric",  # could also be {"linear", "50-50"}
    inc_sample_type: str = "mutation",  # or {"crossover", "gaussian", "hypersphere"}
    inc_mutation_rate: float = 0.5,
    inc_mutation_std: float = 0.25,
    inc_style: str = "dynamic",  # could also be {"decay", "constant"}
    # arguments for model
    model_based: bool = False,  # crucial argument to set to allow model-search
    modelling_type: str = "joint",  # could also be {"rung"}
    initial_design_size: int = None,
    model_policy: typing.Any = ModelPolicy,
    surrogate_model: str | typing.Any = "gp",
    domain_se_kernel: str = None,
    hp_kernels: list = None,
    surrogate_model_args: dict = None,
    acquisition: str | BaseAcquisition = "EI",
    log_prior_weighted: bool = False,
    acquisition_sampler: str | AcquisitionSampler = "random",
):
    super().__init__(
        pipeline_space=pipeline_space,
        budget=budget,
        eta=eta,
        early_stopping_rate=early_stopping_rate,
        initial_design_type=initial_design_type,
        sampling_policy=sampling_policy,
        promotion_policy=promotion_policy,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
        prior_confidence=prior_confidence,
        random_interleave_prob=random_interleave_prob,
        sample_default_first=sample_default_first,
        sample_default_at_target=sample_default_at_target,
    )
    self.prior_weight_type = prior_weight_type
    self.inc_sample_type = inc_sample_type
    self.inc_mutation_rate = inc_mutation_rate
    self.inc_mutation_std = inc_mutation_std
    self.sampling_policy = sampling_policy(
        pipeline_space=pipeline_space, inc_type=self.inc_sample_type
    )
    # determines the kind of trade-off between incumbent and prior weightage
    self.inc_style = inc_style  # used by PriorBandBase
    self.sampling_args = {
        "inc": None,
        "weights": {
            "prior": 1,  # begin with only prior sampling
            "inc": 0,
            "random": 0,
        },
    }

    bo_args = dict(
        surrogate_model=surrogate_model,
        domain_se_kernel=domain_se_kernel,
        hp_kernels=hp_kernels,
        surrogate_model_args=surrogate_model_args,
        acquisition=acquisition,
        log_prior_weighted=log_prior_weighted,
        acquisition_sampler=acquisition_sampler,
    )
    self.model_based = model_based
    self.modelling_type = modelling_type
    self.initial_design_size = initial_design_size
    # counting non-fidelity dimensions in search space
    ndims = sum(
        1
        for _, hp in self.pipeline_space.hyperparameters.items()
        if not hp.is_fidelity
    )
    n_min = ndims + 1
    self.init_size = n_min + 1  # in BOHB: init_design >= N_dim + 2
    if self.modelling_type == "joint" and self.initial_design_size is not None:
        self.init_size = self.initial_design_size
    self.model_policy = model_policy(pipeline_space, **bo_args)
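
The following is a minimal usage sketch, not part of this page's source: it shows how PriorBandAsha might be instantiated directly. The parameter classes and keyword names (default, default_confidence, is_fidelity) are assumptions based on the usual neps search-space API and may differ across neps versions.

import neps
from neps.search_spaces.search_space import SearchSpace
from neps.optimizers.multi_fidelity_prior.async_priorband import PriorBandAsha

# hypothetical two-parameter space: one hyperparameter with a prior (default +
# confidence) and one fidelity parameter
pipeline_space = SearchSpace(
    learning_rate=neps.FloatParameter(
        lower=1e-5, upper=1e-1, log=True, default=1e-3, default_confidence="medium"
    ),
    epochs=neps.IntegerParameter(lower=1, upper=27, is_fidelity=True),
)

optimizer = PriorBandAsha(
    pipeline_space=pipeline_space,
    budget=100,  # total optimization budget in fidelity units
    eta=3,
    prior_confidence="medium",
)

# a worker loop would then repeatedly do, roughly:
#   optimizer.load_results(previous_results, pending_evaluations)
#   config, config_id, previous_config_id = optimizer.get_config_and_ids()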

calc_sampling_args #

calc_sampling_args(rung) -> dict

Sets the weights for each of the sampling techniques.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def calc_sampling_args(self, rung) -> dict:
    """Sets the weights for each of the sampling techniques."""
    if self.prior_weight_type == "geometric":
        _w_random = 1
        # scales weight of prior by eta raised to the current rung level
        # at the base rung thus w_prior = w_random
        # at the max rung r, w_prior = eta^r * w_random
        _w_prior = (self.eta**rung) * _w_random
    elif self.prior_weight_type == "linear":
        _w_random = 1
        w_prior_min_rung = 1 * _w_random
        w_prior_max_rung = self.eta * _w_random
        num_rungs = len(self.rung_map)
        # linearly increasing prior weight such that
        # at base rung, w_prior = w_random
        # at max rung, w_prior = self.eta * w_random
        _w_prior = np.linspace(
            start=w_prior_min_rung,
            stop=w_prior_max_rung,
            endpoint=True,
            num=num_rungs,
        )[rung]
    elif self.prior_weight_type == "50-50":
        _w_random = 1
        _w_prior = 1
    else:
        raise ValueError(f"{self.prior_weight_type} not in {{'linear', 'geometric', '50-50'}}")

    # normalizing weights of random and prior sampling
    w_prior = _w_prior / (_w_prior + _w_random)
    w_random = _w_random / (_w_prior + _w_random)
    # calculating ratio of prior and incumbent weights
    _w_prior, _w_inc = self.prior_to_incumbent_ratio()
    # scaling back such that w_random + w_prior + w_inc = 1
    w_inc = _w_inc * w_prior
    w_prior = _w_prior * w_prior

    sampling_args = {
        "prior": w_prior,
        "inc": w_inc,
        "random": w_random,
    }
    return sampling_args
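
To make the geometric schedule concrete, here is a small standalone calculation (illustrative only, not library code) of the prior/random split for eta = 3, before any incumbent weighting (i.e. with w_inc = 0):

eta = 3
for rung in range(4):
    _w_random = 1
    _w_prior = eta**rung
    w_prior = _w_prior / (_w_prior + _w_random)
    w_random = _w_random / (_w_prior + _w_random)
    print(f"rung {rung}: prior={w_prior:.3f}, random={w_random:.3f}")

# rung 0: prior=0.500, random=0.500
# rung 1: prior=0.750, random=0.250
# rung 2: prior=0.900, random=0.100
# rung 3: prior=0.964, random=0.036

Once incumbent sampling is active, prior_to_incumbent_ratio() further splits the prior share between prior and incumbent, so the three weights still sum to 1.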

find_1nn_distance_from_incumbent #

find_1nn_distance_from_incumbent(incumbent)

Finds the distance to the nearest neighbour.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_1nn_distance_from_incumbent(self, incumbent):
    """Finds the distance to the nearest neighbour."""
    distances = self.find_all_distances_from_incumbent(incumbent)
    distance = min(distances)
    return distance

find_all_distances_from_incumbent #

find_all_distances_from_incumbent(incumbent)

Finds the distances from the incumbent to all other configurations seen so far.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_all_distances_from_incumbent(self, incumbent):
    """Finds the distance to the nearest neighbour."""
    dist = lambda x: compute_config_dist(incumbent, x)
    # computing distance of incumbent from all seen points in history
    distances = [dist(config) for config in self.observed_configs.config]
    # ensuring the distances exclude 0 or the distance from itself
    distances = [d for d in distances if d > 0]
    return distances

find_incumbent #

find_incumbent(rung: int = None) -> SearchSpace

Find the best performing configuration seen so far.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_incumbent(self, rung: int = None) -> SearchSpace:
    """Find the best performing configuration seen so far."""
    rungs = self.observed_configs.rung.values
    idxs = self.observed_configs.index.values
    if rung is not None:
        # enters this scope if a `rung` argument is passed and not left as None
        if rung not in rungs:
            self.logger.warn(f"{rung} not in {np.unique(rungs)}")
        # filtering by rung based on argument passed
        idxs = self.observed_configs.rung.values == rung
        # checking width of current rung, i.e. the number of configs seen at this rung
        if sum(idxs) < self.eta:
            self.logger.warn(
                f"Selecting incumbent from a rung with width less than {self.eta}"
            )
    # extracting the incumbent configuration
    if len(idxs):
        # finding the config with the lowest recorded performance
        _perfs = self.observed_configs.loc[idxs].perf.values
        inc_idx = np.nanargmin([np.nan if t is None else t for t in _perfs])
        inc = self.observed_configs.loc[idxs].iloc[inc_idx].config
    else:
        # THIS block should not ever execute, but for runtime anomalies, if no
        # incumbent can be extracted, the prior is treated as the incumbent
        inc = self.pipeline_space.sample_default_configuration()
        self.logger.warn(
            "Treating the prior as the incumbent. "
            "Please check if this should not happen."
        )
    return inc

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

Decides which configuration to query next.

RETURNS DESCRIPTION
tuple[SearchSpace, str, str | None]

Source code in neps/optimizers/multi_fidelity_prior/async_priorband.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    rung_to_promote = self.is_promotable()
    if rung_to_promote is not None:
        rung = rung_to_promote + 1
    else:
        rung = self.min_rung
    self.set_sampling_weights_and_inc(rung=rung)
    # performs standard ASHA but sampling happens as per the EnsemblePolicy
    return super().get_config_and_ids()

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Extracts the learning curve from the result and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_activate_inc #

is_activate_inc() -> bool

Function to check optimization state to allow/disallow incumbent sampling.

This function checks whether the total resources used by the finished evaluations sum to at least the budget of one full SH bracket.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def is_activate_inc(self) -> bool:
    """Function to check optimization state to allow/disallow incumbent sampling.

    This function checks whether the total resources used by the finished evaluations
    sum to at least the budget of one full SH bracket.
    """
    activate_inc = False

    # calculate total resource cost required for the first SH bracket in HB
    if hasattr(self, "sh_brackets") and len(self.sh_brackets) > 1:
        # for HB or AsyncHB which invokes multiple SH brackets
        bracket = self.sh_brackets[self.min_rung]
    else:
        # for SH or ASHA which do not invoke multiple SH brackets
        bracket = self
    # calculating the total resources spent in the first SH bracket, taking into
    # account the continuations, that is, the resources spent on a promoted config is
    # not fidelity[rung] but (fidelity[rung] - fidelity[rung - 1])
    continuation_resources = bracket.rung_map[bracket.min_rung]
    resources = bracket.config_map[bracket.min_rung] * continuation_resources
    for r in range(1, len(bracket.rung_map)):
        rung = sorted(list(bracket.rung_map.keys()), reverse=False)[r]
        continuation_resources = bracket.rung_map[rung] - bracket.rung_map[rung - 1]
        resources += bracket.config_map[rung] * continuation_resources

    # find resources spent so far for all finished evaluations
    resources_used = calc_total_resources_spent(self.observed_configs, self.rung_map)

    if resources_used >= resources and len(
        self.rung_histories[self.max_rung]["config"]
    ):
        # activate incumbent-based sampling if the total resources spent are at
        # least equivalent to one SH bracket's resource usage, and additionally,
        # for the asynchronous case with a large number of workers, the check
        # enforces that at least one configuration has finished at the highest fidelity
        activate_inc = True
    return activate_inc
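
As an illustration of the continuation-aware accounting above (assumed values, not taken from the library): with eta = 3, per-rung fidelities 1, 3, 9, 27 and a full SH bracket of 27, 9, 3, 1 configurations, one bracket costs 81 fidelity units, so incumbent sampling activates only once at least that much has been spent and one configuration has finished at the top rung.

# illustrative only: rung_map maps rung -> fidelity, config_map maps rung -> #configs
rung_map = {0: 1, 1: 3, 2: 9, 3: 27}
config_map = {0: 27, 1: 9, 2: 3, 3: 1}

resources = config_map[0] * rung_map[0]
for rung in sorted(rung_map)[1:]:
    # a promoted config only pays the difference between consecutive fidelities
    resources += config_map[rung] * (rung_map[rung] - rung_map[rung - 1])

print(resources)  # 27*1 + 9*2 + 3*6 + 1*18 = 81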

is_init_phase #

is_init_phase() -> bool

Returns True if in the warm-start phase and False if under model-based search.

Source code in neps/optimizers/multi_fidelity/mf_bo.py
def is_init_phase(self) -> bool:
    """Returns True is in the warmstart phase and False under model-based search."""
    if self.modelling_type == "rung":
        # build a model per rung or per fidelity
        # in this case, the initial design checks whether `init_size` configurations
        # have finished at a rung, and the highest such rung is chosen for model
        # building at the current iteration
        if self._active_rung() is None:
            return True
    elif self.modelling_type == "joint":
        # builds a model across all fidelities with the fidelity as a dimension
        # in this case, calculate the total number of function evaluations spent
        # and, in vanilla BO fashion, use that to compare against the initial design size
        resources = calc_total_resources_spent(self.observed_configs, self.rung_map)
        resources /= self.max_budget
        if resources < self.init_size:
            return True
    else:
        raise ValueError("Choice of modelling_type not in {{'rung', 'joint'}}")
    return False
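
A quick worked example of the "joint" warm-start check (assumed numbers): the total fidelity spent is normalized by max_budget into an equivalent number of full-fidelity evaluations and compared against init_size.

max_budget = 27
init_size = 5                       # e.g. (n_dims + 1) + 1 as set in __init__
total_fidelity_spent = 4 * 27 + 6   # four full-fidelity runs plus some cheap ones
equivalent_full_evals = total_fidelity_spent / max_budget  # ~4.22
print(equivalent_full_evals < init_size)  # True -> still in the initial design phase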

is_promotable #

is_promotable() -> int | None

Returns an int if a rung can be promoted, else None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # # iterates starting from the highest fidelity promotable to the lowest fidelity
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote

load_results #

load_results(
    previous_results: dict[str, ConfigResult],
    pending_evaluations: dict[str, SearchSpace],
) -> None

This is basically the fit method.

PARAMETER DESCRIPTION

previous_results
    [description]
    TYPE: dict[str, ConfigResult]

pending_evaluations
    [description]
    TYPE: dict[str, SearchSpace]

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def load_results(
    self,
    previous_results: dict[str, ConfigResult],
    pending_evaluations: dict[str, SearchSpace],
) -> None:
    """This is basically the fit method.

    Args:
        previous_results (dict[str, ConfigResult]): [description]
        pending_evaluations (dict[str, SearchSpace]): [description]
    """

    self.rung_histories = {
        rung: {"config": [], "perf": []}
        for rung in range(self.min_rung, self.max_rung + 1)
    }

    self.observed_configs = pd.DataFrame([], columns=("config", "rung", "perf"))

    # previous optimization run exists and needs to be loaded
    self._load_previous_observations(previous_results)
    self.total_fevals = len(previous_results) + len(pending_evaluations)

    # account for pending evaluations
    self._handle_pending_evaluations(pending_evaluations)

    # process optimization state and bucket observations per rung
    self._get_rungs_state()

    # filter/reset old SH brackets
    self.clear_old_brackets()

    # identifying promotion list per rung
    self._handle_promotions()

    # fit any model/surrogates
    self._fit_models()

    return

prior_to_incumbent_ratio #

prior_to_incumbent_ratio() -> tuple[float, float]

Calculates the normalized weight distribution between prior and incumbent.

Sum of the weights should be 1.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def prior_to_incumbent_ratio(self) -> tuple[float, float]:
    """Calculates the normalized weight distribution between prior and incumbent.

    Sum of the weights should be 1.
    """
    if self.inc_style == "constant":
        return self._prior_to_incumbent_ratio_constant()
    elif self.inc_style == "decay":
        resources = calc_total_resources_spent(self.observed_configs, self.rung_map)
        return self._prior_to_incumbent_ratio_decay(
            resources, self.eta, self.min_budget, self.max_budget
        )
    elif self.inc_style == "dynamic":
        return self._prior_to_incumbent_ratio_dynamic(self.max_rung)
    else:
        raise ValueError(f"Invalid option {self.inc_style}")

sample_new_config #

sample_new_config(rung: int = None, **kwargs)

Samples a configuration from the sampling policies or at random.

Source code in neps/optimizers/multi_fidelity/mf_bo.py
def sample_new_config(
    self,
    rung: int = None,
    **kwargs,
):
    """Samples configuration from policies or random."""
    if self.model_based and not self.is_init_phase():
        incumbent = None
        if self.modelling_type == "rung":
            # `rung` should not be None when not in init phase
            active_max_rung = self._active_rung()
            fidelity = None
            active_max_fidelity = self.rung_map[active_max_rung]
        elif self.modelling_type == "joint":
            fidelity = self.rung_map[rung]
            active_max_fidelity = None
            # IMPORTANT step for correct 2-step acquisition
            incumbent = min(self.rung_histories[rung]["perf"])
        else:
            fidelity = active_max_fidelity = None
        assert (
            (fidelity is None and active_max_fidelity is not None)
            or (active_max_fidelity is None and fidelity is not None)
            or (active_max_fidelity is not None and fidelity is not None)
        ), "Either condition needs to be not None!"
        config = self.model_policy.sample(
            active_max_fidelity=active_max_fidelity,
            fidelity=fidelity,
            incumbent=incumbent,
            **self.sampling_args,
        )
    elif self.sampling_policy is not None:
        config = self.sampling_policy.sample(**self.sampling_args)
    else:
        config = self.pipeline_space.sample(
            patience=self.patience,
            user_priors=self.use_priors,
            ignore_fidelity=True,
        )
    return config

PriorBandAshaHB #

PriorBandAshaHB(
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    sampling_policy: Any = EnsemblePolicy,
    promotion_policy: Any = AsyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = True,
    sample_default_at_target: bool = True,
    prior_weight_type: str = "geometric",
    inc_sample_type: str = "mutation",
    inc_mutation_rate: float = 0.5,
    inc_mutation_std: float = 0.25,
    inc_style: str = "dynamic",
    model_based: bool = False,
    modelling_type: str = "joint",
    initial_design_size: int = None,
    model_policy: Any = ModelPolicy,
    surrogate_model: str | Any = "gp",
    domain_se_kernel: str = None,
    hp_kernels: list = None,
    surrogate_model_args: dict = None,
    acquisition: str | BaseAcquisition = "EI",
    log_prior_weighted: bool = False,
    acquisition_sampler: (
        str | AcquisitionSampler
    ) = "random",
)

Bases: PriorBandAsha

Implements a PriorBand on top of ASHA-HB (Mobster).

Source code in neps/optimizers/multi_fidelity_prior/async_priorband.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    sampling_policy: typing.Any = EnsemblePolicy,  # key difference to ASHA
    promotion_policy: typing.Any = AsyncPromotionPolicy,  # key difference from PB
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = True,
    sample_default_at_target: bool = True,
    prior_weight_type: str = "geometric",  # could also be {"linear", "50-50"}
    inc_sample_type: str = "mutation",  # or {"crossover", "gaussian", "hypersphere"}
    inc_mutation_rate: float = 0.5,
    inc_mutation_std: float = 0.25,
    inc_style: str = "dynamic",  # could also be {"decay", "constant"}
    # arguments for model
    model_based: bool = False,  # crucial argument to set to allow model-search
    modelling_type: str = "joint",  # could also be {"rung"}
    initial_design_size: int = None,
    model_policy: typing.Any = ModelPolicy,
    surrogate_model: str | typing.Any = "gp",
    domain_se_kernel: str = None,
    hp_kernels: list = None,
    surrogate_model_args: dict = None,
    acquisition: str | BaseAcquisition = "EI",
    log_prior_weighted: bool = False,
    acquisition_sampler: str | AcquisitionSampler = "random",
):
    # collecting arguments required by ASHA
    args = dict(
        pipeline_space=pipeline_space,
        budget=budget,
        eta=eta,
        early_stopping_rate=self.early_stopping_rate,
        initial_design_type=initial_design_type,
        sampling_policy=sampling_policy,
        promotion_policy=promotion_policy,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
        prior_confidence=prior_confidence,
        random_interleave_prob=random_interleave_prob,
        sample_default_first=sample_default_first,
        sample_default_at_target=sample_default_at_target,
    )
    bo_args = dict(
        surrogate_model=surrogate_model,
        domain_se_kernel=domain_se_kernel,
        hp_kernels=hp_kernels,
        surrogate_model_args=surrogate_model_args,
        acquisition=acquisition,
        log_prior_weighted=log_prior_weighted,
        acquisition_sampler=acquisition_sampler,
    )
    super().__init__(
        **args,
        prior_weight_type=prior_weight_type,
        inc_sample_type=inc_sample_type,
        inc_mutation_rate=inc_mutation_rate,
        inc_mutation_std=inc_mutation_std,
        inc_style=inc_style,
        model_based=model_based,
        modelling_type=modelling_type,
        initial_design_size=initial_design_size,
        model_policy=model_policy,
        **bo_args,
    )

    # Creating the ASHA (SH) brackets that Hyperband iterates over
    self.sh_brackets = {}
    for s in range(self.max_rung + 1):
        args.update({"early_stopping_rate": s})
        # key difference from vanilla HB where it runs synchronous SH brackets
        self.sh_brackets[s] = AsynchronousSuccessiveHalvingWithPriors(**args)
        self.sh_brackets[s].sampling_policy = self.sampling_policy
        self.sh_brackets[s].sampling_args = self.sampling_args
        self.sh_brackets[s].model_policy = self.model_policy
        self.sh_brackets[s].sample_new_config = self.sample_new_config
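
As a sketch of the bracket structure created above (illustrative, assuming eta = 3 and a fidelity range of [1, 27], i.e. four rungs): each early_stopping_rate s shifts a bracket's minimum rung upwards, mirroring Hyperband's set of SH brackets, while all brackets share the same sampling and model policies.

# illustrative only: how the per-bracket rungs and fidelities might look
eta, min_budget, max_rung = 3, 1, 3  # four rungs: 0..3

for s in range(max_rung + 1):
    rungs = list(range(s, max_rung + 1))
    fidelities = [min_budget * eta**r for r in rungs]
    print(f"bracket s={s}: rungs={rungs}, fidelities={fidelities}")

# bracket s=0: rungs=[0, 1, 2, 3], fidelities=[1, 3, 9, 27]
# bracket s=1: rungs=[1, 2, 3], fidelities=[3, 9, 27]
# bracket s=2: rungs=[2, 3], fidelities=[9, 27]
# bracket s=3: rungs=[3], fidelities=[27]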

calc_sampling_args #

calc_sampling_args(rung) -> dict

Sets the weights for each of the sampling techniques.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def calc_sampling_args(self, rung) -> dict:
    """Sets the weights for each of the sampling techniques."""
    if self.prior_weight_type == "geometric":
        _w_random = 1
        # scales weight of prior by eta raised to the current rung level
        # at the base rung thus w_prior = w_random
        # at the max rung r, w_prior = eta^r * w_random
        _w_prior = (self.eta**rung) * _w_random
    elif self.prior_weight_type == "linear":
        _w_random = 1
        w_prior_min_rung = 1 * _w_random
        w_prior_max_rung = self.eta * _w_random
        num_rungs = len(self.rung_map)
        # linearly increasing prior weight such that
        # at base rung, w_prior = w_random
        # at max rung, w_prior = self.eta * w_random
        _w_prior = np.linspace(
            start=w_prior_min_rung,
            stop=w_prior_max_rung,
            endpoint=True,
            num=num_rungs,
        )[rung]
    elif self.prior_weight_type == "50-50":
        _w_random = 1
        _w_prior = 1
    else:
        raise ValueError(f"{self.prior_weight_type} not in {{'linear', 'geometric', '50-50'}}")

    # normalizing weights of random and prior sampling
    w_prior = _w_prior / (_w_prior + _w_random)
    w_random = _w_random / (_w_prior + _w_random)
    # calculating ratio of prior and incumbent weights
    _w_prior, _w_inc = self.prior_to_incumbent_ratio()
    # scaling back such that w_random + w_prior + w_inc = 1
    w_inc = _w_inc * w_prior
    w_prior = _w_prior * w_prior

    sampling_args = {
        "prior": w_prior,
        "inc": w_inc,
        "random": w_random,
    }
    return sampling_args

find_1nn_distance_from_incumbent #

find_1nn_distance_from_incumbent(incumbent)

Finds the distance to the nearest neighbour.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_1nn_distance_from_incumbent(self, incumbent):
    """Finds the distance to the nearest neighbour."""
    distances = self.find_all_distances_from_incumbent(incumbent)
    distance = min(distances)
    return distance

find_all_distances_from_incumbent #

find_all_distances_from_incumbent(incumbent)

Finds the distances from the incumbent to all other configurations seen so far.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_all_distances_from_incumbent(self, incumbent):
    """Finds the distance to the nearest neighbour."""
    dist = lambda x: compute_config_dist(incumbent, x)
    # computing distance of incumbent from all seen points in history
    distances = [dist(config) for config in self.observed_configs.config]
    # ensuring the distances exclude 0 or the distance from itself
    distances = [d for d in distances if d > 0]
    return distances

find_incumbent #

find_incumbent(rung: int = None) -> SearchSpace

Find the best performing configuration seen so far.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def find_incumbent(self, rung: int = None) -> SearchSpace:
    """Find the best performing configuration seen so far."""
    rungs = self.observed_configs.rung.values
    idxs = self.observed_configs.index.values
    if rung is not None:
        # enters this scope if a `rung` argument is passed and not left as None
        if rung not in rungs:
            self.logger.warn(f"{rung} not in {np.unique(rungs)}")
        # filtering by rung based on argument passed
        idxs = self.observed_configs.rung.values == rung
        # checking width of current rung, i.e. the number of configs seen at this rung
        if sum(idxs) < self.eta:
            self.logger.warn(
                f"Selecting incumbent from a rung with width less than {self.eta}"
            )
    # extracting the incumbent configuration
    if len(idxs):
        # finding the config with the lowest recorded performance
        _perfs = self.observed_configs.loc[idxs].perf.values
        inc_idx = np.nanargmin([np.nan if t is None else t for t in _perfs])
        inc = self.observed_configs.loc[idxs].iloc[inc_idx].config
    else:
        # THIS block should not ever execute, but for runtime anomalies, if no
        # incumbent can be extracted, the prior is treated as the incumbent
        inc = self.pipeline_space.sample_default_configuration()
        self.logger.warn(
            "Treating the prior as the incumbent. "
            "Please check if this should not happen."
        )
    return inc

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

Decides which configuration to query next.

RETURNS DESCRIPTION
tuple[SearchSpace, str, str | None]

Source code in neps/optimizers/multi_fidelity_prior/async_priorband.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    # the rung to sample at
    bracket_to_run = self._get_bracket_to_run()

    self.set_sampling_weights_and_inc(rung=bracket_to_run)
    self.sh_brackets[bracket_to_run].sampling_args = self.sampling_args
    config, config_id, previous_config_id = self.sh_brackets[
        bracket_to_run
    ].get_config_and_ids()
    return config, config_id, previous_config_id  # type: ignore

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Extracts the learning curve from the result and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_activate_inc #

is_activate_inc() -> bool

Function to check optimization state to allow/disallow incumbent sampling.

This function checks whether the total resources used by the finished evaluations sum to at least the budget of one full SH bracket.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def is_activate_inc(self) -> bool:
    """Function to check optimization state to allow/disallow incumbent sampling.

    This function checks whether the total resources used by the finished evaluations
    sum to at least the budget of one full SH bracket.
    """
    activate_inc = False

    # calculate total resource cost required for the first SH bracket in HB
    if hasattr(self, "sh_brackets") and len(self.sh_brackets) > 1:
        # for HB or AsyncHB which invokes multiple SH brackets
        bracket = self.sh_brackets[self.min_rung]
    else:
        # for SH or ASHA which do not invoke multiple SH brackets
        bracket = self
    # calculating the total resources spent in the first SH bracket, taking into
    # account the continuations, that is, the resources spent on a promoted config is
    # not fidelity[rung] but (fidelity[rung] - fidelity[rung - 1])
    continuation_resources = bracket.rung_map[bracket.min_rung]
    resources = bracket.config_map[bracket.min_rung] * continuation_resources
    for r in range(1, len(bracket.rung_map)):
        rung = sorted(list(bracket.rung_map.keys()), reverse=False)[r]
        continuation_resources = bracket.rung_map[rung] - bracket.rung_map[rung - 1]
        resources += bracket.config_map[rung] * continuation_resources

    # find resources spent so far for all finished evaluations
    resources_used = calc_total_resources_spent(self.observed_configs, self.rung_map)

    if resources_used >= resources and len(
        self.rung_histories[self.max_rung]["config"]
    ):
        # activate incumbent-based sampling if the total resources spent are at
        # least equivalent to one SH bracket's resource usage, and additionally,
        # for the asynchronous case with a large number of workers, the check
        # enforces that at least one configuration has finished at the highest fidelity
        activate_inc = True
    return activate_inc

is_init_phase #

is_init_phase() -> bool

Returns True if in the warm-start phase and False if under model-based search.

Source code in neps/optimizers/multi_fidelity/mf_bo.py
def is_init_phase(self) -> bool:
    """Returns True is in the warmstart phase and False under model-based search."""
    if self.modelling_type == "rung":
        # build a model per rung or per fidelity
        # in this case, the initial design checks whether `init_size` configurations
        # have finished at a rung, and the highest such rung is chosen for model
        # building at the current iteration
        if self._active_rung() is None:
            return True
    elif self.modelling_type == "joint":
        # builds a model across all fidelities with the fidelity as a dimension
        # in this case, calculate the total number of function evaluations spent
        # and, in vanilla BO fashion, use that to compare against the initial design size
        resources = calc_total_resources_spent(self.observed_configs, self.rung_map)
        resources /= self.max_budget
        if resources < self.init_size:
            return True
    else:
        raise ValueError("Choice of modelling_type not in {{'rung', 'joint'}}")
    return False

is_promotable #

is_promotable() -> int | None

Returns an int if a rung can be promoted, else None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # # iterates starting from the highest fidelity promotable to the lowest fidelity
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote

prior_to_incumbent_ratio #

prior_to_incumbent_ratio() -> tuple[float, float]

Calculates the normalized weight distribution between prior and incumbent.

Sum of the weights should be 1.

Source code in neps/optimizers/multi_fidelity_prior/priorband.py
def prior_to_incumbent_ratio(self) -> tuple[float, float]:
    """Calculates the normalized weight distribution between prior and incumbent.

    Sum of the weights should be 1.
    """
    if self.inc_style == "constant":
        return self._prior_to_incumbent_ratio_constant()
    elif self.inc_style == "decay":
        resources = calc_total_resources_spent(self.observed_configs, self.rung_map)
        return self._prior_to_incumbent_ratio_decay(
            resources, self.eta, self.min_budget, self.max_budget
        )
    elif self.inc_style == "dynamic":
        return self._prior_to_incumbent_ratio_dynamic(self.max_rung)
    else:
        raise ValueError(f"Invalid option {self.inc_style}")

sample_new_config #

sample_new_config(rung: int = None, **kwargs)

Samples a configuration from the sampling policies or at random.

Source code in neps/optimizers/multi_fidelity/mf_bo.py
def sample_new_config(
    self,
    rung: int = None,
    **kwargs,
):
    """Samples configuration from policies or random."""
    if self.model_based and not self.is_init_phase():
        incumbent = None
        if self.modelling_type == "rung":
            # `rung` should not be None when not in init phase
            active_max_rung = self._active_rung()
            fidelity = None
            active_max_fidelity = self.rung_map[active_max_rung]
        elif self.modelling_type == "joint":
            fidelity = self.rung_map[rung]
            active_max_fidelity = None
            # IMPORTANT step for correct 2-step acquisition
            incumbent = min(self.rung_histories[rung]["perf"])
        else:
            fidelity = active_max_fidelity = None
        assert (
            (fidelity is None and active_max_fidelity is not None)
            or (active_max_fidelity is None and fidelity is not None)
            or (active_max_fidelity is not None and fidelity is not None)
        ), "Either condition needs to be not None!"
        config = self.model_policy.sample(
            active_max_fidelity=active_max_fidelity,
            fidelity=fidelity,
            incumbent=incumbent,
            **self.sampling_args,
        )
    elif self.sampling_policy is not None:
        config = self.sampling_policy.sample(**self.sampling_args)
    else:
        config = self.pipeline_space.sample(
            patience=self.patience,
            user_priors=self.use_priors,
            ignore_fidelity=True,
        )
    return config