Source code for smac.optimizer.subspaces.turbo_subspace

from typing import Dict, List, Optional, Tuple, Type, Union

import math
import warnings

import numpy as np
from ConfigSpace.hyperparameters import NumericalHyperparameter
from ConfigSpace.util import deactivate_inactive_hyperparameters
from scipy.stats.qmc import LatinHypercube, Sobol

from smac.configspace import Configuration, ConfigurationSpace
from smac.epm.base_epm import BaseEPM
from smac.epm.gaussian_process import GaussianProcess
from smac.epm.gaussian_process.augmented import GloballyAugmentedLocalGaussianProcess
from smac.epm.gaussian_process.gpytorch import GPyTorchGaussianProcess
from smac.epm.gaussian_process.mcmc import MCMCGaussianProcess
from smac.optimizer.acquisition import TS, AbstractAcquisitionFunction
from smac.optimizer.subspaces import LocalSubspace

warnings.filterwarnings("ignore", message="The balance properties of Sobol' points require n to be a power of 2.")


class TuRBOSubSpace(LocalSubspace):
    """
    Subspace designed for TuRBO:
    D. Eriksson et al., Scalable Global Optimization via Local Bayesian Optimization
    https://proceedings.neurips.cc/paper/2019/hash/6c990b7aca7bc7058f5e98ea909e924b-Abstract.html

    The hyperparameters follow the illustration in supplementary D, `TuRBO details`.

    Parameters
    ----------
    length_init: float
        initial side length of the subspace
    length_min: float
        minimal side length of the subspace; if the subspace length becomes smaller than this value,
        TuRBO restarts
    length_max: float
        maximal side length of the subspace
    success_tol: int
        number of consecutive successful evaluations required to expand the subregion
    failure_tol_min: int
        minimal number of consecutive failed evaluations required to shrink the subregion
        (the effective value is the maximum of this and the number of feature dimensions)
    n_init_x_params: int
        maximal number of configurations used in the initial design, as a multiple of the number of
        feature dimensions (X * D); also used when restarting the subspace
    n_candidate_max: int
        maximal number of points used as candidates
    """

    def __init__(
        self,
        config_space: ConfigurationSpace,
        bounds: List[Tuple[float, float]],
        hps_types: List[int],
        bounds_ss_cont: Optional[np.ndarray] = None,
        bounds_ss_cat: Optional[List[Tuple]] = None,
        model_local: Union[BaseEPM, Type[BaseEPM]] = GPyTorchGaussianProcess,
        model_local_kwargs: Dict = {},
        acq_func_local: Union[AbstractAcquisitionFunction, Type[AbstractAcquisitionFunction]] = TS,
        acq_func_local_kwargs: Optional[Dict] = None,
        rng: Optional[np.random.RandomState] = None,
        initial_data: Optional[Tuple[np.ndarray, np.ndarray]] = None,
        activate_dims: Optional[np.ndarray] = None,
        incumbent_array: Optional[np.ndarray] = None,
        length_init: float = 0.8,
        length_min: float = 0.5**7,
        length_max: float = 1.6,
        success_tol: int = 3,
        failure_tol_min: int = 4,
        n_init_x_params: int = 2,
        n_candidate_max: int = 5000,
    ):
        self.num_valid_observations = 0
        super(TuRBOSubSpace, self).__init__(
            config_space=config_space,
            bounds=bounds,
            hps_types=hps_types,
            bounds_ss_cont=bounds_ss_cont,
            bounds_ss_cat=bounds_ss_cat,
            model_local=model_local,
            model_local_kwargs=model_local_kwargs,
            acq_func_local=acq_func_local,
            acq_func_local_kwargs=acq_func_local_kwargs,
            rng=rng,
            initial_data=initial_data,
            activate_dims=activate_dims,
            incumbent_array=incumbent_array,
        )
        hps = config_space.get_hyperparameters()
        for hp in hps:
            if not isinstance(hp, NumericalHyperparameter):
                raise ValueError("The current TuRBO optimizer only supports numerical hyperparameters")

        if len(config_space.get_conditions()) > 0 or len(config_space.get_forbiddens()) > 0:
            raise ValueError("Currently TuRBO does not support conditional or forbidden hyperparameters")

        n_hps = len(self.activate_dims)
        self.n_dims = n_hps
        self.n_init = n_init_x_params * self.n_dims
        self.n_candidates = min(100 * n_hps, n_candidate_max)

        self.failure_tol = max(failure_tol_min, n_hps)
        self.success_tol = success_tol
        self.length = length_init
        self.length_init = length_init
        self.length_min = length_min
        self.length_max = length_max

        self._restart_turbo(n_init_points=self.n_init)

        if initial_data is not None:
            self.add_new_observations(initial_data[0], initial_data[1])
            self.init_configs = []  # type: List[Configuration]

        self.lb = np.zeros(self.n_dims)
        self.ub = np.ones(self.n_dims)
        self.config_origin = "TuRBO"
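    # Worked example of the derived sizes above (illustrative): for a 10-dimensional space with the
    # default arguments, n_init = 2 * 10 = 20 initial configurations, n_candidates = min(100 * 10, 5000)
    # = 1000 Sobol candidates per proposal, and failure_tol = max(4, 10) = 10 consecutive failures
    # before the trust region shrinks.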
    def _restart_turbo(
        self,
        n_init_points: int,
    ) -> None:
        """
        Restart TuRBO with a certain number of initial points. New points are drawn from a Latin
        hypercube design.

        Parameters
        ----------
        n_init_points: int
            number of points required for initializing a new subspace
        """
        self.logger.debug("Current length is smaller than the minimal value, a new TuRBO run restarts")
        self.success_count = 0
        self.failure_count = 0

        self.num_eval_this_round = 0
        self.last_incumbent_value = np.inf
        self.length = self.length_init

        self.num_valid_observations = 0

        # seed the Latin hypercube engine directly from the subspace RNG so that restarts are
        # reproducible without touching NumPy's global random state
        init_vectors = LatinHypercube(d=self.n_dims, seed=self.rng.randint(1, 2**20)).random(n=n_init_points)

        self.init_configs = [Configuration(self.cs_local, vector=init_vector) for init_vector in init_vectors]
    def adjust_length(self, new_observation: Union[float, np.ndarray]) -> None:
        """
        Adjust the subspace length according to the performance of the latest suggested values.

        Parameters
        ----------
        new_observation: Union[float, np.ndarray]
            new observations
        """
        # see Section 2: 'Trust regions'
        optim_observation = new_observation if np.isscalar(new_observation) else np.min(new_observation)

        # We define a "success" as a candidate that improves upon x_best, and a "failure" as a
        # candidate that does not.
        if optim_observation < np.min(self.model_y) - 1e-3 * math.fabs(np.min(self.model_y)):
            self.logger.debug("New suggested value is better than the incumbent, success_count increases")
            self.success_count += 1
            self.failure_count = 0
        else:
            self.logger.debug("New suggested value is worse than the incumbent, failure_count increases")
            self.success_count = 0
            self.failure_count += 1

        # After success_tol consecutive successes, we double the size of the trust region,
        # i.e., length <- min(length_max, 2 * length).
        if self.success_count == self.success_tol:  # expand trust region
            self.length = min([2.0 * self.length, self.length_max])
            self.success_count = 0
            self.logger.debug(f"Subspace length expands to {self.length}")
        # After failure_tol consecutive failures, we halve the size of the trust region:
        # length <- length / 2. The success and failure counters are reset to zero whenever
        # the size of the trust region changes.
        elif self.failure_count == self.failure_tol:  # shrink trust region
            self.length /= 2.0
            self.failure_count = 0
            self.logger.debug(f"Subspace length shrinks to {self.length}")
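    # Illustrative trace of the trust-region dynamics above, assuming the default tolerances:
    # starting from length_init = 0.8 with success_tol = 3, three consecutive improvements double
    # the length to 1.6 (capped at length_max), while failure_tol consecutive non-improvements
    # halve it instead. Once halving pushes the length below length_min = 0.5 ** 7 ~ 0.0078,
    # _generate_challengers triggers _restart_turbo and a fresh Latin hypercube design.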
    def _generate_challengers(  # type: ignore[override]
        self, _sorted: bool = True
    ) -> List[Tuple[float, Configuration]]:
        """
        Generate a new list of challengers for this subspace.

        Parameters
        ----------
        _sorted: bool
            whether the generated challengers are sorted by their acquisition function values
        """
        if len(self.init_configs) > 0:
            config_next = self.init_configs.pop()
            return [(0, config_next)]

        if self.length < self.length_min:
            self._restart_turbo(n_init_points=self.n_init)
            config_next = self.init_configs.pop()
            return [(0, config_next)]

        # train the model only on the observations collected since the last restart
        self.model.train(self.model_x[-self.num_valid_observations :], self.model_y[-self.num_valid_observations :])
        self.update_model(predict_x_best=False, update_incumbent_array=True)

        sobol_gen = Sobol(d=self.n_dims, scramble=True, seed=self.rng.randint(low=0, high=10000000))
        sobol_seq = sobol_gen.random(self.n_candidates)

        # adjust the subspace shape according to the kernel length scales
        if isinstance(
            self.model,
            (GaussianProcess, MCMCGaussianProcess, GloballyAugmentedLocalGaussianProcess, GPyTorchGaussianProcess),
        ):
            if isinstance(self.model, GaussianProcess):
                kernel_length = np.exp(self.model.hypers[1:-1])
            elif isinstance(self.model, MCMCGaussianProcess):
                kernel_length = np.exp(np.mean((np.array(self.model.hypers)[:, 1:-1]), axis=0))
            elif isinstance(self.model, (GPyTorchGaussianProcess, GloballyAugmentedLocalGaussianProcess)):
                kernel_length = self.model.kernel.base_kernel.lengthscale.cpu().detach().numpy()

            # See 'Trust regions' in Section 2: the side length along dimension i is
            # lambda_i * L / (prod_j lambda_j)^(1/d), so that subspace_scale.prod() == 1.
            # Taking the d-th root element-wise before the product keeps the computation stable.
            subspace_scale = kernel_length / np.prod(np.power(kernel_length, 1.0 / self.n_dims))

            subspace_length = self.length * subspace_scale

            subspace_lb = np.clip(self.incumbent_array - subspace_length * 0.5, 0.0, 1.0)
            subspace_ub = np.clip(self.incumbent_array + subspace_length * 0.5, 0.0, 1.0)
            sobol_seq = sobol_seq * (subspace_ub - subspace_lb) + subspace_lb

        prob_perturb = min(20.0 / self.n_dims, 1.0)
        design = self._perturb_samples(prob_perturb, sobol_seq)

        # Only numerical hyperparameters are considered for TuRBO, so we do not need to transform the
        # vectors to fit the requirements of other sorts of hyperparameters.
        configs = []
        for vector in design:
            conf = deactivate_inactive_hyperparameters(
                configuration=None, configuration_space=self.cs_local, vector=vector
            )
            configs.append(conf)

        if _sorted:
            return self._sort_configs_by_acq_value(configs)
        else:
            return [(0, configs[i]) for i in range(len(configs))]
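    # Numeric example of the length-scale rescaling above (illustrative): in 2 dimensions with
    # kernel_length = [1.0, 4.0], subspace_scale = [1, 4] / (1 * 4) ** 0.5 = [0.5, 2.0], whose
    # product is 1. The trust region is therefore stretched along the smoother (longer length-scale)
    # dimension while its volume stays governed by self.length alone.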
    def _perturb_samples(self, prob_perturb: float, design: np.ndarray) -> np.ndarray:
        """
        See supplementary D, 'TuRBO details': in order to not perturb all coordinates at once, we use
        the value in the Sobol sequence with probability min{1, 20/d} for a given candidate and
        dimension, and the value of the center otherwise, i.e., the generated design is perturbed
        with the incumbent array accordingly.

        Parameters
        ----------
        prob_perturb: float
            probability that a coordinate of the design keeps its Sobol value instead of being
            replaced by the incumbent value
        design: np.ndarray(self.n_candidates, self.n_dims)
            design array to be perturbed

        Returns
        -------
        design_perturbed: np.ndarray(self.n_candidates, self.n_dims)
            perturbed design array
        """
        # we use a masked array, so the indices to be replaced by the incumbent are marked with True
        mask = self.rng.rand(self.n_candidates, self.n_dims) > prob_perturb

        # find the candidates where every coordinate would be replaced by the incumbent
        ind = np.where(np.sum(mask, axis=1) == self.n_dims)[0]
        if self.n_dims == 1:
            mask[ind, 0] = 0
        else:
            # ensure that no candidate is completely replaced by the incumbent value
            mask[ind, self.rng.randint(0, self.n_dims, size=len(ind))] = 0

        return np.ma.array(design, mask=mask, fill_value=self.incumbent_array).filled()
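    # Example of the perturbation rule above (illustrative): with prob_perturb = min(20 / d, 1),
    # every coordinate keeps its Sobol value when d <= 20; for d = 40 each coordinate keeps the
    # Sobol value with probability 0.5 and falls back to the incumbent otherwise, and the mask
    # correction guarantees that at least one coordinate per candidate is still perturbed.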
    def add_new_observations(self, X: np.ndarray, y: np.ndarray) -> None:
        """
        Add new observations to the subspace and increase the number of valid observations, so that
        the subspace can be scaled properly.

        Parameters
        ----------
        X: np.ndarray(N, D)
            new feature vectors of the observations, constructed by the global configuration space
        y: np.ndarray(N)
            new performances of the observations
        """
        super(TuRBOSubSpace, self).add_new_observations(X, y)
        self.num_valid_observations += len(y)
    def _sort_configs_by_acq_value(self, configs: List[Configuration]) -> List[Tuple[float, Configuration]]:
        """Sort the given configurations by their acquisition function values.

        Taken from smac.optimizer.ei_optimization.AcquisitionFunctionMaximizer.

        Parameters
        ----------
        configs : list(Configuration)

        Returns
        -------
        list: (acquisition value, Candidate solutions),
            ordered by their acquisition function value
        """
        acq_values = self.acquisition_function(configs)

        # From here
        # http://stackoverflow.com/questions/20197990/how-to-make-argsort-result-to-be-random-between-equal-values
        random = self.rng.rand(len(acq_values))
        # Last column is primary sort key!
        indices = np.lexsort((random.flatten(), acq_values.flatten()))

        # Cannot use zip here because the indices array cannot index the
        # rand_configs list, because the second is a pure python list
        return [(acq_values[ind][0], configs[ind]) for ind in indices[::-1]]
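
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the module above): constructing a
# TuRBOSubSpace over a small all-numerical configuration space and drawing the
# first challenger from the Latin hypercube initial design. The hps_types
# encoding (0 = continuous hyperparameter) and the normalized (0, 1) bounds
# follow SMAC's conventions but are assumptions here, as is calling the
# internal _generate_challengers method directly.
if __name__ == "__main__":
    from ConfigSpace.hyperparameters import UniformFloatHyperparameter

    cs = ConfigurationSpace()
    cs.add_hyperparameters([UniformFloatHyperparameter(f"x{i}", 0.0, 1.0) for i in range(3)])

    subspace = TuRBOSubSpace(
        config_space=cs,
        bounds=[(0.0, 1.0)] * 3,
        hps_types=[0] * 3,
        rng=np.random.RandomState(42),
    )
    # While init_configs is non-empty, challengers come from the initial design
    # and carry a placeholder acquisition value of 0.
    acq_value, challenger = subspace._generate_challengers()[0]
    print(challenger)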