Skip to content

Mf tpe

neps.optimizers.bayesian_optimization.mf_tpe #

MultiFidelityPriorWeightedTreeParzenEstimator #

MultiFidelityPriorWeightedTreeParzenEstimator(
    pipeline_space: SearchSpace,
    use_priors: bool = True,
    prior_num_evals: float = 2.5,
    good_fraction: float = 0.3334,
    random_interleave_prob: float = 0.0,
    initial_design_size: int = 0,
    prior_as_samples: bool = True,
    pending_as_bad: bool = True,
    fidelity_weighting: Literal[
        "linear", "spearman"
    ] = "spearman",
    surrogate_model: str = "kde",
    good_model_bw_factor: int = 1.5,
    joint_kde_modelling: bool = False,
    threshold_improvement: bool = True,
    promote_from_acq: bool = True,
    acquisition_sampler: (
        str | AcquisitionSampler
    ) = "mutation",
    prior_draws: int = 1000,
    prior_confidence: Literal[
        "low", "medium", "high"
    ] = "medium",
    surrogate_model_args: dict = None,
    soft_promotion: bool = True,
    patience: int = 50,
    logger=None,
    budget: None | int | float = None,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
)

Bases: BaseOptimizer

PARAMETER DESCRIPTION
pipeline_space

Space in which to search

TYPE: SearchSpace

prior_num_evals

[description]. Defaults to 2.5.

TYPE: float DEFAULT: 2.5

good_fraction

[description]. Defaults to 0.3334.

TYPE: float DEFAULT: 0.3334

random_interleave_prob

Frequency at which random configurations are sampled instead of configurations from the acquisition strategy.

TYPE: float DEFAULT: 0.0

initial_design_size

Number of 'x' samples that are to be evaluated before selecting a sample using a strategy instead of randomly. If there is a user prior, we can rely on the model from the very first iteration.

TYPE: int DEFAULT: 0

prior_as_samples

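Whether to sample from the KDE and incorporate that way, or just have the distribution be a linear combination of the KDE and the prior. Should be True if the prior happens to be unnormalized.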

TYPE: bool DEFAULT: True

pending_as_bad

Whether to treat pending observations as bad, assigning them to the bad KDE to encourage diversity among samples queried in parallel.

TYPE: bool DEFAULT: True

prior_draws

The number of samples drawn from the prior if there is one. This does not affect the strength of the prior, just how accurately it is reconstructed by the KDE.

TYPE: int DEFAULT: 1000

patience

How many times we try something that fails before giving up.

TYPE: int DEFAULT: 50

budget

Maximum budget

TYPE: None | int | float DEFAULT: None

loss_value_on_error

Setting this and cost_value_on_error to any float will suppress any error during Bayesian optimization and will use the given loss value instead. default: None

TYPE: None | float DEFAULT: None

cost_value_on_error

Setting this and loss_value_on_error to any float will suppress any error during Bayesian optimization and will use the given cost value instead. default: None

TYPE: None | float DEFAULT: None

logger

logger object, or None to use the neps logger

DEFAULT: None

Source code in neps/optimizers/bayesian_optimization/mf_tpe.py
def __init__(
    self,
    pipeline_space: SearchSpace,
    use_priors: bool = True,
    prior_num_evals: float = 2.5,
    good_fraction: float = 0.3334,
    random_interleave_prob: float = 0.0,
    initial_design_size: int = 0,
    prior_as_samples: bool = True,
    pending_as_bad: bool = True,
    fidelity_weighting: Literal["linear", "spearman"] = "spearman",
    surrogate_model: str = "kde",
    good_model_bw_factor: float = 1.5,
    joint_kde_modelling: bool = False,
    threshold_improvement: bool = True,
    promote_from_acq: bool = True,
    acquisition_sampler: str | AcquisitionSampler = "mutation",
    prior_draws: int = 1000,
    prior_confidence: Literal["low", "medium", "high"] = "medium",
    surrogate_model_args: dict | None = None,
    soft_promotion: bool = True,
    patience: int = 50,
    logger=None,
    budget: None | int | float = None,
    loss_value_on_error: None | float = None,
    cost_value_on_error: None | float = None,
):
    """Set up the multi-fidelity, prior-weighted tree Parzen estimator.

    Builds the rung maps for the search space, stores the configuration
    flags, optionally draws samples from the user prior, and instantiates
    three surrogate models ("good", "bad" and "all") plus the acquisition
    sampler.

    Args:
        pipeline_space: Space in which to search
        use_priors: If True and the space has a prior, the prior is used (see
            ``prior_as_samples``). Also disables ``self.round_up``.
        prior_num_evals: Stored on the instance as ``self.prior_num_evals``;
            its exact effect is defined outside this method.
        good_fraction: Stored as ``self.good_fraction``. Its reciprocal
            (rounded) scales the default initial design size. Presumably the
            fraction of observations treated as "good" -- confirm against the
            model-fitting code.
        random_interleave_prob: Frequency at which random configurations are sampled
            instead of configurations from the acquisition strategy.
        initial_design_size: Number of 'x' samples that are to be evaluated before
            selecting a sample using a strategy instead of randomly. If there is a
            user prior, we can rely on the model from the very first iteration.
            The default of 0 selects
            ``len(pipeline_space) * round(1 / good_fraction)``.
        prior_as_samples: Whether to sample from the KDE and incorporate that way, or
            just have the distribution be a linear combination of the KDE and the
            prior. Should be True if the prior happens to be unnormalized.
            The linear-combination variant is not implemented; with False no
            prior samples are drawn.
        pending_as_bad: Whether to treat pending observations as bad, assigning them
            to the bad KDE to encourage diversity among samples queried in parallel
        fidelity_weighting: Stored as ``self.fidelity_weighting``; consumed
            outside this method.
        surrogate_model: Key into ``SurrogateModelMapping``. Only ``"kde"`` is
            accepted.
        good_model_bw_factor: Passed to the "good" surrogate model as its
            ``bandwidth_factor`` argument.
        joint_kde_modelling: Stored as ``self.joint_kde_modelling``; consumed
            outside this method.
        threshold_improvement: Stored as ``self.threshold_improvement``; consumed
            outside this method.
        promote_from_acq: Stored as ``self.promote_from_acq``; consumed outside
            this method.
        acquisition_sampler: Name or instance resolved through
            ``AcquisitionSamplerMapping``.
        prior_draws: The number of samples drawn from the prior if there is one. This
            does not affect the strength of the prior, just how accurately it
            is reconstructed by the KDE.
        prior_confidence: Stored as ``self.prior_confidence`` before
            ``self._enhance_priors()`` is called.
        surrogate_model_args: Extra keyword arguments for the surrogate models.
            The dict is copied, so the caller's object is not modified.
        soft_promotion: Stored as ``self.soft_promotion``; consumed outside this
            method.
        patience: How many times we try something that fails before giving up.
        logger: logger object, or None to use the neps logger
        budget: Maximum budget
        loss_value_on_error: Setting this and cost_value_on_error to any float will
            suppress any error during bayesian optimization and will use given loss
            value instead. default: None
        cost_value_on_error: Setting this and loss_value_on_error to any float will
            suppress any error during bayesian optimization and will use given cost
            value instead. default: None

    Raises:
        NotImplementedError: If ``surrogate_model`` is anything but ``"kde"``.
    """
    super().__init__(
        pipeline_space=pipeline_space,
        patience=patience,
        logger=logger,
        budget=budget,
        loss_value_on_error=loss_value_on_error,
        cost_value_on_error=cost_value_on_error,
    )
    self.pipeline_space = pipeline_space
    self.good_fraction = good_fraction
    if self.pipeline_space.has_fidelity:
        self.min_fidelity = pipeline_space.fidelity.lower
        self.max_fidelity = pipeline_space.fidelity.upper
        self.rung_map, self.inverse_rung_map = self._get_rung_maps()
        self.min_rung = 0
        self.max_rung = len(self.rung_map) - 1
    else:
        # Without a fidelity there is a single rung at a dummy fidelity of 1.
        self.min_rung = 0
        self.max_rung = 0
        self.min_fidelity = 1
        self.max_fidelity = 1
        self.rung_map, self.inverse_rung_map = self._get_rung_maps()

    if initial_design_size == 0:
        # Default: scale with the number of hyperparameters and 1 / good_fraction.
        self._initial_design_size = len(self.pipeline_space) * np.round(
            1 / self.good_fraction
        ).astype(int)
    else:
        self._initial_design_size = initial_design_size
    self.promote_from_acq = promote_from_acq

    self.num_rungs = len(self.rung_map)
    self.use_priors = use_priors
    self.prior_num_evals = prior_num_evals
    self._random_interleave_prob = random_interleave_prob
    self._pending_as_bad = pending_as_bad
    self.prior_draws = prior_draws
    self._has_promotable_configs = False
    self.soft_promotion = soft_promotion
    self.joint_kde_modelling = joint_kde_modelling
    # If we use priors, we don't add configurations as good until they are within
    # the top fraction. This heuristic has not been tried further, but makes sense
    # in the context when we have priors.
    self.round_up = not use_priors
    self.fidelity_weighting = fidelity_weighting
    self.threshold_improvement = threshold_improvement
    # TODO have this read in as part of load_results - it cannot be saved as an attribute when
    # running parallel instances of the algorithm (since the old configs are shared, not instance-specific)
    self.old_configs_per_fid = [[] for _ in range(self.num_rungs)]
    # We assume that the information conveyed per fidelity (and the cost) is linear in the
    # fidelity levels if nothing else is specified
    if surrogate_model != "kde":
        raise NotImplementedError(
            "Only supports KDEs for now. Could (maybe?) support binary classification in the future."
        )
    # NOTE(review): the sampler is built again with identical arguments at the end
    # of this method. Kept as-is because it is not visible here whether
    # _enhance_priors() or the sampler constructor depend on the ordering.
    self.acquisition_sampler = instance_from_map(
        AcquisitionSamplerMapping,
        acquisition_sampler,
        name="acquisition sampler function",
        kwargs={"patience": self.patience, "pipeline_space": self.pipeline_space},
    )
    self.prior_confidence = prior_confidence
    self._enhance_priors()

    # Copy so that the keys added below never leak into the caller's dict.
    surrogate_model_args = dict(surrogate_model_args or {})

    param_types, num_options, logged_params, is_fidelity = self._get_types()
    surrogate_model_args["param_types"] = param_types
    surrogate_model_args["num_options"] = num_options
    surrogate_model_args["is_fidelity"] = is_fidelity
    surrogate_model_args["logged_params"] = logged_params
    # The "good" model gets its own argument set with a dedicated bandwidth factor.
    good_model_args = deepcopy(surrogate_model_args)
    good_model_args["bandwidth_factor"] = good_model_bw_factor

    # Always define prior_samples so later code can rely on the attribute.
    # TODO work out affine combination (the prior_as_samples=False variant).
    self.prior_samples = []
    if self.pipeline_space.has_prior and use_priors and prior_as_samples:
        self.prior_samples = [
            self.pipeline_space.sample(
                patience=self.patience, user_priors=True, ignore_fidelity=False
            )
            for _ in range(self.prior_draws)
        ]

    self.surrogate_models = {
        "good": instance_from_map(
            SurrogateModelMapping,
            surrogate_model,
            name="surrogate model",
            kwargs=good_model_args,
        ),
        "bad": instance_from_map(
            SurrogateModelMapping,
            surrogate_model,
            name="surrogate model",
            kwargs=surrogate_model_args,
        ),
        "all": instance_from_map(
            SurrogateModelMapping,
            surrogate_model,
            name="surrogate model",
            kwargs=surrogate_model_args,
        ),
    }
    # The optimizer acts as its own acquisition function (see __call__).
    self.acquisition = self
    self.acquisition_sampler = instance_from_map(
        AcquisitionSamplerMapping,
        acquisition_sampler,
        name="acquisition sampler function",
        kwargs={"patience": self.patience, "pipeline_space": self.pipeline_space},
    )

__call__ #

__call__(
    x: Iterable,
    asscalar: bool = False,
    only_lowest_fidelity=True,
    only_good=False,
) -> ndarray | Tensor | float

Return the log density ratio log(good.pdf(x)) - log(bad.pdf(x)) of the "good" and "bad" surrogate models at the query points.

Source code in neps/optimizers/bayesian_optimization/mf_tpe.py
def __call__(
    self,
    x: Iterable,
    asscalar: bool = False,
    only_lowest_fidelity=True,
    only_good=False,
) -> np.ndarray | torch.Tensor | float:
    """Return the log density ratio of the "good" and "bad" models at ``x``.

    The value is ``log(good.pdf(x)) - log(bad.pdf(x))``, where ``good`` and
    ``bad`` are the entries of ``self.surrogate_models``.

    Args:
        x: Configurations to score; passed unchanged to each model's ``pdf``.
        asscalar: Accepted for interface compatibility; not used.
        only_lowest_fidelity: Accepted for interface compatibility. It
            currently has no influence on the returned value.
        only_good: Accepted for interface compatibility; not used.

    Returns:
        The difference of the two log densities, in whatever array type the
        models' ``pdf`` produces.
    """
    # TODO have only_lowest_fidelity as a setting in the acq_sampler instead.
    # A lowest-fidelity mask used to be computed here but was never applied;
    # it only forced every element of x to expose ``fidelity.value``, so it
    # was dropped. Both former branches returned this same expression.
    log_good = np.log(self.surrogate_models["good"].pdf(x))
    log_bad = np.log(self.surrogate_models["bad"].pdf(x))
    return log_good - log_bad

get_cost #

get_cost(result: str | dict | float) -> float | Any

Calls result.utils.get_cost() and passes the error handling through. Please use self.get_cost() instead of get_cost() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_cost(self, result: str | dict | float) -> float | Any:
    """Extract the cost from ``result`` with this optimizer's error handling.

    Forwards ``self.cost_value_on_error`` and ``self.ignore_errors`` to the
    underlying ``_get_cost`` helper. Optimizer classes should call
    ``self.get_cost()`` rather than the bare helper.
    """
    error_handling = {
        "cost_value_on_error": self.cost_value_on_error,
        "ignore_errors": self.ignore_errors,
    }
    return _get_cost(result, **error_handling)

get_learning_curve #

get_learning_curve(
    result: str | dict | float,
) -> float | Any

Extracts the learning curve from a result and passes the error handling through. Please use self.get_learning_curve() instead of get_learning_curve() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_learning_curve(self, result: str | dict | float) -> float | Any:
    """Extract the learning curve from ``result`` with this optimizer's error handling.

    Forwards ``self.learning_curve_on_error`` and ``self.ignore_errors`` to
    the underlying ``_get_learning_curve`` helper. Optimizer classes should
    call ``self.get_learning_curve()`` rather than the bare helper.
    """
    fallback_curve = self.learning_curve_on_error
    skip_errors = self.ignore_errors
    return _get_learning_curve(
        result,
        learning_curve_on_error=fallback_curve,
        ignore_errors=skip_errors,
    )

get_loss #

get_loss(result: str | dict | float) -> float | Any

Calls result.utils.get_loss() and passes the error handling through. Please use self.get_loss() instead of get_loss() in all optimizer classes.

Source code in neps/optimizers/base_optimizer.py
def get_loss(self, result: str | dict | float) -> float | Any:
    """Extract the loss from ``result`` with this optimizer's error handling.

    Forwards ``self.loss_value_on_error`` and ``self.ignore_errors`` to the
    underlying ``_get_loss`` helper. Optimizer classes should call
    ``self.get_loss()`` rather than the bare helper.
    """
    fallback_loss = self.loss_value_on_error
    skip_errors = self.ignore_errors
    return _get_loss(
        result, loss_value_on_error=fallback_loss, ignore_errors=skip_errors
    )

is_init_phase #

is_init_phase() -> bool

Decides if optimization is still under the warmstart phase/model-based search.

Source code in neps/optimizers/bayesian_optimization/mf_tpe.py
def is_init_phase(self) -> bool:
    """Tell whether the optimizer is still in its initial (warm-start) phase.

    Returns:
        False once ``self._num_train_x`` has reached
        ``self._initial_design_size``, True otherwise.
    """
    design_complete = self._num_train_x >= self._initial_design_size
    return not design_complete