
MCMC Gaussian Process

smac.model.gaussian_process.mcmc_gaussian_process #

MCMCGaussianProcess #

MCMCGaussianProcess(
    configspace: ConfigurationSpace,
    kernel: Kernel,
    n_mcmc_walkers: int = 20,
    chain_length: int = 50,
    burning_steps: int = 50,
    mcmc_sampler: str = "emcee",
    average_samples: bool = False,
    normalize_y: bool = True,
    instance_features: (
        dict[str, list[int | float]] | None
    ) = None,
    pca_components: int | None = 7,
    seed: int = 0,
)

Bases: AbstractGaussianProcess

Implementation of a Gaussian process model whose hyperparameters are integrated out by Markov chain Monte Carlo (MCMC). If you use this class, make sure to also use an integrated acquisition function that integrates over the GP's hyperparameters, as proposed by Snoek et al.

This code is based on the implementation of RoBO:

Klein, A., Falkner, S., Mansur, N. and Hutter, F. RoBO: A Flexible and Robust Bayesian Optimization Framework in Python. In: NIPS 2017 Bayesian Optimization Workshop.
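Concretely, instead of fixing the kernel hyperparameters $\theta$ to a single point estimate, the model draws $M$ = `n_mcmc_walkers` samples $\theta_1, \dots, \theta_M$ from their posterior (each walker contributes the last state of its chain) and keeps one `GaussianProcess` per sample. Predictions then approximate the marginalized predictive distribution by a Monte Carlo average:

$$
p(y \mid x, \mathcal{D}) = \int p(y \mid x, \theta, \mathcal{D})\, p(\theta \mid \mathcal{D})\, \mathrm{d}\theta \;\approx\; \frac{1}{M} \sum_{m=1}^{M} p(y \mid x, \theta_m, \mathcal{D})
$$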

Parameters#

configspace : ConfigurationSpace
kernel : Kernel
    Kernel which is used for the Gaussian process.
n_mcmc_walkers : int, defaults to 20
    The number of hyperparameter samples. This also determines the number of walkers for MCMC sampling, as each walker returns one hyperparameter sample.
chain_length : int, defaults to 50
    The length of the MCMC chain. We start n_mcmc_walkers walkers for chain_length steps and use the last sample of each chain as a hyperparameter sample.
burning_steps : int, defaults to 50
    The number of burn-in steps before the actual MCMC sampling starts.
mcmc_sampler : str, defaults to "emcee"
    Choose a self-tuning MCMC sampler. Can be either emcee or nuts.
average_samples : bool, defaults to False
    Whether to average over the sampled hyperparameter configurations and keep a single Gaussian process instead of one model per sample.
normalize_y : bool, defaults to True
    Zero mean unit variance normalization of the output values.
instance_features : dict[str, list[int | float]] | None, defaults to None
    Features (list of int or floats) of the instances (str). The features are incorporated into the X data, on which the model is trained.
pca_components : int | None, defaults to 7
    Number of components to keep when using PCA to reduce dimensionality of instance features.
seed : int, defaults to 0
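For orientation, here is a minimal construction sketch. It assumes SMAC 2.x, where kernel wrappers live in `smac.model.gaussian_process.kernels`; import paths and kernel helpers may differ in other versions:

```python
from ConfigSpace import ConfigurationSpace, UniformFloatHyperparameter

from smac.model.gaussian_process.kernels import ConstantKernel, MaternKernel
from smac.model.gaussian_process.mcmc_gaussian_process import MCMCGaussianProcess

# A toy space with two continuous hyperparameters.
cs = ConfigurationSpace()
cs.add_hyperparameter(UniformFloatHyperparameter("x0", lower=-5.0, upper=5.0))
cs.add_hyperparameter(UniformFloatHyperparameter("x1", lower=-5.0, upper=5.0))

# Constant * Matern is the usual GP default; SMAC's kernel wrappers can carry
# priors over the kernel hyperparameters, which the MCMC sampling exploits.
kernel = ConstantKernel(2.0) * MaternKernel(length_scale=1.0, nu=2.5)

model = MCMCGaussianProcess(
    configspace=cs,
    kernel=kernel,
    n_mcmc_walkers=20,  # one hyperparameter sample per walker
    chain_length=50,
    burning_steps=50,
    mcmc_sampler="emcee",
    seed=0,
)
```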

Source code in smac/model/gaussian_process/mcmc_gaussian_process.py
def __init__(
    self,
    configspace: ConfigurationSpace,
    kernel: Kernel,
    n_mcmc_walkers: int = 20,
    chain_length: int = 50,
    burning_steps: int = 50,
    mcmc_sampler: str = "emcee",
    average_samples: bool = False,
    normalize_y: bool = True,
    instance_features: dict[str, list[int | float]] | None = None,
    pca_components: int | None = 7,
    seed: int = 0,
):
    if mcmc_sampler not in ["emcee", "nuts"]:
        raise ValueError(f"MCMC Gaussian process does not support the sampler `{mcmc_sampler}`.")

    super().__init__(
        configspace=configspace,
        kernel=kernel,
        instance_features=instance_features,
        pca_components=pca_components,
        seed=seed,
    )

    self._n_mcmc_walkers = n_mcmc_walkers
    self._chain_length = chain_length
    self._burning_steps = burning_steps
    self._models: list[GaussianProcess] = []
    self._normalize_y = normalize_y
    self._mcmc_sampler = mcmc_sampler
    self._average_samples = average_samples
    self._set_has_conditions()

    # Internal statistics
    self._n_ll_evals = 0
    self._burned = False
    self._is_trained = False
    self._samples: np.ndarray | None = None

models property #

Returns the internally used Gaussian processes.

predict #

predict(
    X: ndarray, covariance_type: str | None = "diagonal"
) -> tuple[ndarray, ndarray | None]

Predicts mean and variance for a given X. Internally, calls the method _predict.

Parameters#

X : np.ndarray [#samples, #hyperparameters + #features]
    Input data points.
covariance_type: str | None, defaults to "diagonal"
    Specifies what to return along with the mean. Applied only to Gaussian processes. Takes four valid inputs:
    * None: Only the mean is returned.
    * "std": Standard deviation at test points is returned.
    * "diagonal": Diagonal of the covariance matrix is returned.
    * "full": Whole covariance matrix between the test points is returned.

Returns#

means : np.ndarray [#samples, #objectives]
    The predictive mean.
vars : np.ndarray [#samples, #objectives] or [#samples, #samples] | None
    Predictive variance or standard deviation.

Source code in smac/model/abstract_model.py
def predict(
    self,
    X: np.ndarray,
    covariance_type: str | None = "diagonal",
) -> tuple[np.ndarray, np.ndarray | None]:
    """Predicts mean and variance for a given X. Internally, calls the method `_predict`.

    Parameters
    ----------
    X : np.ndarray [#samples, #hyperparameters + #features]
        Input data points.
    covariance_type: str | None, defaults to "diagonal"
        Specifies what to return along with the mean. Applied only to Gaussian Processes.
        Takes four valid inputs:
        * None: Only the mean is returned.
        * "std": Standard deviation at test points is returned.
        * "diagonal": Diagonal of the covariance matrix is returned.
        * "full": Whole covariance matrix between the test points is returned.

    Returns
    -------
    means : np.ndarray [#samples, #objectives]
        The predictive mean.
    vars : np.ndarray [#samples, #objectives] or [#samples, #samples] | None
        Predictive variance or standard deviation.
    """
    if len(X.shape) != 2:
        raise ValueError("Expected 2d array, got %dd array!" % len(X.shape))

    if X.shape[1] != self._n_hps + self._n_features:
        raise ValueError(
            f"Feature mismatch: X should have {self._n_hps} hyperparameters + {self._n_features} features, "
            f"but has {X.shape[1]} in total."
        )

    if self._apply_pca:
        try:
            X_feats = X[:, -self._n_features :]
            X_feats = self._scaler.transform(X_feats)
            X_feats = self._pca.transform(X_feats)
            X = np.hstack((X[:, : self._n_hps], X_feats))
        except NotFittedError:
            # PCA not fitted if only one training sample
            pass

    if X.shape[1] != len(self._types):
        raise ValueError("Rows in X should have %d entries but have %d!" % (len(self._types), X.shape[1]))

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", "Predicted variances smaller than 0. Setting those variances to 0.")
        mean, var = self._predict(X, covariance_type)

    if len(mean.shape) == 1:
        mean = mean.reshape((-1, 1))

    if var is not None and len(var.shape) == 1:
        var = var.reshape((-1, 1))

    return mean, var
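A short usage sketch, assuming `model` is the (already trained) MCMCGaussianProcess from the construction sketch above, with two hyperparameters and no instance features. Note that the four covariance_type options belong to the shared AbstractModel interface; a subclass may support only a subset of them, so the sketch sticks to the default:

```python
import numpy as np

# Two hyperparameter columns, no instance features (matching the sketch above).
X_test = np.random.rand(10, 2)

# Default covariance_type="diagonal": per-point predictive variance.
mean, var = model.predict(X_test)
assert mean.shape == (10, 1)
assert var is not None and var.shape == (10, 1)
```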

predict_marginalized #

predict_marginalized(X: ndarray) -> tuple[ndarray, ndarray]

Predicts mean and variance marginalized over all instances.

Warning#

The input data must not include any features.

Parameters#

X : np.ndarray [#samples, #hyperparameters]
    Input data points.

Returns#

means : np.ndarray [#samples, 1]
    The predictive mean.
vars : np.ndarray [#samples, 1]
    The predictive variance.

Source code in smac/model/abstract_model.py
def predict_marginalized(self, X: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Predicts mean and variance marginalized over all instances.

    Warning
    -------
    The input data must not include any features.

    Parameters
    ----------
    X : np.ndarray [#samples, #hyperparameters]
        Input data points.

    Returns
    -------
    means : np.ndarray [#samples, 1]
        The predictive mean.
    vars : np.ndarray [#samples, 1]
        The predictive variance.
    """
    if len(X.shape) != 2:
        raise ValueError("Expected 2d array, got %dd array!" % len(X.shape))

    if X.shape[1] != self._n_hps:
        raise ValueError(
            f"Feature mismatch: X should have {self._n_hps} hyperparameters (and no features) for this method, "
            f"but has {X.shape[1]} in total."
        )

    if self._instance_features is None:
        mean, var = self.predict(X)
        assert var is not None

        var[var < self._var_threshold] = self._var_threshold
        var[np.isnan(var)] = self._var_threshold

        return mean, var
    else:
        n_instances = len(self._instance_features)

        mean = np.zeros(X.shape[0])
        var = np.zeros(X.shape[0])
        for i, x in enumerate(X):
            features = np.array(list(self._instance_features.values()))
            x_tiled = np.tile(x, (n_instances, 1))
            X_ = np.hstack((x_tiled, features))

            means, vars = self.predict(X_)
            assert vars is not None

            # VAR[1/n (X_1 + ... + X_n)] =
            # 1/n^2 * ( VAR(X_1) + ... + VAR(X_n))
            # for independent X_1 ... X_n
            var_x = np.sum(vars) / (len(vars) ** 2)
            if var_x < self._var_threshold:
                var_x = self._var_threshold

            var[i] = var_x
            mean[i] = np.mean(means)

        if len(mean.shape) == 1:
            mean = mean.reshape((-1, 1))

        if len(var.shape) == 1:
            var = var.reshape((-1, 1))

        return mean, var
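Usage sketch, again with the hypothetical trained `model` from above. Without instance features this reduces to a plain predict with variance clipping; with instance features, each configuration is tiled across all instances and the per-instance predictions are averaged (the variance of a mean of n independent predictions is the summed variance divided by n², as in the comment in the source above):

```python
import numpy as np

# Hyperparameter columns only -- instance features must not be included here.
X_configs = np.random.rand(5, 2)

means, variances = model.predict_marginalized(X_configs)
assert means.shape == (5, 1) and variances.shape == (5, 1)
```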

train #

train(X: ndarray, Y: ndarray) -> Self

Trains the model on X and Y. Internally, calls the method _train.

Parameters#

X : np.ndarray [#samples, #hyperparameters + #features]
    Input data points.
Y : np.ndarray [#samples, #objectives]
    The corresponding target values.

Returns#

self : AbstractModel

Source code in smac/model/abstract_model.py
def train(self: Self, X: np.ndarray, Y: np.ndarray) -> Self:
    """Trains the random forest on X and Y. Internally, calls the method `_train`.

    Parameters
    ----------
    X : np.ndarray [#samples, #hyperparameters + #features]
        Input data points.
    Y : np.ndarray [#samples, #objectives]
        The corresponding target values.

    Returns
    -------
    self : AbstractModel
    """
    if len(X.shape) != 2:
        raise ValueError("Expected 2d array, got %dd array!" % len(X.shape))

    if X.shape[1] != self._n_hps + self._n_features:
        raise ValueError(
            f"Feature mismatch: X should have {self._n_hps} hyperparameters + {self._n_features} features, "
            f"but has {X.shape[1]} in total."
        )

    if X.shape[0] != Y.shape[0]:
        raise ValueError("X.shape[0] ({}) != y.shape[0] ({})".format(X.shape[0], Y.shape[0]))

    # Reduce dimensionality of features if larger than PCA_DIM
    if (
        self._pca_components is not None
        and X.shape[0] > self._pca.n_components
        and self._n_features >= self._pca_components
    ):
        X_feats = X[:, -self._n_features :]

        # Scale features
        X_feats = self._scaler.fit_transform(X_feats)
        X_feats = np.nan_to_num(X_feats)  # if features with max == min

        # PCA
        X_feats = self._pca.fit_transform(X_feats)
        X = np.hstack((X[:, : self._n_hps], X_feats))

        if hasattr(self, "_types"):
            # For RF, adapt types list
        # if X_feats.shape[0] < self._pca_components, X_feats.shape[1] == X_feats.shape[0]
            self._types = np.array(
                np.hstack((self._types[: self._n_hps], np.zeros(X_feats.shape[1]))),
                dtype=np.uint,
            )  # type: ignore

        self._apply_pca = True
    else:
        self._apply_pca = False

        if hasattr(self, "_types"):
            self._types = copy.deepcopy(self._initial_types)

    return self._train(X, Y)
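Finally, a sketch of the shape contract for training, with illustrative random data for the two-hyperparameter model from above. PCA on instance features is only applied once there are more training rows than pca_components and at least pca_components instance-feature columns; with no instance features it is skipped entirely:

```python
import numpy as np

# 30 samples, 2 hyperparameter columns, no instance features, one objective.
X_train = np.random.rand(30, 2)
Y_train = np.random.rand(30, 1)

model.train(X_train, Y_train)  # returns self, so calls can be chained
mean, var = model.predict(X_train)
```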