
Hyperband

neps.optimizers.multi_fidelity.hyperband #

AsynchronousHyperband #

AsynchronousHyperband(
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    use_priors: bool = False,
    sampling_policy: Any = RandomUniformPolicy,
    promotion_policy: Any = AsyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] | None = None,
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
)

Bases: HyperbandBase

Implements Hyperband with asynchronous (ASHA-style) bracket scheduling.

Uses the promotion variant of ASHA, as used in Mobster.

Source code in neps/optimizers/multi_fidelity/hyperband.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    use_priors: bool = False,
    sampling_policy: typing.Any = RandomUniformPolicy,
    promotion_policy: typing.Any = AsyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] | None = None,
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
):
    args = dict(
        pipeline_space=pipeline_space,
        budget=budget,
        eta=eta,
        initial_design_type=initial_design_type,
        use_priors=use_priors,
        sampling_policy=sampling_policy,
        promotion_policy=promotion_policy,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
        prior_confidence=prior_confidence,
        random_interleave_prob=random_interleave_prob,
        sample_default_first=sample_default_first,
        sample_default_at_target=sample_default_at_target,
    )
    super().__init__(**args)
    # overwrite parent class SH brackets with Async SH brackets
    self.sh_brackets = {}
    for s in range(self.max_rung + 1):
        args.update({"early_stopping_rate": s})
        # key difference from vanilla HB where it runs synchronous SH brackets
        self.sh_brackets[s] = AsynchronousSuccessiveHalving(**args)

clear_old_brackets #

clear_old_brackets()

No-op override: the full optimization state is retained across brackets.

Source code in neps/optimizers/multi_fidelity/hyperband.py
def clear_old_brackets(self):
    """No-op override: the full optimization state is retained across brackets."""
    # unlike synchronous SH, the state is not reset at each rung; a configuration
    # is promoted once its rung holds at least eta configs and it is among the
    # top performers there
    # the base class allows for retaining the whole optimization state
    return

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

Decides which configuration to query next.

Returns: tuple[SearchSpace, str, str | None]

Source code in neps/optimizers/multi_fidelity/hyperband.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    # the rung to sample at
    bracket_to_run = self._get_bracket_to_run()
    config, config_id, previous_config_id = self.sh_brackets[
        bracket_to_run
    ].get_config_and_ids()
    return config, config_id, previous_config_id  # type: ignore
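
The helper _get_bracket_to_run is not rendered on this page. As a rough illustration only, here is a minimal sketch (not the NePS implementation) of a bracket sampler that weights brackets starting at lower rungs more heavily, which is the usual asynchronous Hyperband heuristic; the exact weighting formula is an assumption:

import numpy as np

def sample_bracket(max_rung: int, eta: int = 3, rng=None) -> int:
    """Sketch: sample which SH bracket to run next, favouring brackets
    that start at lower rungs (cheaper and more exploratory)."""
    rng = rng or np.random.default_rng()
    K = max_rung
    # assumed weighting: bracket s (starting at rung s) gets exponentially
    # more weight the lower its starting rung is
    weights = np.array(
        [eta ** (K - s) * (K + 1) / (K - s + 1) for s in range(K + 1)],
        dtype=float,
    )
    return int(rng.choice(K + 1, p=weights / weights.sum()))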

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )
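
The internal _get_cost helper is defined elsewhere in NePS; the sketch below is a hypothetical stand-in that shows how the two error-handling knobs interact (the "error" sentinel and the "cost" dict key are assumptions made for this example, not the real helper's contract):

def _get_cost_sketch(result, cost_value_on_error=None, ignore_errors=False):
    # hypothetical stand-in, not the real NePS helper
    if result == "error":               # assumed sentinel for a failed evaluation
        if ignore_errors:
            return None                 # errored configs are dropped by the caller
        if cost_value_on_error is not None:
            return cost_value_on_error  # substitute a fixed penalty cost
        raise ValueError("evaluation failed and no cost_value_on_error was set")
    if isinstance(result, dict):
        return result["cost"]           # assumed key for dict-shaped results
    return float(result)                # bare numeric result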

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Fetches the learning curve from the result and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_promotable #

is_promotable() -> int | None

Returns the index of a promotable rung if one exists, else None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # iterates from the highest promotable fidelity down to the lowest
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote
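
A toy walk-through of the scan order, with a hand-built rung_promotions mapping (values are illustrative only):

# rungs 0..3; promotable configs exist only at rungs 0 and 2
rung_promotions = {0: [7], 1: [], 2: [4, 9]}
min_rung, max_rung = 0, 3

rung_to_promote = None
for rung in reversed(range(min_rung, max_rung)):  # checks rungs 2, 1, 0
    if len(rung_promotions[rung]) > 0:
        rung_to_promote = rung
        break

print(rung_to_promote)  # -> 2, the highest rung with a promotion candidate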

AsynchronousHyperbandWithPriors #

AsynchronousHyperbandWithPriors(
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    sampling_policy: Any = FixedPriorPolicy,
    promotion_policy: Any = AsyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
)

Bases: AsynchronousHyperband

Implements asynchronous Hyperband (ASHA-style brackets) with prior-based sampling.

Source code in neps/optimizers/multi_fidelity/hyperband.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    sampling_policy: typing.Any = FixedPriorPolicy,
    promotion_policy: typing.Any = AsyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
):
    super().__init__(
        pipeline_space=pipeline_space,
        budget=budget,
        eta=eta,
        initial_design_type=initial_design_type,
        use_priors=self.use_priors,  # key change to the base Async HB class
        sampling_policy=sampling_policy,
        promotion_policy=promotion_policy,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
        prior_confidence=prior_confidence,
        random_interleave_prob=random_interleave_prob,
        sample_default_first=sample_default_first,
        sample_default_at_target=sample_default_at_target,
    )

clear_old_brackets #

clear_old_brackets()

No-op override: the full optimization state is retained across brackets.

Source code in neps/optimizers/multi_fidelity/hyperband.py
def clear_old_brackets(self):
    """No-op override: the full optimization state is retained across brackets."""
    # unlike synchronous SH, the state is not reset at each rung; a configuration
    # is promoted once its rung holds at least eta configs and it is among the
    # top performers there
    # the base class allows for retaining the whole optimization state
    return

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

Decides which configuration to query next.

Returns: tuple[SearchSpace, str, str | None]

Source code in neps/optimizers/multi_fidelity/hyperband.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    # the rung to sample at
    bracket_to_run = self._get_bracket_to_run()
    config, config_id, previous_config_id = self.sh_brackets[
        bracket_to_run
    ].get_config_and_ids()
    return config, config_id, previous_config_id  # type: ignore

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Fetches the learning curve from the result and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_promotable #

is_promotable() -> int | None

Returns the index of a promotable rung if one exists, else None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # iterates from the highest promotable fidelity down to the lowest
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote

Hyperband #

Hyperband(
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    use_priors: bool = False,
    sampling_policy: Any = RandomUniformPolicy,
    promotion_policy: Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] | None = None,
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
)

Bases: HyperbandBase

Source code in neps/optimizers/multi_fidelity/hyperband.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    use_priors: bool = False,
    sampling_policy: typing.Any = RandomUniformPolicy,
    promotion_policy: typing.Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] | None = None,
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
):
    args = dict(
        pipeline_space=pipeline_space,
        budget=budget,
        eta=eta,
        early_stopping_rate=self.early_stopping_rate,  # HB subsumes this param of SH
        initial_design_type=initial_design_type,
        use_priors=use_priors,
        sampling_policy=sampling_policy,
        promotion_policy=promotion_policy,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
        prior_confidence=prior_confidence,
        random_interleave_prob=random_interleave_prob,
        sample_default_first=sample_default_first,
        sample_default_at_target=sample_default_at_target,
    )
    super().__init__(**args)
    # stores the flattened sequence of SH brackets to loop over - the HB heuristic
    # for (n,r) pairing, i.e., (num. configs, fidelity)
    self.full_rung_trace = []
    self.sh_brackets = {}
    for s in range(self.max_rung + 1):
        args.update({"early_stopping_rate": s})
        self.sh_brackets[s] = SuccessiveHalving(**args)
        # `full_rung_trace` contains the index of SH bracket to run sequentially
        self.full_rung_trace.extend([s] * len(self.sh_brackets[s].full_rung_trace))
    # book-keeping variables
    self.current_sh_bracket = None  # type: ignore
    self.old_history_len = None
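
For intuition about the flattened full_rung_trace: each early_stopping_rate s yields one SH bracket with its own (n, r) schedule. A rough sketch of those schedules for eta=3 and rungs 0..2 follows (the exact config_map in NePS may round differently):

eta, max_rung = 3, 2
for s in range(max_rung + 1):             # one SH bracket per early_stopping_rate s
    n = eta ** (max_rung - s)             # configs at the bracket's base rung
    rungs = list(range(s, max_rung + 1))
    sizes = [max(1, n // eta**i) for i in range(len(rungs))]
    print(f"bracket s={s}: rungs {rungs} hold {sizes} configs")
# bracket s=0: rungs [0, 1, 2] hold [9, 3, 1] configs
# bracket s=1: rungs [1, 2] hold [3, 1] configs
# bracket s=2: rungs [2] hold [1] configs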

clear_old_brackets #

clear_old_brackets()

Enforces reset at each new bracket.

The _get_rungs_state() function creates the rung_promotions dict mapping, which the promotion policies use to determine the next step: promotion or sample. To simulate the rung resets of vanilla HB, the algorithm is viewed as a series of SH brackets, where the sequence of SH brackets comprising HB is repeated cyclically. This is done by iterating over the closed loop of possible SH brackets (self.sh_brackets). The oldest active, incomplete SH bracket is searched for to choose the next evaluation. If all brackets are either finished or waiting, a new SH bracket is started, corresponding to the next SH bracket under HB as tracked by current_sh_bracket.

Source code in neps/optimizers/multi_fidelity/hyperband.py
def clear_old_brackets(self):
    """Enforces reset at each new bracket.

    The _get_rungs_state() function creates the `rung_promotions` dict mapping,
    which the promotion policies use to determine the next step: promotion or
    sample. To simulate the rung resets of vanilla HB, the algorithm is viewed
    as a series of SH brackets, where the sequence of SH brackets comprising HB
    is repeated cyclically. This is done by iterating over the closed loop of
    possible SH brackets (self.sh_brackets). The oldest active, incomplete SH
    bracket is searched for to choose the next evaluation. If all brackets are
    either finished or waiting, a new SH bracket is started, corresponding to
    the next SH bracket under HB as tracked by `current_sh_bracket`.
    """
    n_sh_brackets = len(self.sh_brackets)
    # iterates over the different SH brackets
    self.current_sh_bracket = 0  # indexing from range(0, n_sh_brackets)
    start = 0
    _min_rung = self.sh_brackets[self.current_sh_bracket].min_rung
    end = self.sh_brackets[self.current_sh_bracket].config_map[_min_rung]

    if self.sample_default_first and self.sample_default_at_target:
        start += 1
        end += 1

    # stores the base rung size for each SH bracket in HB
    base_rung_sizes = []  # sorted(self.config_map.values(), reverse=True)
    for bracket in self.sh_brackets.values():
        base_rung_sizes.append(sorted(bracket.config_map.values(), reverse=True)[0])
    while end <= len(self.observed_configs):
        # subsetting only this SH bracket from the history
        sh_bracket = self.sh_brackets[self.current_sh_bracket]
        sh_bracket.clean_rung_information()
        # for the SH bracket in start-end, calculate total SH budget used, from the
        # correct SH bracket object to make the right budget calculations

        bracket_budget_used = sh_bracket._calc_budget_used_in_bracket(
            deepcopy(self.observed_configs.rung.values[start:end])
        )
        # if budget used is less than the total SH budget then still an active bracket
        current_bracket_full_budget = sum(sh_bracket.full_rung_trace)
        if bracket_budget_used < current_bracket_full_budget:
            # updating rung information of the current bracket

            sh_bracket._get_rungs_state(self.observed_configs.iloc[start:end])
            # extra call to use the updated rung member info to find promotions
            # SyncPromotion signals a wait if a rung is full but has
            # incomplete/pending evaluations; otherwise it signals starting a new SH bracket
            sh_bracket._handle_promotions()
            promotion_count = 0
            for _, promotions in sh_bracket.rung_promotions.items():
                promotion_count += len(promotions)
            # if no promotion candidates are returned, then the current bracket
            # is active and waiting
            if promotion_count:
                # returns the oldest active bracket if a promotion found which is the
                # current SH bracket at this scope
                return
            # if no promotions, ensure an empty state explicitly to disable bracket
            sh_bracket.clean_rung_information()
        start = end
        # updating pointer to the next SH bracket in HB
        self.current_sh_bracket = (self.current_sh_bracket + 1) % n_sh_brackets
        end = start + base_rung_sizes[self.current_sh_bracket]
    # reaches here if all old brackets are either waiting or finished

    # updates rung info with the latest active, incomplete bracket
    sh_bracket = self.sh_brackets[self.current_sh_bracket]

    sh_bracket._get_rungs_state(self.observed_configs.iloc[start:end])
    sh_bracket._handle_promotions()
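
The start/end window arithmetic above effectively partitions the observed history into consecutive SH brackets. A self-contained sketch of just that windowing, reusing the assumed base rung sizes [9, 3, 1] from the earlier eta=3 example:

base_rung_sizes = [9, 3, 1]  # assumed: one entry per SH bracket in the HB cycle
history_len = 17             # pretend 17 configurations have been observed

start, bracket = 0, 0
end = base_rung_sizes[bracket]
while end <= history_len:
    print(f"configs [{start}:{end}] belong to SH bracket {bracket}")
    start = end
    bracket = (bracket + 1) % len(base_rung_sizes)
    end = start + base_rung_sizes[bracket]
print(f"configs [{start}:] form the newest, partially filled bracket {bracket}")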

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

Decides which configuration to query next.

Returns: tuple[SearchSpace, str, str | None]

Source code in neps/optimizers/multi_fidelity/hyperband.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    config, config_id, previous_config_id = self.sh_brackets[
        self.current_sh_bracket  # type: ignore
    ].get_config_and_ids()
    return config, config_id, previous_config_id

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Fetches the learning curve from the result and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_promotable #

is_promotable() -> int | None

Returns the index of a promotable rung if one exists, else None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # iterates from the highest promotable fidelity down to the lowest
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote

HyperbandBase #

HyperbandBase(
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    use_priors: bool = False,
    sampling_policy: Any = RandomUniformPolicy,
    promotion_policy: Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] | None = None,
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
)

Bases: SuccessiveHalvingBase

Implements a Hyperband procedure with a sampling and promotion policy.

Source code in neps/optimizers/multi_fidelity/hyperband.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    use_priors: bool = False,
    sampling_policy: typing.Any = RandomUniformPolicy,
    promotion_policy: typing.Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] | None = None,
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
):
    args = dict(
        pipeline_space=pipeline_space,
        budget=budget,
        eta=eta,
        early_stopping_rate=self.early_stopping_rate,  # HB subsumes this param of SH
        initial_design_type=initial_design_type,
        use_priors=use_priors,
        sampling_policy=sampling_policy,
        promotion_policy=promotion_policy,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
        prior_confidence=prior_confidence,
        random_interleave_prob=random_interleave_prob,
        sample_default_first=sample_default_first,
        sample_default_at_target=sample_default_at_target,
    )
    super().__init__(**args)
    # stores the flattened sequence of SH brackets to loop over - the HB heuristic
    # for (n,r) pairing, i.e., (num. configs, fidelity)
    self.full_rung_trace = []
    self.sh_brackets = {}
    for s in range(self.max_rung + 1):
        args.update({"early_stopping_rate": s})
        self.sh_brackets[s] = SuccessiveHalving(**args)
        # `full_rung_trace` contains the index of SH bracket to run sequentially
        self.full_rung_trace.extend([s] * len(self.sh_brackets[s].full_rung_trace))
    # book-keeping variables
    self.current_sh_bracket = None  # type: ignore
    self.old_history_len = None

clear_old_brackets #

clear_old_brackets()

No-op override: the full optimization state is retained across brackets.

Source code in neps/optimizers/multi_fidelity/hyperband.py
def clear_old_brackets(self):
    """No-op override: the full optimization state is retained across brackets."""
    # unlike synchronous SH, the state is not reset at each rung; a configuration
    # is promoted once its rung holds at least eta configs and it is among the
    # top performers there
    # the base class allows for retaining the whole optimization state
    return

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

Decides which configuration to query next.

Returns: tuple[SearchSpace, str, str | None]

Source code in neps/optimizers/multi_fidelity/hyperband.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    raise NotImplementedError

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Fetches the learning curve from the result and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_promotable #

is_promotable() -> int | None

Returns the index of a promotable rung if one exists, else None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # iterates from the highest promotable fidelity down to the lowest
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote

HyperbandCustomDefault #

HyperbandCustomDefault(
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    sampling_policy: Any = EnsemblePolicy,
    promotion_policy: Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
)

Bases: HyperbandWithPriors

If a prior is specified, samples from the prior 50% of the time and performs random search (as in vanilla HB) the other 50% of the time.

Source code in neps/optimizers/multi_fidelity/hyperband.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    sampling_policy: typing.Any = EnsemblePolicy,
    promotion_policy: typing.Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
):
    super().__init__(
        pipeline_space=pipeline_space,
        budget=budget,
        eta=eta,
        initial_design_type=initial_design_type,
        sampling_policy=sampling_policy,
        promotion_policy=promotion_policy,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
        prior_confidence=prior_confidence,
        random_interleave_prob=random_interleave_prob,
        sample_default_first=sample_default_first,
        sample_default_at_target=sample_default_at_target,
    )
    self.sampling_args = {
        "inc": None,
        "weights": {
            "prior": 0.5,
            "inc": 0,
            "random": 0.5,
        },
    }
    for _, sh in self.sh_brackets.items():
        sh.sampling_args = self.sampling_args
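
The weights dict above splits sampling evenly between the prior and random search, with the incumbent disabled. A minimal sketch of how such a mixture policy could draw a sample (the sampler callables are hypothetical placeholders, not the EnsemblePolicy API):

import random

def sample_from_mixture(weights, sample_prior, sample_random):
    """Pick a source according to `weights`, then draw from it."""
    sources, probs = zip(*weights.items())
    source = random.choices(sources, weights=probs, k=1)[0]
    if source == "prior":
        return sample_prior()
    return sample_random()  # "inc" has weight 0 here, so it is never chosen

weights = {"prior": 0.5, "inc": 0, "random": 0.5}
# sample_from_mixture(weights, sample_prior=..., sample_random=...)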

clear_old_brackets #

clear_old_brackets()

Enforces reset at each new bracket.

The _get_rungs_state() function creates the rung_promotions dict mapping, which the promotion policies use to determine the next step: promotion or sample. To simulate the rung resets of vanilla HB, the algorithm is viewed as a series of SH brackets, where the sequence of SH brackets comprising HB is repeated cyclically. This is done by iterating over the closed loop of possible SH brackets (self.sh_brackets). The oldest active, incomplete SH bracket is searched for to choose the next evaluation. If all brackets are either finished or waiting, a new SH bracket is started, corresponding to the next SH bracket under HB as tracked by current_sh_bracket.

Source code in neps/optimizers/multi_fidelity/hyperband.py
def clear_old_brackets(self):
    """Enforces reset at each new bracket.

    The _get_rungs_state() function creates the `rung_promotions` dict mapping,
    which the promotion policies use to determine the next step: promotion or
    sample. To simulate the rung resets of vanilla HB, the algorithm is viewed
    as a series of SH brackets, where the sequence of SH brackets comprising HB
    is repeated cyclically. This is done by iterating over the closed loop of
    possible SH brackets (self.sh_brackets). The oldest active, incomplete SH
    bracket is searched for to choose the next evaluation. If all brackets are
    either finished or waiting, a new SH bracket is started, corresponding to
    the next SH bracket under HB as tracked by `current_sh_bracket`.
    """
    n_sh_brackets = len(self.sh_brackets)
    # iterates over the different SH brackets
    self.current_sh_bracket = 0  # indexing from range(0, n_sh_brackets)
    start = 0
    _min_rung = self.sh_brackets[self.current_sh_bracket].min_rung
    end = self.sh_brackets[self.current_sh_bracket].config_map[_min_rung]

    if self.sample_default_first and self.sample_default_at_target:
        start += 1
        end += 1

    # stores the base rung size for each SH bracket in HB
    base_rung_sizes = []  # sorted(self.config_map.values(), reverse=True)
    for bracket in self.sh_brackets.values():
        base_rung_sizes.append(sorted(bracket.config_map.values(), reverse=True)[0])
    while end <= len(self.observed_configs):
        # subsetting only this SH bracket from the history
        sh_bracket = self.sh_brackets[self.current_sh_bracket]
        sh_bracket.clean_rung_information()
        # for the SH bracket in start-end, calculate total SH budget used, from the
        # correct SH bracket object to make the right budget calculations

        bracket_budget_used = sh_bracket._calc_budget_used_in_bracket(
            deepcopy(self.observed_configs.rung.values[start:end])
        )
        # if budget used is less than the total SH budget then still an active bracket
        current_bracket_full_budget = sum(sh_bracket.full_rung_trace)
        if bracket_budget_used < current_bracket_full_budget:
            # updating rung information of the current bracket

            sh_bracket._get_rungs_state(self.observed_configs.iloc[start:end])
            # extra call to use the updated rung member info to find promotions
            # SyncPromotion signals a wait if a rung is full but has
            # incomplete/pending evaluations; otherwise it signals starting a new SH bracket
            sh_bracket._handle_promotions()
            promotion_count = 0
            for _, promotions in sh_bracket.rung_promotions.items():
                promotion_count += len(promotions)
            # if no promotion candidates are returned, then the current bracket
            # is active and waiting
            if promotion_count:
                # returns the oldest active bracket if a promotion found which is the
                # current SH bracket at this scope
                return
            # if no promotions, ensure an empty state explicitly to disable bracket
            sh_bracket.clean_rung_information()
        start = end
        # updating pointer to the next SH bracket in HB
        self.current_sh_bracket = (self.current_sh_bracket + 1) % n_sh_brackets
        end = start + base_rung_sizes[self.current_sh_bracket]
    # reaches here if all old brackets are either waiting or finished

    # updates rung info with the latest active, incomplete bracket
    sh_bracket = self.sh_brackets[self.current_sh_bracket]

    sh_bracket._get_rungs_state(self.observed_configs.iloc[start:end])
    sh_bracket._handle_promotions()

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

Decides which configuration to query next.

Returns: tuple[SearchSpace, str, str | None]

Source code in neps/optimizers/multi_fidelity/hyperband.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    config, config_id, previous_config_id = self.sh_brackets[
        self.current_sh_bracket  # type: ignore
    ].get_config_and_ids()
    return config, config_id, previous_config_id

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Fetches the learning curve from the result and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_promotable #

is_promotable() -> int | None

Returns the index of a promotable rung if one exists, else None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # iterates from the highest promotable fidelity down to the lowest
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote

HyperbandWithPriors #

HyperbandWithPriors(
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    sampling_policy: Any = FixedPriorPolicy,
    promotion_policy: Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
)

Bases: Hyperband

Implements a Hyperband procedure with a sampling and promotion policy.

Source code in neps/optimizers/multi_fidelity/hyperband.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    sampling_policy: typing.Any = FixedPriorPolicy,
    promotion_policy: typing.Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
):
    super().__init__(
        pipeline_space=pipeline_space,
        budget=budget,
        eta=eta,
        initial_design_type=initial_design_type,
        use_priors=self.use_priors,  # key change to the base HB class
        sampling_policy=sampling_policy,
        promotion_policy=promotion_policy,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
        prior_confidence=prior_confidence,
        random_interleave_prob=random_interleave_prob,
        sample_default_first=sample_default_first,
        sample_default_at_target=sample_default_at_target,
    )
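
Note the use_priors=self.use_priors argument above: it is read before the parent __init__ has run, so it can only resolve if use_priors is defined at class level on this subclass. A minimal sketch of that pattern (the class name is illustrative, not from the NePS source):

class PriorVariantSketch(Hyperband):  # Hyperband as documented above
    use_priors = True  # class attribute, readable via self during __init__

    def __init__(self, **kwargs):
        # self.use_priors resolves to the class attribute above, so it can be
        # forwarded to the parent even though no instance attribute exists yet
        super().__init__(use_priors=self.use_priors, **kwargs)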

clear_old_brackets #

clear_old_brackets()

Enforces reset at each new bracket.

The _get_rungs_state() function creates the rung_promotions dict mapping, which the promotion policies use to determine the next step: promotion or sample. To simulate the rung resets of vanilla HB, the algorithm is viewed as a series of SH brackets, where the sequence of SH brackets comprising HB is repeated cyclically. This is done by iterating over the closed loop of possible SH brackets (self.sh_brackets). The oldest active, incomplete SH bracket is searched for to choose the next evaluation. If all brackets are either finished or waiting, a new SH bracket is started, corresponding to the next SH bracket under HB as tracked by current_sh_bracket.

Source code in neps/optimizers/multi_fidelity/hyperband.py
def clear_old_brackets(self):
    """Enforces reset at each new bracket.

    The _get_rungs_state() function creates the `rung_promotions` dict mapping,
    which the promotion policies use to determine the next step: promotion or
    sample. To simulate the rung resets of vanilla HB, the algorithm is viewed
    as a series of SH brackets, where the sequence of SH brackets comprising HB
    is repeated cyclically. This is done by iterating over the closed loop of
    possible SH brackets (self.sh_brackets). The oldest active, incomplete SH
    bracket is searched for to choose the next evaluation. If all brackets are
    either finished or waiting, a new SH bracket is started, corresponding to
    the next SH bracket under HB as tracked by `current_sh_bracket`.
    """
    n_sh_brackets = len(self.sh_brackets)
    # iterates over the different SH brackets
    self.current_sh_bracket = 0  # indexing from range(0, n_sh_brackets)
    start = 0
    _min_rung = self.sh_brackets[self.current_sh_bracket].min_rung
    end = self.sh_brackets[self.current_sh_bracket].config_map[_min_rung]

    if self.sample_default_first and self.sample_default_at_target:
        start += 1
        end += 1

    # stores the base rung size for each SH bracket in HB
    base_rung_sizes = []  # sorted(self.config_map.values(), reverse=True)
    for bracket in self.sh_brackets.values():
        base_rung_sizes.append(sorted(bracket.config_map.values(), reverse=True)[0])
    while end <= len(self.observed_configs):
        # subsetting only this SH bracket from the history
        sh_bracket = self.sh_brackets[self.current_sh_bracket]
        sh_bracket.clean_rung_information()
        # for the SH bracket in start-end, calculate total SH budget used, from the
        # correct SH bracket object to make the right budget calculations

        bracket_budget_used = sh_bracket._calc_budget_used_in_bracket(
            deepcopy(self.observed_configs.rung.values[start:end])
        )
        # if budget used is less than the total SH budget then still an active bracket
        current_bracket_full_budget = sum(sh_bracket.full_rung_trace)
        if bracket_budget_used < current_bracket_full_budget:
            # updating rung information of the current bracket

            sh_bracket._get_rungs_state(self.observed_configs.iloc[start:end])
            # extra call to use the updated rung member info to find promotions
            # SyncPromotion signals a wait if a rung is full but has
            # incomplete/pending evaluations; otherwise it signals starting a new SH bracket
            sh_bracket._handle_promotions()
            promotion_count = 0
            for _, promotions in sh_bracket.rung_promotions.items():
                promotion_count += len(promotions)
            # if no promotion candidates are returned, then the current bracket
            # is active and waiting
            if promotion_count:
                # returns the oldest active bracket if a promotion found which is the
                # current SH bracket at this scope
                return
            # if no promotions, ensure an empty state explicitly to disable bracket
            sh_bracket.clean_rung_information()
        start = end
        # updating pointer to the next SH bracket in HB
        self.current_sh_bracket = (self.current_sh_bracket + 1) % n_sh_brackets
        end = start + base_rung_sizes[self.current_sh_bracket]
    # reaches here if all old brackets are either waiting or finished

    # updates rung info with the latest active, incomplete bracket
    sh_bracket = self.sh_brackets[self.current_sh_bracket]

    sh_bracket._get_rungs_state(self.observed_configs.iloc[start:end])
    sh_bracket._handle_promotions()

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

Decides which configuration to query next.

Returns: tuple[SearchSpace, str, str | None]

Source code in neps/optimizers/multi_fidelity/hyperband.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    config, config_id, previous_config_id = self.sh_brackets[
        self.current_sh_bracket  # type: ignore
    ].get_config_and_ids()
    return config, config_id, previous_config_id

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Fetches the learning curve from the result and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_promotable #

is_promotable() -> int | None

Returns the index of a promotable rung if one exists, else None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # iterates from the highest promotable fidelity down to the lowest
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote