Successive halving

neps.optimizers.multi_fidelity.successive_halving #

AsynchronousSuccessiveHalving #

AsynchronousSuccessiveHalving(
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    early_stopping_rate: int = 0,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    use_priors: bool = False,
    sampling_policy: Any = RandomUniformPolicy,
    promotion_policy: Any = AsyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] | None = None,
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
)

Bases: SuccessiveHalvingBase

Implements ASHA with a sampling and asynchronous promotion policy.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    early_stopping_rate: int = 0,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    use_priors: bool = False,
    sampling_policy: typing.Any = RandomUniformPolicy,
    promotion_policy: typing.Any = AsyncPromotionPolicy,  # key difference from SH
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] | None = None,
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
):
    super().__init__(
        pipeline_space=pipeline_space,
        budget=budget,
        eta=eta,
        early_stopping_rate=early_stopping_rate,
        initial_design_type=initial_design_type,
        use_priors=use_priors,
        sampling_policy=sampling_policy,
        promotion_policy=promotion_policy,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
        prior_confidence=prior_confidence,
        random_interleave_prob=random_interleave_prob,
        sample_default_first=sample_default_first,
        sample_default_at_target=sample_default_at_target,
    )
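
For orientation, a minimal usage sketch is given below showing how ASHA is typically selected through the top-level NePS API. The searcher key "asha" and the exact neps.run argument names are assumptions based on common NePS usage, not confirmed by this page.

# Hypothetical usage sketch: selecting ASHA via neps.run (argument names and
# the searcher key "asha" are assumptions about the NePS API).
import neps

def run_pipeline(learning_rate: float, epochs: int) -> dict:
    # `epochs` acts as the fidelity; train for that many epochs and report loss
    loss = (learning_rate - 1e-3) ** 2 / (epochs + 1)  # stand-in for a real loss
    return {"loss": loss}

pipeline_space = {
    "learning_rate": neps.FloatParameter(lower=1e-5, upper=1e-1, log=True),
    "epochs": neps.IntegerParameter(lower=1, upper=27, is_fidelity=True),
}

neps.run(
    run_pipeline=run_pipeline,
    pipeline_space=pipeline_space,
    root_directory="asha_example",
    max_evaluations_total=50,
    searcher="asha",  # assumed registry key for this optimizer
)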

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

Decides which configuration to query next: promotes a promotable configuration to the next rung if one exists, otherwise samples a new configuration at the lowest rung.

RETURNS DESCRIPTION
tuple[SearchSpace, str, str | None]

The configuration's hyperparameter values, its config ID, and the previous config ID (None if the configuration was newly sampled).
Source code in neps/optimizers/multi_fidelity/successive_halving.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    rung_to_promote = self.is_promotable()
    if rung_to_promote is not None:
        # promotes the first recorded promotable config in the argsort-ed rung
        row = self.observed_configs.iloc[self.rung_promotions[rung_to_promote][0]]
        config = deepcopy(row["config"])
        rung = rung_to_promote + 1
        # assigning the fidelity to evaluate the config at
        config.fidelity.value = self.rung_map[rung]
        # updating config IDs
        previous_config_id = f"{row.name}_{rung_to_promote}"
        config_id = f"{row.name}_{rung}"
    else:
        rung_id = self.min_rung
        # using random instead of np.random to be consistent with NePS BO
        if (
            self.use_priors
            and self.sample_default_first
            and len(self.observed_configs) == 0
        ):
            if self.sample_default_at_target:
                # sets the default config to be evaluated at the target fidelity
                rung_id = self.max_rung
                self.logger.info(
                    "Next config will be evaluated at target fidelity."
                )
            self.logger.info("Sampling the default configuration...")
            config = self.pipeline_space.sample_default_configuration()

        elif random.random() < self.random_interleave_prob:
            config = self.pipeline_space.sample(
                patience=self.patience,
                user_priors=False,  # sample uniformly random
                ignore_fidelity=True,
            )
        else:
            config = self.sample_new_config(rung=rung_id)

        fidelity_value = self.rung_map[rung_id]
        config.fidelity.value = fidelity_value

        previous_config_id = None
        config_id = f"{self._generate_new_config_id()}_{rung_id}"

    return config.hp_values(), config_id, previous_config_id  # type: ignore
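
To make the ID scheme above concrete: config IDs have the form "<config-number>_<rung>", so a promotion reuses the config number and increments the rung, while a fresh sample gets a new number at the lowest rung. A small sketch with invented values:

# Toy illustration of the config ID scheme used in get_config_and_ids.
rung_map = {0: 3, 1: 9, 2: 27}  # rung -> fidelity value (e.g. epochs)

# Promoting config 5 from rung 1 to rung 2:
row_name, rung_to_promote = 5, 1
previous_config_id = f"{row_name}_{rung_to_promote}"  # "5_1"
config_id = f"{row_name}_{rung_to_promote + 1}"       # "5_2"
fidelity = rung_map[rung_to_promote + 1]              # evaluated at 27

# A freshly sampled config gets a new number at the lowest rung:
new_id, lowest_rung = 7, 0
new_config_id = f"{new_id}_{lowest_rung}"             # "7_0", previous ID is None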

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Calls result.utils.get_learning_curve() and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )
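
The three accepted result shapes follow from the `str | dict | float` signature. The sketch below shows plausible values; treating a string result as an error marker is an assumption inferred from the *_value_on_error parameters, not stated on this page.

# Sketch of result shapes these helpers accept (conventions partly assumed):
result_as_float = 0.42                           # bare loss value
result_as_dict = {"loss": 0.42, "cost": 120.0}   # loss plus optional cost
result_on_error = "error"                        # assumed marker for a failed run

# With loss_value_on_error=100.0, a failed evaluation would be scored as 100.0
# instead of raising; with ignore_errors=True it would be skipped entirely.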

is_promotable #

is_promotable() -> int | None

Returns the highest rung index that has a promotable configuration, else None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # # iterates starting from the highest fidelity promotable to the lowest fidelity
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote
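
The snippet below replays the same top-down scan on a toy promotion state to show why the highest rung wins; the numbers are invented for illustration.

# Toy replay of the scan in is_promotable (hypothetical promotion state).
min_rung, max_rung = 0, 3
rung_promotions = {0: [4, 9], 1: [], 2: [7]}  # rung -> promotable config indices

rung_to_promote = None
for rung in reversed(range(min_rung, max_rung)):  # checks rung 2, then 1, then 0
    if len(rung_promotions[rung]) > 0:
        rung_to_promote = rung
        break
print(rung_to_promote)  # 2 -- config 7 would be promoted to rung 3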

load_results #

load_results(
    previous_results: dict[str, ConfigResult],
    pending_evaluations: dict[str, SearchSpace],
) -> None

This is basically the fit method: it rebuilds the optimizer state (rung histories, observed configurations, promotions) from all previous and pending evaluations.

PARAMETER DESCRIPTION
previous_results

Configurations that have already been evaluated, together with their results, keyed by config ID.

TYPE: dict[str, ConfigResult]

pending_evaluations

Configurations that have been sampled but not yet evaluated, keyed by config ID.

TYPE: dict[str, SearchSpace]

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def load_results(
    self,
    previous_results: dict[str, ConfigResult],
    pending_evaluations: dict[str, SearchSpace],
) -> None:
    """This is basically the fit method.

    Args:
        previous_results (dict[str, ConfigResult]): [description]
        pending_evaluations (dict[str, ConfigResult]): [description]
    """

    self.rung_histories = {
        rung: {"config": [], "perf": []}
        for rung in range(self.min_rung, self.max_rung + 1)
    }

    self.observed_configs = pd.DataFrame([], columns=("config", "rung", "perf"))

    # previous optimization run exists and needs to be loaded
    self._load_previous_observations(previous_results)
    self.total_fevals = len(previous_results) + len(pending_evaluations)

    # account for pending evaluations
    self._handle_pending_evaluations(pending_evaluations)

    # process optimization state and bucket observations per rung
    self._get_rungs_state()

    # filter/reset old SH brackets
    self.clear_old_brackets()

    # identifying promotion list per rung
    self._handle_promotions()

    # fit any model/surrogates
    self._fit_models()

    return
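
Since config IDs encode the rung as "<config>_<rung>", the rung histories built here can be pictured as a simple bucketing step. A toy sketch (the actual parsing happens inside helpers such as _load_previous_observations, whose internals are not shown on this page):

# Illustrative bucketing of observations per rung from the ID scheme (toy data).
toy_results = {"0_0": 0.9, "1_0": 0.7, "1_1": 0.6}  # config ID -> loss

rung_histories = {rung: {"config": [], "perf": []} for rung in range(0, 2)}
for config_id, perf in toy_results.items():
    config_num, rung = map(int, config_id.split("_"))
    rung_histories[rung]["config"].append(config_num)
    rung_histories[rung]["perf"].append(perf)

print(rung_histories[0])  # {'config': [0, 1], 'perf': [0.9, 0.7]}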

AsynchronousSuccessiveHalvingWithPriors #

AsynchronousSuccessiveHalvingWithPriors(
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    early_stopping_rate: int = 0,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    sampling_policy: Any = FixedPriorPolicy,
    promotion_policy: Any = AsyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
)

Bases: AsynchronousSuccessiveHalving

Implements ASHA with a fixed-prior sampling policy and an asynchronous promotion policy.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    early_stopping_rate: int = 0,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    sampling_policy: typing.Any = FixedPriorPolicy,
    promotion_policy: typing.Any = AsyncPromotionPolicy,  # key difference from SH
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
):
    super().__init__(
        pipeline_space=pipeline_space,
        budget=budget,
        eta=eta,
        early_stopping_rate=early_stopping_rate,
        initial_design_type=initial_design_type,
        use_priors=self.use_priors,
        sampling_policy=sampling_policy,
        promotion_policy=promotion_policy,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
        prior_confidence=prior_confidence,
        random_interleave_prob=random_interleave_prob,
        sample_default_first=sample_default_first,
        sample_default_at_target=sample_default_at_target,
    )
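
With FixedPriorPolicy as the default sampler, the prior is expressed through the search space itself. The sketch below assumes the per-parameter default/default_confidence arguments from common NePS usage; they are not confirmed by this page.

# Hypothetical sketch: encoding a prior via parameter defaults so that
# FixedPriorPolicy samples around them (API details are assumptions).
import neps

pipeline_space = {
    "learning_rate": neps.FloatParameter(
        lower=1e-5, upper=1e-1, log=True,
        default=1e-3,                 # the prior's center
        default_confidence="medium",  # controls the prior's spread
    ),
    "epochs": neps.IntegerParameter(lower=1, upper=27, is_fidelity=True),
}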

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

Decides which configuration to query next: promotes a promotable configuration to the next rung if one exists, otherwise samples a new configuration at the lowest rung.

RETURNS DESCRIPTION
tuple[SearchSpace, str, str | None]

The configuration's hyperparameter values, its config ID, and the previous config ID (None if the configuration was newly sampled).
Source code in neps/optimizers/multi_fidelity/successive_halving.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    rung_to_promote = self.is_promotable()
    if rung_to_promote is not None:
        # promotes the first recorded promotable config in the argsort-ed rung
        row = self.observed_configs.iloc[self.rung_promotions[rung_to_promote][0]]
        config = deepcopy(row["config"])
        rung = rung_to_promote + 1
        # assigning the fidelity to evaluate the config at
        config.fidelity.value = self.rung_map[rung]
        # updating config IDs
        previous_config_id = f"{row.name}_{rung_to_promote}"
        config_id = f"{row.name}_{rung}"
    else:
        rung_id = self.min_rung
        # using random instead of np.random to be consistent with NePS BO
        if (
            self.use_priors
            and self.sample_default_first
            and len(self.observed_configs) == 0
        ):
            if self.sample_default_at_target:
                # sets the default config to be evaluated at the target fidelity
                rung_id = self.max_rung
                self.logger.info(
                    "Next config will be evaluated at target fidelity."
                )
            self.logger.info("Sampling the default configuration...")
            config = self.pipeline_space.sample_default_configuration()

        elif random.random() < self.random_interleave_prob:
            config = self.pipeline_space.sample(
                patience=self.patience,
                user_priors=False,  # sample uniformly random
                ignore_fidelity=True,
            )
        else:
            config = self.sample_new_config(rung=rung_id)

        fidelity_value = self.rung_map[rung_id]
        config.fidelity.value = fidelity_value

        previous_config_id = None
        config_id = f"{self._generate_new_config_id()}_{rung_id}"

    return config.hp_values(), config_id, previous_config_id  # type: ignore

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Calls result.utils.get_learning_curve() and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_promotable #

is_promotable() -> int | None

Returns the highest rung index that has a promotable configuration, else None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # # iterates starting from the highest fidelity promotable to the lowest fidelity
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote

load_results #

load_results(
    previous_results: dict[str, ConfigResult],
    pending_evaluations: dict[str, SearchSpace],
) -> None

This is basically the fit method: it rebuilds the optimizer state (rung histories, observed configurations, promotions) from all previous and pending evaluations.

PARAMETER DESCRIPTION
previous_results

Configurations that have already been evaluated, together with their results, keyed by config ID.

TYPE: dict[str, ConfigResult]

pending_evaluations

Configurations that have been sampled but not yet evaluated, keyed by config ID.

TYPE: dict[str, SearchSpace]

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def load_results(
    self,
    previous_results: dict[str, ConfigResult],
    pending_evaluations: dict[str, SearchSpace],
) -> None:
    """This is basically the fit method.

    Args:
        previous_results (dict[str, ConfigResult]): [description]
        pending_evaluations (dict[str, ConfigResult]): [description]
    """

    self.rung_histories = {
        rung: {"config": [], "perf": []}
        for rung in range(self.min_rung, self.max_rung + 1)
    }

    self.observed_configs = pd.DataFrame([], columns=("config", "rung", "perf"))

    # previous optimization run exists and needs to be loaded
    self._load_previous_observations(previous_results)
    self.total_fevals = len(previous_results) + len(pending_evaluations)

    # account for pending evaluations
    self._handle_pending_evaluations(pending_evaluations)

    # process optimization state and bucket observations per rung
    self._get_rungs_state()

    # filter/reset old SH brackets
    self.clear_old_brackets()

    # identifying promotion list per rung
    self._handle_promotions()

    # fit any model/surrogates
    self._fit_models()

    return

SuccessiveHalving #

SuccessiveHalving(
    pipeline_space: SearchSpace,
    budget: int | None = None,
    eta: int = 3,
    early_stopping_rate: int = 0,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    use_priors: bool = False,
    sampling_policy: Any = RandomUniformPolicy,
    promotion_policy: Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] | None = None,
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
)

Bases: SuccessiveHalvingBase

PARAMETER DESCRIPTION
pipeline_space

Space in which to search

TYPE: SearchSpace

budget

Maximum budget

TYPE: int | None DEFAULT: None

eta

The reduction factor used by SH

TYPE: int DEFAULT: 3

early_stopping_rate

Determines the number of rungs in an SH bracket. Choosing 0 creates the maximal number of rungs given the fidelity bounds.

TYPE: int DEFAULT: 0

initial_design_type

Type of initial design to switch to BO. Legacy parameter from the NePS BO design; could be used to extend to MF-BO.

TYPE: Literal['max_budget', 'unique_configs'] DEFAULT: 'max_budget'

use_priors

Allows random samples to be generated from a default; samples are generated from a Gaussian centered around the default value.

TYPE: bool DEFAULT: False

sampling_policy

The type of sampling procedure to use

TYPE: Any DEFAULT: RandomUniformPolicy

promotion_policy

The type of promotion procedure to use

TYPE: Any DEFAULT: SyncPromotionPolicy

loss_value_on_error

Setting this and cost_value_on_error to any float will suppress any error during Bayesian optimization and use the given loss value instead. Default: None

TYPE: None | float DEFAULT: None

cost_value_on_error

Setting this and loss_value_on_error to any float will suppress any error during Bayesian optimization and use the given cost value instead. Default: None

TYPE: None | float DEFAULT: None

logger

logger object, or None to use the neps logger

DEFAULT: None

prior_confidence

The degree of confidence to place in the prior. The higher the confidence, the smaller the standard deviation of the prior distribution centered around the default.

TYPE: Literal['low', 'medium', 'high'] | None DEFAULT: None

random_interleave_prob

Probability of sampling a configuration uniformly at random instead of from the prior.

TYPE: float DEFAULT: 0.0

sample_default_first

Whether to sample the default configuration first

TYPE: bool DEFAULT: False

sample_default_at_target

Whether to evaluate the default configuration at the target fidelity or max budget

TYPE: bool DEFAULT: False

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int | None = None,
    eta: int = 3,
    early_stopping_rate: int = 0,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    use_priors: bool = False,
    sampling_policy: typing.Any = RandomUniformPolicy,
    promotion_policy: typing.Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] | None = None,
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
):
    """Initialise an SH bracket.

    Args:
        pipeline_space: Space in which to search
        budget: Maximum budget
        eta: The reduction factor used by SH
        early_stopping_rate: Determines the number of rungs in an SH bracket
            Choosing 0 creates maximal rungs given the fidelity bounds
        initial_design_type: Type of initial design to switch to BO
            Legacy parameter from NePS BO design. Could be used to extend to MF-BO.
        use_priors: Allows random samples to be generated from a default
            Samples generated from a Gaussian centered around the default value
        sampling_policy: The type of sampling procedure to use
        promotion_policy: The type of promotion procedure to use
        loss_value_on_error: Setting this and cost_value_on_error to any float will
            suppress any error during Bayesian optimization and use the given loss
            value instead. Default: None
        cost_value_on_error: Setting this and loss_value_on_error to any float will
            suppress any error during Bayesian optimization and use the given cost
            value instead. Default: None
        logger: logger object, or None to use the neps logger
        prior_confidence: The degree of confidence to place in the prior.
            The higher the confidence, the smaller the standard deviation of the
            prior distribution centered around the default
        random_interleave_prob: Probability of sampling a configuration uniformly
            at random instead of from the prior
        sample_default_first: Whether to sample the default configuration first
        sample_default_at_target: Whether to evaluate the default configuration at
            the target fidelity or max budget
    """
    super().__init__(
        pipeline_space=pipeline_space,
        budget=budget,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
    )
    if random_interleave_prob < 0 or random_interleave_prob > 1:
        raise ValueError("random_interleave_prob should be in [0.0, 1.0]")
    self.random_interleave_prob = random_interleave_prob
    self.sample_default_first = sample_default_first
    self.sample_default_at_target = sample_default_at_target

    self.min_budget = self.pipeline_space.fidelity.lower
    self.max_budget = self.pipeline_space.fidelity.upper
    self.eta = eta
    # SH implicitly sets early_stopping_rate to 0
    # the parameter is exposed to allow HB to call SH with different stopping rates
    self.early_stopping_rate = early_stopping_rate
    self.sampling_policy = sampling_policy(
        pipeline_space=self.pipeline_space, logger=self.logger
    )
    self.promotion_policy = promotion_policy(self.eta)

    # `max_budget_init` checks for the number of configurations that have been
    # evaluated at the target budget
    self.initial_design_type = initial_design_type
    self.use_priors = use_priors

    # check to ensure no rung ID is negative
    # equivalent to s_max in https://arxiv.org/pdf/1603.06560.pdf
    self.stopping_rate_limit = np.floor(
        np.log(self.max_budget / self.min_budget) / np.log(self.eta)
    ).astype(int)
    assert self.early_stopping_rate <= self.stopping_rate_limit

    # maps rungs to a fidelity value for an SH bracket with `early_stopping_rate`
    self.rung_map = self._get_rung_map(self.early_stopping_rate)
    self.config_map = self._get_config_map(self.early_stopping_rate)

    self.min_rung = min(list(self.rung_map.keys()))
    self.max_rung = max(list(self.rung_map.keys()))

    # placeholder args for varying promotion and sampling policies
    self.promotion_policy_kwargs: dict = {}
    self.promotion_policy_kwargs.update({"config_map": self.config_map})
    self.sampling_args: dict = {}

    self.fidelities = list(self.rung_map.values())
    # stores the observations made and the corresponding fidelity explored
    # crucial data structure used for determining promotion candidates
    self.observed_configs = pd.DataFrame([], columns=("config", "rung", "perf"))
    # stores which configs occupy each rung at any time
    self.rung_members: dict = dict()  # stores config IDs per rung
    self.rung_members_performance: dict = dict()  # performances recorded per rung
    self.rung_promotions: dict = dict()  # records a promotable config per rung
    self.total_fevals = 0

    # setup SH state counter
    self._counter = 0
    self.full_rung_trace = SuccessiveHalving._get_rung_trace(
        self.rung_map, self.config_map
    )

    #############################
    # Setting prior confidences #
    #############################
    # the std. dev or peakiness of distribution
    self.prior_confidence = prior_confidence
    self._enhance_priors()
    self.rung_histories = None
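
The rung-to-fidelity schedule implied by eta is geometric between the fidelity bounds; the stopping_rate_limit above caps how far the lowest rung can be shifted by early_stopping_rate. A sketch of the schedule (the exact _get_rung_map implementation is not shown on this page, so geometric spacing is an assumption):

import numpy as np

# Sketch of the fidelity schedule implied by eta (geometric spacing assumed).
min_budget, max_budget, eta = 1, 27, 3
early_stopping_rate = 0

s_max = int(np.floor(np.log(max_budget / min_budget) / np.log(eta)))  # = 3
rung_map = {
    rung: min_budget * eta**rung
    for rung in range(early_stopping_rate, s_max + 1)
}
print(rung_map)  # {0: 1, 1: 3, 2: 9, 3: 27} -- one fidelity per rung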

clear_old_brackets #

clear_old_brackets()

Enforces reset at each new bracket.

The _get_rungs_state() function creates the rung_promotions dict mapping, which is used by the promotion policies to determine the next step: promotion or sample. The key to simulating the rung reset of vanilla SH is to subset only the relevant part of the observation history that corresponds to one SH bracket. Under a parallel run, multiple SH brackets can be spawned. The oldest active, incomplete SH bracket is searched for to choose the next evaluation. If all brackets are either over or waiting, a new SH bracket is spawned. There are no waiting or blocking calls.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def clear_old_brackets(self):
    """Enforces reset at each new bracket.

    The _get_rungs_state() function creates the `rung_promotions` dict mapping,
    which is used by the promotion policies to determine the next step:
    promotion or sample. The key to simulating the rung reset of vanilla SH is
    to subset only the relevant part of the observation history that
    corresponds to one SH bracket. Under a parallel run, multiple SH brackets
    can be spawned. The oldest active, incomplete SH bracket is searched for to
    choose the next evaluation. If all brackets are either over or waiting, a
    new SH bracket is spawned.
    There are no waiting or blocking calls.
    """
    # indexes to mark separate brackets
    start = 0
    end = self.config_map[self.min_rung]  # length of lowest rung in a bracket
    if self.sample_default_at_target and self.sample_default_first:
        start += 1
        end += 1
    # iterates over the different SH brackets which span start-end by index
    while end <= len(self.observed_configs):
        # for the SH bracket in start-end, calculate total SH budget used
        bracket_budget_used = self._calc_budget_used_in_bracket(
            deepcopy(self.observed_configs.rung.values[start:end])
        )
        # if budget used is less than a SH bracket budget then still an active bracket
        if bracket_budget_used < sum(self.full_rung_trace):
            # subsetting only this SH bracket from the history
            self._get_rungs_state(self.observed_configs.iloc[start:end])
            # extra call to use the updated rung member info to find promotions
            # SyncPromotion signals a wait if a rung is full but with
            # incomplete/pending evaluations, and signals to start a new SH bracket
            self._handle_promotions()
            promotion_count = 0
            for _, promotions in self.rung_promotions.items():
                promotion_count += len(promotions)
            # if no promotion candidates are returned, then the current bracket
            # is active and waiting
            if promotion_count:
                # returns the oldest active bracket if a promotion found
                return
        # else move to next SH bracket recorded by an offset (= lowest rung length)
        start = end
        end = start + self.config_map[self.min_rung]

    # updates rung info with the latest active, incomplete bracket
    self._get_rungs_state(self.observed_configs.iloc[start:end])
    # _handle_promotion() need not be called as it is called by load_results()
    return
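
The bracket windowing can be pictured as cutting the observation history into consecutive windows, each the size of a bracket's lowest rung. A toy replay with invented numbers (the real loop additionally checks each window's spent budget and may return early on a promotable bracket):

# Toy replay of the start/end windowing in clear_old_brackets.
lowest_rung_size = 9   # plays the role of self.config_map[self.min_rung]
n_observed = 25        # plays the role of len(self.observed_configs)

brackets = []
start, end = 0, lowest_rung_size
while end <= n_observed:
    brackets.append((start, end))
    start, end = end, end + lowest_rung_size
brackets.append((start, end))  # the latest, still-filling bracket

print(brackets)  # [(0, 9), (9, 18), (18, 27)] -- the last bracket is active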

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

Decides which configuration to query next: promotes a promotable configuration to the next rung if one exists, otherwise samples a new configuration at the lowest rung.

RETURNS DESCRIPTION
tuple[SearchSpace, str, str | None]

The configuration's hyperparameter values, its config ID, and the previous config ID (None if the configuration was newly sampled).
Source code in neps/optimizers/multi_fidelity/successive_halving.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    rung_to_promote = self.is_promotable()
    if rung_to_promote is not None:
        # promotes the first recorded promotable config in the argsort-ed rung
        row = self.observed_configs.iloc[self.rung_promotions[rung_to_promote][0]]
        config = deepcopy(row["config"])
        rung = rung_to_promote + 1
        # assigning the fidelity to evaluate the config at
        config.fidelity.value = self.rung_map[rung]
        # updating config IDs
        previous_config_id = f"{row.name}_{rung_to_promote}"
        config_id = f"{row.name}_{rung}"
    else:
        rung_id = self.min_rung
        # using random instead of np.random to be consistent with NePS BO
        if (
            self.use_priors
            and self.sample_default_first
            and len(self.observed_configs) == 0
        ):
            if self.sample_default_at_target:
                # sets the default config to be evaluated at the target fidelity
                rung_id = self.max_rung
                self.logger.info(
                    "Next config will be evaluated at target fidelity."
                )
            self.logger.info("Sampling the default configuration...")
            config = self.pipeline_space.sample_default_configuration()

        elif random.random() < self.random_interleave_prob:
            config = self.pipeline_space.sample(
                patience=self.patience,
                user_priors=False,  # sample uniformly random
                ignore_fidelity=True,
            )
        else:
            config = self.sample_new_config(rung=rung_id)

        fidelity_value = self.rung_map[rung_id]
        config.fidelity.value = fidelity_value

        previous_config_id = None
        config_id = f"{self._generate_new_config_id()}_{rung_id}"

    return config.hp_values(), config_id, previous_config_id  # type: ignore

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Calls result.utils.get_learning_curve() and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_promotable #

is_promotable() -> int | None

Returns the highest rung index that has a promotable configuration, else None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # # iterates starting from the highest fidelity promotable to the lowest fidelity
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote

load_results #

load_results(
    previous_results: dict[str, ConfigResult],
    pending_evaluations: dict[str, SearchSpace],
) -> None

This is basically the fit method: it rebuilds the optimizer state (rung histories, observed configurations, promotions) from all previous and pending evaluations.

PARAMETER DESCRIPTION
previous_results

Configurations that have already been evaluated, together with their results, keyed by config ID.

TYPE: dict[str, ConfigResult]

pending_evaluations

Configurations that have been sampled but not yet evaluated, keyed by config ID.

TYPE: dict[str, SearchSpace]

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def load_results(
    self,
    previous_results: dict[str, ConfigResult],
    pending_evaluations: dict[str, SearchSpace],
) -> None:
    """This is basically the fit method.

    Args:
        previous_results (dict[str, ConfigResult]): [description]
        pending_evaluations (dict[str, ConfigResult]): [description]
    """

    self.rung_histories = {
        rung: {"config": [], "perf": []}
        for rung in range(self.min_rung, self.max_rung + 1)
    }

    self.observed_configs = pd.DataFrame([], columns=("config", "rung", "perf"))

    # previous optimization run exists and needs to be loaded
    self._load_previous_observations(previous_results)
    self.total_fevals = len(previous_results) + len(pending_evaluations)

    # account for pending evaluations
    self._handle_pending_evaluations(pending_evaluations)

    # process optimization state and bucket observations per rung
    self._get_rungs_state()

    # filter/reset old SH brackets
    self.clear_old_brackets()

    # identifying promotion list per rung
    self._handle_promotions()

    # fit any model/surrogates
    self._fit_models()

    return

SuccessiveHalvingBase #

SuccessiveHalvingBase(
    pipeline_space: SearchSpace,
    budget: int | None = None,
    eta: int = 3,
    early_stopping_rate: int = 0,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    use_priors: bool = False,
    sampling_policy: Any = RandomUniformPolicy,
    promotion_policy: Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] | None = None,
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
)

Bases: BaseOptimizer

Implements a SuccessiveHalving procedure with a sampling and promotion policy.

PARAMETER DESCRIPTION
pipeline_space

Space in which to search

TYPE: SearchSpace

budget

Maximum budget

TYPE: int | None DEFAULT: None

eta

The reduction factor used by SH

TYPE: int DEFAULT: 3

early_stopping_rate

Determines the number of rungs in an SH bracket. Choosing 0 creates the maximal number of rungs given the fidelity bounds.

TYPE: int DEFAULT: 0

initial_design_type

Type of initial design to switch to BO. Legacy parameter from the NePS BO design; could be used to extend to MF-BO.

TYPE: Literal['max_budget', 'unique_configs'] DEFAULT: 'max_budget'

use_priors

Allows random samples to be generated from a default; samples are generated from a Gaussian centered around the default value.

TYPE: bool DEFAULT: False

sampling_policy

The type of sampling procedure to use

TYPE: Any DEFAULT: RandomUniformPolicy

promotion_policy

The type of promotion procedure to use

TYPE: Any DEFAULT: SyncPromotionPolicy

loss_value_on_error

Setting this and cost_value_on_error to any float will suppress any error during Bayesian optimization and use the given loss value instead. Default: None

TYPE: None | float DEFAULT: None

cost_value_on_error

Setting this and loss_value_on_error to any float will suppress any error during Bayesian optimization and use the given cost value instead. Default: None

TYPE: None | float DEFAULT: None

logger

logger object, or None to use the neps logger

DEFAULT: None

prior_confidence

The degree of confidence to place in the prior. The higher the confidence, the smaller the standard deviation of the prior distribution centered around the default.

TYPE: Literal['low', 'medium', 'high'] | None DEFAULT: None

random_interleave_prob

Probability of sampling a configuration uniformly at random instead of from the prior.

TYPE: float DEFAULT: 0.0

sample_default_first

Whether to sample the default configuration first

TYPE: bool DEFAULT: False

sample_default_at_target

Whether to evaluate the default configuration at the target fidelity or max budget

TYPE: bool DEFAULT: False

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int | None = None,
    eta: int = 3,
    early_stopping_rate: int = 0,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    use_priors: bool = False,
    sampling_policy: typing.Any = RandomUniformPolicy,
    promotion_policy: typing.Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] | None = None,
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
):
    """Initialise an SH bracket.

    Args:
        pipeline_space: Space in which to search
        budget: Maximum budget
        eta: The reduction factor used by SH
        early_stopping_rate: Determines the number of rungs in an SH bracket
            Choosing 0 creates maximal rungs given the fidelity bounds
        initial_design_type: Type of initial design to switch to BO
            Legacy parameter from NePS BO design. Could be used to extend to MF-BO.
        use_priors: Allows random samples to be generated from a default
            Samples generated from a Gaussian centered around the default value
        sampling_policy: The type of sampling procedure to use
        promotion_policy: The type of promotion procedure to use
        loss_value_on_error: Setting this and cost_value_on_error to any float will
            suppress any error during Bayesian optimization and use the given loss
            value instead. Default: None
        cost_value_on_error: Setting this and loss_value_on_error to any float will
            suppress any error during Bayesian optimization and use the given cost
            value instead. Default: None
        logger: logger object, or None to use the neps logger
        prior_confidence: The degree of confidence to place in the prior.
            The higher the confidence, the smaller the standard deviation of the
            prior distribution centered around the default
        random_interleave_prob: Probability of sampling a configuration uniformly
            at random instead of from the prior
        sample_default_first: Whether to sample the default configuration first
        sample_default_at_target: Whether to evaluate the default configuration at
            the target fidelity or max budget
    """
    super().__init__(
        pipeline_space=pipeline_space,
        budget=budget,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
    )
    if random_interleave_prob < 0 or random_interleave_prob > 1:
        raise ValueError("random_interleave_prob should be in [0.0, 1.0]")
    self.random_interleave_prob = random_interleave_prob
    self.sample_default_first = sample_default_first
    self.sample_default_at_target = sample_default_at_target

    self.min_budget = self.pipeline_space.fidelity.lower
    self.max_budget = self.pipeline_space.fidelity.upper
    self.eta = eta
    # SH implicitly sets early_stopping_rate to 0
    # the parameter is exposed to allow HB to call SH with different stopping rates
    self.early_stopping_rate = early_stopping_rate
    self.sampling_policy = sampling_policy(
        pipeline_space=self.pipeline_space, logger=self.logger
    )
    self.promotion_policy = promotion_policy(self.eta)

    # `max_budget_init` checks for the number of configurations that have been
    # evaluated at the target budget
    self.initial_design_type = initial_design_type
    self.use_priors = use_priors

    # check to ensure no rung ID is negative
    # equivalent to s_max in https://arxiv.org/pdf/1603.06560.pdf
    self.stopping_rate_limit = np.floor(
        np.log(self.max_budget / self.min_budget) / np.log(self.eta)
    ).astype(int)
    assert self.early_stopping_rate <= self.stopping_rate_limit

    # maps rungs to a fidelity value for an SH bracket with `early_stopping_rate`
    self.rung_map = self._get_rung_map(self.early_stopping_rate)
    self.config_map = self._get_config_map(self.early_stopping_rate)

    self.min_rung = min(list(self.rung_map.keys()))
    self.max_rung = max(list(self.rung_map.keys()))

    # placeholder args for varying promotion and sampling policies
    self.promotion_policy_kwargs: dict = {}
    self.promotion_policy_kwargs.update({"config_map": self.config_map})
    self.sampling_args: dict = {}

    self.fidelities = list(self.rung_map.values())
    # stores the observations made and the corresponding fidelity explored
    # crucial data structure used for determining promotion candidates
    self.observed_configs = pd.DataFrame([], columns=("config", "rung", "perf"))
    # stores which configs occupy each rung at any time
    self.rung_members: dict = dict()  # stores config IDs per rung
    self.rung_members_performance: dict = dict()  # performances recorded per rung
    self.rung_promotions: dict = dict()  # records a promotable config per rung
    self.total_fevals = 0

    # setup SH state counter
    self._counter = 0
    self.full_rung_trace = SuccessiveHalving._get_rung_trace(
        self.rung_map, self.config_map
    )

    #############################
    # Setting prior confidences #
    #############################
    # the std. dev or peakiness of distribution
    self.prior_confidence = prior_confidence
    self._enhance_priors()
    self.rung_histories = None
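
Complementing the rung map, the config map fixes how many configurations each rung can hold; classic SH keeps roughly a 1/eta fraction per promotion. A sketch under that assumption (the exact _get_config_map implementation is not shown on this page):

# Sketch of per-rung capacities under the classic SH geometry (assumed).
eta, n_rungs = 3, 4
configs_at_lowest = eta ** (n_rungs - 1)    # 27 configs enter rung 0
config_map = {
    rung: configs_at_lowest // eta**rung    # 27, 9, 3, 1
    for rung in range(n_rungs)
}
print(config_map)  # {0: 27, 1: 9, 2: 3, 3: 1}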

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

Decides which configuration to query next: promotes a promotable configuration to the next rung if one exists, otherwise samples a new configuration at the lowest rung.

RETURNS DESCRIPTION
tuple[SearchSpace, str, str | None]

The configuration's hyperparameter values, its config ID, and the previous config ID (None if the configuration was newly sampled).
Source code in neps/optimizers/multi_fidelity/successive_halving.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    rung_to_promote = self.is_promotable()
    if rung_to_promote is not None:
        # promotes the first recorded promotable config in the argsort-ed rung
        row = self.observed_configs.iloc[self.rung_promotions[rung_to_promote][0]]
        config = deepcopy(row["config"])
        rung = rung_to_promote + 1
        # assigning the fidelity to evaluate the config at
        config.fidelity.value = self.rung_map[rung]
        # updating config IDs
        previous_config_id = f"{row.name}_{rung_to_promote}"
        config_id = f"{row.name}_{rung}"
    else:
        rung_id = self.min_rung
        # using random instead of np.random to be consistent with NePS BO
        if (
            self.use_priors
            and self.sample_default_first
            and len(self.observed_configs) == 0
        ):
            if self.sample_default_at_target:
                # sets the default config to be evaluated at the target fidelity
                rung_id = self.max_rung
                self.logger.info(
                    "Next config will be evaluated at target fidelity."
                )
            self.logger.info("Sampling the default configuration...")
            config = self.pipeline_space.sample_default_configuration()

        elif random.random() < self.random_interleave_prob:
            config = self.pipeline_space.sample(
                patience=self.patience,
                user_priors=False,  # sample uniformly random
                ignore_fidelity=True,
            )
        else:
            config = self.sample_new_config(rung=rung_id)

        fidelity_value = self.rung_map[rung_id]
        config.fidelity.value = fidelity_value

        previous_config_id = None
        config_id = f"{self._generate_new_config_id()}_{rung_id}"

    return config.hp_values(), config_id, previous_config_id  # type: ignore

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Calls result.utils.get_learning_curve() and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_promotable #

is_promotable() -> int | None

Returns the highest rung index that has a promotable configuration, else None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # # iterates starting from the highest fidelity promotable to the lowest fidelity
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote

load_results #

load_results(
    previous_results: dict[str, ConfigResult],
    pending_evaluations: dict[str, SearchSpace],
) -> None

This is basically the fit method: it rebuilds the optimizer state (rung histories, observed configurations, promotions) from all previous and pending evaluations.

PARAMETER DESCRIPTION
previous_results

Configurations that have already been evaluated, together with their results, keyed by config ID.

TYPE: dict[str, ConfigResult]

pending_evaluations

Configurations that have been sampled but not yet evaluated, keyed by config ID.

TYPE: dict[str, SearchSpace]

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def load_results(
    self,
    previous_results: dict[str, ConfigResult],
    pending_evaluations: dict[str, SearchSpace],
) -> None:
    """This is basically the fit method.

    Args:
        previous_results (dict[str, ConfigResult]): [description]
        pending_evaluations (dict[str, ConfigResult]): [description]
    """

    self.rung_histories = {
        rung: {"config": [], "perf": []}
        for rung in range(self.min_rung, self.max_rung + 1)
    }

    self.observed_configs = pd.DataFrame([], columns=("config", "rung", "perf"))

    # previous optimization run exists and needs to be loaded
    self._load_previous_observations(previous_results)
    self.total_fevals = len(previous_results) + len(pending_evaluations)

    # account for pending evaluations
    self._handle_pending_evaluations(pending_evaluations)

    # process optimization state and bucket observations per rung
    self._get_rungs_state()

    # filter/reset old SH brackets
    self.clear_old_brackets()

    # identifying promotion list per rung
    self._handle_promotions()

    # fit any model/surrogates
    self._fit_models()

    return

SuccessiveHalvingWithPriors #

SuccessiveHalvingWithPriors(
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    early_stopping_rate: int = 0,
    initial_design_type: Literal[
        "max_budget", "unique_configs"
    ] = "max_budget",
    sampling_policy: Any = FixedPriorPolicy,
    promotion_policy: Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] = "medium",
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
)

Bases: SuccessiveHalving

Implements Successive Halving with a fixed-prior sampling policy and a synchronous promotion policy.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    budget: int,
    eta: int = 3,
    early_stopping_rate: int = 0,
    initial_design_type: Literal["max_budget", "unique_configs"] = "max_budget",
    sampling_policy: typing.Any = FixedPriorPolicy,
    promotion_policy: typing.Any = SyncPromotionPolicy,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
    ignore_errors: bool = False,
    logger=None,
    prior_confidence: Literal["low", "medium", "high"] = "medium",  # medium = 0.25
    random_interleave_prob: float = 0.0,
    sample_default_first: bool = False,
    sample_default_at_target: bool = False,
):
    super().__init__(
        pipeline_space=pipeline_space,
        budget=budget,
        eta=eta,
        early_stopping_rate=early_stopping_rate,
        initial_design_type=initial_design_type,
        use_priors=self.use_priors,
        sampling_policy=sampling_policy,
        promotion_policy=promotion_policy,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
        ignore_errors=ignore_errors,
        logger=logger,
        prior_confidence=prior_confidence,
        random_interleave_prob=random_interleave_prob,
        sample_default_first=sample_default_first,
        sample_default_at_target=sample_default_at_target,
    )

clear_old_brackets #

clear_old_brackets()

Enforces reset at each new bracket.

The _get_rungs_state() function creates the rung_promotions dict mapping, which is used by the promotion policies to determine the next step: promotion or sample. The key to simulating the rung reset of vanilla SH is to subset only the relevant part of the observation history that corresponds to one SH bracket. Under a parallel run, multiple SH brackets can be spawned. The oldest active, incomplete SH bracket is searched for to choose the next evaluation. If all brackets are either over or waiting, a new SH bracket is spawned. There are no waiting or blocking calls.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def clear_old_brackets(self):
    """Enforces reset at each new bracket.

    The _get_rungs_state() function creates the `rung_promotions` dict mapping,
    which is used by the promotion policies to determine the next step:
    promotion or sample. The key to simulating the rung reset of vanilla SH is
    to subset only the relevant part of the observation history that
    corresponds to one SH bracket. Under a parallel run, multiple SH brackets
    can be spawned. The oldest active, incomplete SH bracket is searched for to
    choose the next evaluation. If all brackets are either over or waiting, a
    new SH bracket is spawned.
    There are no waiting or blocking calls.
    """
    # indexes to mark separate brackets
    start = 0
    end = self.config_map[self.min_rung]  # length of lowest rung in a bracket
    if self.sample_default_at_target and self.sample_default_first:
        start += 1
        end += 1
    # iterates over the different SH brackets which span start-end by index
    while end <= len(self.observed_configs):
        # for the SH bracket in start-end, calculate total SH budget used
        bracket_budget_used = self._calc_budget_used_in_bracket(
            deepcopy(self.observed_configs.rung.values[start:end])
        )
        # if budget used is less than a SH bracket budget then still an active bracket
        if bracket_budget_used < sum(self.full_rung_trace):
            # subsetting only this SH bracket from the history
            self._get_rungs_state(self.observed_configs.iloc[start:end])
            # extra call to use the updated rung member info to find promotions
            # SyncPromotion signals a wait if a rung is full but with
            # incomplete/pending evaluations, and signals to start a new SH bracket
            self._handle_promotions()
            promotion_count = 0
            for _, promotions in self.rung_promotions.items():
                promotion_count += len(promotions)
            # if no promotion candidates are returned, then the current bracket
            # is active and waiting
            if promotion_count:
                # returns the oldest active bracket if a promotion found
                return
        # else move to next SH bracket recorded by an offset (= lowest rung length)
        start = end
        end = start + self.config_map[self.min_rung]

    # updates rung info with the latest active, incomplete bracket
    self._get_rungs_state(self.observed_configs.iloc[start:end])
    # _handle_promotion() need not be called as it is called by load_results()
    return

get_config_and_ids #

get_config_and_ids() -> tuple[SearchSpace, str, str | None]

Decides which configuration to query next: promotes a promotable configuration to the next rung if one exists, otherwise samples a new configuration at the lowest rung.

RETURNS DESCRIPTION
tuple[SearchSpace, str, str | None]

The configuration's hyperparameter values, its config ID, and the previous config ID (None if the configuration was newly sampled).
Source code in neps/optimizers/multi_fidelity/successive_halving.py
def get_config_and_ids(
    self,
) -> tuple[SearchSpace, str, str | None]:
    """...and this is the method that decides which point to query.

    Returns:
        [type]: [description]
    """
    rung_to_promote = self.is_promotable()
    if rung_to_promote is not None:
        # promotes the first recorded promotable config in the argsort-ed rung
        row = self.observed_configs.iloc[self.rung_promotions[rung_to_promote][0]]
        config = deepcopy(row["config"])
        rung = rung_to_promote + 1
        # assigning the fidelity to evaluate the config at
        config.fidelity.value = self.rung_map[rung]
        # updating config IDs
        previous_config_id = f"{row.name}_{rung_to_promote}"
        config_id = f"{row.name}_{rung}"
    else:
        rung_id = self.min_rung
        # using random instead of np.random to be consistent with NePS BO
        if (
            self.use_priors
            and self.sample_default_first
            and len(self.observed_configs) == 0
        ):
            if self.sample_default_at_target:
                # sets the default config to be evaluated at the target fidelity
                rung_id = self.max_rung
                self.logger.info(
                    "Next config will be evaluated at target fidelity."
                )
            self.logger.info("Sampling the default configuration...")
            config = self.pipeline_space.sample_default_configuration()

        elif random.random() < self.random_interleave_prob:
            config = self.pipeline_space.sample(
                patience=self.patience,
                user_priors=False,  # sample uniformly random
                ignore_fidelity=True,
            )
        else:
            config = self.sample_new_config(rung=rung_id)

        fidelity_value = self.rung_map[rung_id]
        config.fidelity.value = fidelity_value

        previous_config_id = None
        config_id = f"{self._generate_new_config_id()}_{rung_id}"

    return config.hp_values(), config_id, previous_config_id  # type: ignore

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_cost() and passes the error handling through.
    Please use self.get_cost() instead of get_cost() in all optimizer classes."""
    return _get_cost(
        result,
        cost_value_on_error=self.cost_value_on_error,
        ignore_errors=self.ignore_errors,
    )

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Calls result.utils.get_learning_curve() and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_learning_curve(
        result,
        learning_curve_on_error=self.learning_curve_on_error,
        ignore_errors=self.ignore_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Calls result.utils.get_loss() and passes the error handling through.
    Please use self.get_loss() instead of get_loss() in all optimizer classes."""
    return _get_loss(
        result,
        loss_value_on_error=self.loss_value_on_error,
        ignore_errors=self.ignore_errors,
    )

is_promotable #

is_promotable() -> int | None

Returns the highest rung index that has a promotable configuration, else None.

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def is_promotable(self) -> int | None:
    """Returns an int if a rung can be promoted, else a None."""
    rung_to_promote = None

    # # iterates starting from the highest fidelity promotable to the lowest fidelity
    for rung in reversed(range(self.min_rung, self.max_rung)):
        if len(self.rung_promotions[rung]) > 0:
            rung_to_promote = rung
            # stop checking when a promotable config found
            # no need to search at lower fidelities
            break
    return rung_to_promote

load_results #

load_results(
    previous_results: dict[str, ConfigResult],
    pending_evaluations: dict[str, SearchSpace],
) -> None

This is basically the fit method: it rebuilds the optimizer state (rung histories, observed configurations, promotions) from all previous and pending evaluations.

PARAMETER DESCRIPTION
previous_results

Configurations that have already been evaluated, together with their results, keyed by config ID.

TYPE: dict[str, ConfigResult]

pending_evaluations

Configurations that have been sampled but not yet evaluated, keyed by config ID.

TYPE: dict[str, SearchSpace]

Source code in neps/optimizers/multi_fidelity/successive_halving.py
def load_results(
    self,
    previous_results: dict[str, ConfigResult],
    pending_evaluations: dict[str, SearchSpace],
) -> None:
    """This is basically the fit method.

    Args:
        previous_results (dict[str, ConfigResult]): [description]
        pending_evaluations (dict[str, ConfigResult]): [description]
    """

    self.rung_histories = {
        rung: {"config": [], "perf": []}
        for rung in range(self.min_rung, self.max_rung + 1)
    }

    self.observed_configs = pd.DataFrame([], columns=("config", "rung", "perf"))

    # previous optimization run exists and needs to be loaded
    self._load_previous_observations(previous_results)
    self.total_fevals = len(previous_results) + len(pending_evaluations)

    # account for pending evaluations
    self._handle_pending_evaluations(pending_evaluations)

    # process optimization state and bucket observations per rung
    self._get_rungs_state()

    # filter/reset old SH brackets
    self.clear_old_brackets()

    # identifying promotion list per rung
    self._handle_promotions()

    # fit any model/surrogates
    self._fit_models()

    return