from __future__ import annotations
from typing import Any
import itertools
import time
import numpy as np
from ConfigSpace import Configuration, ConfigurationSpace
from ConfigSpace.exceptions import ForbiddenValueError
from smac.acquisition.function import AbstractAcquisitionFunction
from smac.acquisition.maximizer.abstract_acqusition_maximizer import (
AbstractAcquisitionMaximizer,
)
from smac.utils.configspace import (
convert_configurations_to_array,
get_one_exchange_neighbourhood,
)
from smac.utils.logging import get_logger
__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"
logger = get_logger(__name__)
[docs]class LocalSearch(AbstractAcquisitionMaximizer):
"""Implementation of SMAC's local search.
Parameters
----------
configspace : ConfigurationSpace
acquisition_function : AbstractAcquisitionFunction
challengers : int, defaults to 5000
Number of challengers.
max_steps: int | None, defaults to None
Maximum number of iterations that the local search will perform.
n_steps_plateau_walk: int, defaults to 10
Number of steps during a plateau walk before local search terminates.
vectorization_min_obtain : int, defaults to 2
Minimal number of neighbors to obtain at once for each local search for vectorized calls. Can be tuned to
reduce the overhead of SMAC.
vectorization_max_obtain : int, defaults to 64
Maximal number of neighbors to obtain at once for each local search for vectorized calls. Can be tuned to
reduce the overhead of SMAC.
seed : int, defaults to 0
"""
def __init__(
self,
configspace: ConfigurationSpace,
acquisition_function: AbstractAcquisitionFunction | None = None,
challengers: int = 5000,
max_steps: int | None = None,
n_steps_plateau_walk: int = 10,
vectorization_min_obtain: int = 2,
vectorization_max_obtain: int = 64,
seed: int = 0,
) -> None:
super().__init__(
configspace,
acquisition_function,
challengers=challengers,
seed=seed,
)
self._max_steps = max_steps
self._n_steps_plateau_walk = n_steps_plateau_walk
self._vectorization_min_obtain = vectorization_min_obtain
self._vectorization_max_obtain = vectorization_max_obtain
@property
def meta(self) -> dict[str, Any]: # noqa: D102
meta = super().meta
meta.update(
{
"max_steps": self._max_steps,
"n_steps_plateau_walk": self._n_steps_plateau_walk,
"vectorization_min_obtain": self._vectorization_min_obtain,
"vectorization_max_obtain": self._vectorization_max_obtain,
}
)
return meta
def _maximize(
self,
previous_configs: list[Configuration],
n_points: int,
additional_start_points: list[tuple[float, Configuration]] | None = None,
) -> list[tuple[float, Configuration]]:
"""Starts a local search from the given startpoint and quits if either the max number of
steps is reached or no neighbor with an higher improvement was found.
"""
init_points = self._get_initial_points(previous_configs, n_points, additional_start_points)
configs_acq = self._search(init_points)
# Shuffle for random tie-break
self._rng.shuffle(configs_acq)
# Sort according to acq value
configs_acq.sort(reverse=True, key=lambda x: x[0])
for a, inc in configs_acq:
inc.origin = "Local Search"
return configs_acq
def _get_initial_points(
self,
previous_configs: list[Configuration],
n_points: int,
additional_start_points: list[tuple[float, Configuration]] | None,
) -> list[Configuration]:
if len(previous_configs) == 0:
init_points = self._configspace.sample_configuration(size=n_points)
else:
init_points = self._get_init_points_from_previous_configs(
previous_configs,
n_points,
additional_start_points,
)
return init_points
def _get_init_points_from_previous_configs(
self,
previous_configs: list[Configuration],
n_points: int,
additional_start_points: list[tuple[float, Configuration]] | None,
) -> list[Configuration]:
"""
A function that generates a set of initial points from the previous configurations and additional points (if
applicable). The idea is to decouple runhistory from the local search model and replace it with a more genreal
form (list[Configuration]).
Parameters
----------
previous_configs: list[Configuration]
Previous configuration (e.g., from the runhistory).
n_points: int
Number of initial points to be generated.
additional_start_points: list[tuple[float, Configuration]] | None
if we want to specify another set of points as initial points
Returns
-------
init_points: list[Configuration]
A list of initial points.
"""
assert self._acquisition_function is not None
# configurations with the highest previous EI
configs_previous_runs_sorted = self._sort_by_acquisition_value(previous_configs)
configs_previous_runs_sorted = [conf[1] for conf in configs_previous_runs_sorted[:n_points]]
# configurations with the lowest predictive cost, check for None to make unit tests work
if self._acquisition_function.model is not None:
conf_array = convert_configurations_to_array(previous_configs)
costs = self._acquisition_function.model.predict_marginalized(conf_array)[0]
assert len(conf_array) == len(costs), (conf_array.shape, costs.shape)
# In case of the predictive model returning the prediction for more than one objective per configuration
# (for example multi-objective or EIPS) it is not immediately clear how to sort according to the cost
# of a configuration. Therefore, we simply follow the ParEGO approach and use a random scalarization.
if len(costs.shape) == 2 and costs.shape[1] > 1:
weights = np.array([self._rng.rand() for _ in range(costs.shape[1])])
weights = weights / np.sum(weights)
costs = costs @ weights
# From here
# http://stackoverflow.com/questions/20197990/how-to-make-argsort-result-to-be-random-between-equal-values
random = self._rng.rand(len(costs))
# Last column is primary sort key!
indices = np.lexsort((random.flatten(), costs.flatten()))
# Cannot use zip here because the indices array cannot index the
# rand_configs list, because the second is a pure python list
previous_configs_sorted_by_cost = [previous_configs[ind] for ind in indices][:n_points]
else:
previous_configs_sorted_by_cost = []
if additional_start_points is not None:
additional_start_points = [asp[1] for asp in additional_start_points[:n_points]]
else:
additional_start_points = []
init_points = []
init_points_as_set: set[Configuration] = set()
for cand in itertools.chain(
configs_previous_runs_sorted,
previous_configs_sorted_by_cost,
additional_start_points,
):
if cand not in init_points_as_set:
init_points.append(cand)
init_points_as_set.add(cand)
return init_points
def _search(
self,
start_points: list[Configuration],
) -> list[tuple[float, Configuration]]:
assert self._acquisition_function is not None
# Gather data structure for starting points
if isinstance(start_points, Configuration):
start_points = [start_points]
candidates = start_points
# Compute the acquisition value of the candidates
num_candidates = len(candidates)
acq_val_candidates_ = self._acquisition_function(candidates)
if num_candidates == 1:
acq_val_candidates = [acq_val_candidates_[0][0]]
else:
acq_val_candidates = [a[0] for a in acq_val_candidates_]
# Set up additional variables required to do vectorized local search:
# whether the i-th local search is still running
active = [True] * num_candidates
# number of plateau walks of the i-th local search. Reaching the maximum number is the stopping criterion of
# the local search.
n_no_plateau_walk = [0] * num_candidates
# tracking the number of steps for logging purposes
local_search_steps = [0] * num_candidates
# tracking the number of neighbors looked at for logging purposes
neighbors_looked_at = [0] * num_candidates
# tracking the number of neighbors generated for logging purposse
neighbors_generated = [0] * num_candidates
# how many neighbors were obtained for the i-th local search. Important to map the individual acquisition
# function values to the correct local search run
obtain_n = [self._vectorization_min_obtain] * num_candidates
# Tracking the time it takes to compute the acquisition function
times = []
# Set up the neighborhood generators
neighborhood_iterators = []
for i, inc in enumerate(candidates):
neighborhood_iterators.append(
get_one_exchange_neighbourhood(inc, seed=self._rng.randint(low=0, high=100000))
)
local_search_steps[i] += 1
# Keeping track of configurations with equal acquisition value for plateau walking
neighbors_w_equal_acq: list[list[Configuration]] = [[] for _ in range(num_candidates)]
num_iters = 0
while np.any(active):
num_iters += 1
# Whether the i-th local search improved. When a new neighborhood is generated, this is used to determine
# whether a step was made (improvement) or not (iterator exhausted)
improved = [False] * num_candidates
# Used to request a new neighborhood for the candidates of the i-th local search
new_neighborhood = [False] * num_candidates
# gather all neighbors
neighbors = []
for i, neighborhood_iterator in enumerate(neighborhood_iterators):
if active[i]:
neighbors_for_i = []
for j in range(obtain_n[i]):
try:
n = next(neighborhood_iterator)
neighbors_generated[i] += 1
neighbors_for_i.append(n)
except ValueError as e:
# `neighborhood_iterator` raises `ValueError` with some probability when it reaches
# an invalid configuration.
logger.debug(e)
new_neighborhood[i] = True
except StopIteration:
new_neighborhood[i] = True
break
obtain_n[i] = len(neighbors_for_i)
neighbors.extend(neighbors_for_i)
if len(neighbors) != 0:
start_time = time.time()
acq_val = self._acquisition_function(neighbors)
end_time = time.time()
times.append(end_time - start_time)
if np.ndim(acq_val.shape) == 0:
acq_val = np.asarray([acq_val])
# Comparing the acquisition function of the neighbors with the acquisition value of the candidate
acq_index = 0
# Iterating the all i local searches
for i in range(num_candidates):
if not active[i]:
continue
# And for each local search we know how many neighbors we obtained
for j in range(obtain_n[i]):
# The next line is only true if there was an improvement and we basically need to iterate to
# the i+1-th local search
if improved[i]:
acq_index += 1
else:
neighbors_looked_at[i] += 1
# Found a better configuration
if acq_val[acq_index] > acq_val_candidates[i]:
is_valid = False
try:
neighbors[acq_index].is_valid_configuration()
is_valid = True
except (ValueError, ForbiddenValueError) as e:
logger.debug("Local search %d: %s", i, e)
if is_valid:
# We comment this as it just spams the log
# logger.debug(
# "Local search %d: Switch to one of the neighbors (after %d configurations).",
# i,
# neighbors_looked_at[i],
# )
candidates[i] = neighbors[acq_index]
acq_val_candidates[i] = acq_val[acq_index]
new_neighborhood[i] = True
improved[i] = True
local_search_steps[i] += 1
neighbors_w_equal_acq[i] = []
obtain_n[i] = 1
# Found an equally well performing configuration, keeping it for plateau walking
elif acq_val[acq_index] == acq_val_candidates[i]:
neighbors_w_equal_acq[i].append(neighbors[acq_index])
acq_index += 1
# Now we check whether we need to create new neighborhoods and whether we need to increase the number of
# plateau walks for one of the local searches. Also disables local searches if the number of plateau walks
# is reached (and all being switched off is the termination criterion).
for i in range(num_candidates):
if not active[i]:
continue
if obtain_n[i] == 0 or improved[i]:
obtain_n[i] = 2
else:
obtain_n[i] = obtain_n[i] * 2
obtain_n[i] = min(obtain_n[i], self._vectorization_max_obtain)
if new_neighborhood[i]:
if not improved[i] and n_no_plateau_walk[i] < self._n_steps_plateau_walk:
if len(neighbors_w_equal_acq[i]) != 0:
candidates[i] = neighbors_w_equal_acq[i][0]
neighbors_w_equal_acq[i] = []
n_no_plateau_walk[i] += 1
if n_no_plateau_walk[i] >= self._n_steps_plateau_walk:
active[i] = False
continue
neighborhood_iterators[i] = get_one_exchange_neighbourhood(
candidates[i],
seed=self._rng.randint(low=0, high=100000),
)
logger.debug(
"Local searches took %s steps and looked at %s configurations. Computing the acquisition function in "
"vectorized for took %f seconds on average.",
local_search_steps,
neighbors_looked_at,
np.mean(times),
)
return [(a, i) for a, i in zip(acq_val_candidates, candidates)]