from __future__ import annotations
from typing import Any
import itertools
import time
import numpy as np
from ConfigSpace import Configuration, ConfigurationSpace
from ConfigSpace.exceptions import ForbiddenValueError
from smac.acquisition.function import AbstractAcquisitionFunction
from smac.acquisition.maximizer.abstract_acqusition_maximizer import (
AbstractAcquisitionMaximizer,
)
from smac.utils.configspace import (
convert_configurations_to_array,
get_one_exchange_neighbourhood,
)
from smac.utils.logging import get_logger
__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"
logger = get_logger(__name__)
[docs]
class LocalSearch(AbstractAcquisitionMaximizer):
"""Implementation of SMAC's local search.
Parameters
----------
configspace : ConfigurationSpace
acquisition_function : AbstractAcquisitionFunction
challengers : int, defaults to 5000
Number of challengers.
max_steps: int | None, defaults to None
Maximum number of iterations that the local search will perform.
n_steps_plateau_walk: int, defaults to 10
Number of steps during a plateau walk before local search terminates.
vectorization_min_obtain : int, defaults to 2
Minimal number of neighbors to obtain at once for each local search for vectorized calls. Can be tuned to
reduce the overhead of SMAC.
vectorization_max_obtain : int, defaults to 64
Maximal number of neighbors to obtain at once for each local search for vectorized calls. Can be tuned to
reduce the overhead of SMAC.
seed : int, defaults to 0
"""
def __init__(
self,
configspace: ConfigurationSpace,
acquisition_function: AbstractAcquisitionFunction | None = None,
challengers: int = 5000,
max_steps: int | None = None,
n_steps_plateau_walk: int = 10,
vectorization_min_obtain: int = 2,
vectorization_max_obtain: int = 64,
seed: int = 0,
) -> None:
super().__init__(
configspace,
acquisition_function,
challengers=challengers,
seed=seed,
)
self._max_steps = max_steps
self._n_steps_plateau_walk = n_steps_plateau_walk
self._vectorization_min_obtain = vectorization_min_obtain
self._vectorization_max_obtain = vectorization_max_obtain
@property
def meta(self) -> dict[str, Any]: # noqa: D102
meta = super().meta
meta.update(
{
"max_steps": self._max_steps,
"n_steps_plateau_walk": self._n_steps_plateau_walk,
"vectorization_min_obtain": self._vectorization_min_obtain,
"vectorization_max_obtain": self._vectorization_max_obtain,
}
)
return meta
def _maximize(
self,
previous_configs: list[Configuration],
n_points: int,
additional_start_points: list[tuple[float, Configuration]] | None = None,
) -> list[tuple[float, Configuration]]:
"""Start a local search from the given start points. Iteratively collect neighbours
using Configspace.utils.get_one_exchange_neighbourhood and evaluate them.
If the new config is better than the current best, the local search is continued from the
new config.
Quit if either the max number of steps is reached or
no neighbor with a higher improvement was found or the number of local steps self._n_steps_plateau_walk
for each of the starting point is depleted.
Parameters
----------
previous_configs : list[Configuration]
Previous configuration (e.g., from the runhistory).
n_points : int
Number of initial points to be generated.
additional_start_points : list[tuple[float, Configuration]] | None
Additional starting points.
Returns
-------
list[Configuration]
Final candidates.
"""
init_points = self._get_initial_points(previous_configs, n_points, additional_start_points)
configs_acq = self._search(init_points)
# Shuffle for random tie-break
self._rng.shuffle(configs_acq)
# Sort according to acq value
configs_acq.sort(reverse=True, key=lambda x: x[0])
for a, inc in configs_acq:
inc.origin = "Acquisition Function Maximizer: Local Search"
return configs_acq
def _get_initial_points(
self,
previous_configs: list[Configuration],
n_points: int,
additional_start_points: list[tuple[float, Configuration]] | None,
) -> list[Configuration]:
"""Get initial points to start search from.
Parameters
----------
previous_configs : list[Configuration]
Previous configuration (e.g., from the runhistory).
n_points : int
Number of initial points to be generated.
additional_start_points : list[tuple[float, Configuration]] | None
Additional starting points.
Returns
-------
list[Configuration]
A list of initial points/configurations.
"""
sampled_points = []
init_points = []
n_init_points = n_points
if len(previous_configs) < n_points:
sampled_points = self._configspace.sample_configuration(size=n_points - len(previous_configs))
n_init_points = len(previous_configs)
if not isinstance(sampled_points, list):
sampled_points = [sampled_points]
if len(previous_configs) > 0:
init_points = self._get_init_points_from_previous_configs(
previous_configs,
n_init_points,
additional_start_points,
)
return sampled_points + init_points
def _get_init_points_from_previous_configs(
self,
previous_configs: list[Configuration],
n_points: int,
additional_start_points: list[tuple[float, Configuration]] | None,
) -> list[Configuration]:
"""
Generate a set of initial points from the previous configurations and possibly additional points.
The idea is to decouple runhistory from the local search model and replace it with a more general
form (list[Configuration]). This is useful to more quickly collect new configurations
along the iterations, rather than feeding it to the runhistory every time.
create three lists and concatenate them:
1. sorted the previous configs by acquisition value
2. sorted the previous configs by marginal predictive costs
3. additional start points
and create a list that carries unique configurations only. Crucially,
when reading from left to right, all but the first occurrence of a configuration
are dropped.
Parameters
----------
previous_configs: list[Configuration]
Previous configuration (e.g., from the runhistory).
n_points: int
Number of initial points to be generated; selected from previous configs (+ random configs if necessary).
additional_start_points: list[tuple[float, Configuration]] | None
Additional starting points.
Returns
-------
init_points: list[Configuration]
A list of initial points.
"""
assert self._acquisition_function is not None
# configurations with the lowest predictive cost, check for None to make unit tests work
if self._acquisition_function.model is not None:
conf_array = convert_configurations_to_array(previous_configs)
costs = self._acquisition_function.model.predict_marginalized(conf_array)[0]
assert len(conf_array) == len(costs), (conf_array.shape, costs.shape)
# In case of the predictive model returning the prediction for more than one objective per configuration
# (for example multi-objective or EIPS) it is not immediately clear how to sort according to the cost
# of a configuration. Therefore, we simply follow the ParEGO approach and use a random scalarization.
if len(costs.shape) == 2 and costs.shape[1] > 1:
weights = np.array([self._rng.rand() for _ in range(costs.shape[1])])
weights = weights / np.sum(weights)
costs = costs @ weights
# From here: make argsort result to be random between equal values
# http://stackoverflow.com/questions/20197990/how-to-make-argsort-result-to-be-random-between-equal-values
random = self._rng.rand(len(costs))
indices = np.lexsort((random.flatten(), costs.flatten())) # Last column is primary sort key!
# Cannot use zip here because the indices array cannot index the
# rand_configs list, because the second is a pure python list
previous_configs_sorted_by_cost = [previous_configs[ind] for ind in indices][:n_points]
else:
previous_configs_sorted_by_cost = []
if additional_start_points is not None:
additional_start_points = [asp[1] for asp in additional_start_points]
else:
additional_start_points = []
init_points = []
init_points_as_set: set[Configuration] = set()
for cand in itertools.chain(
previous_configs_sorted_by_cost,
additional_start_points,
):
if cand not in init_points_as_set:
init_points.append(cand)
init_points_as_set.add(cand)
return init_points
def _search(
self,
start_points: list[Configuration],
) -> list[tuple[float, Configuration]]:
"""Optimize the acquisition function.
Execution:
1. Neighbour generation strategy for each of the starting points is according to
ConfigSpace.utils.get_one_exchange_neighbourhood.
2. Each of the starting points create a local search, that can be active.
if it is active, request a neighbour of its neightbourhood and evaluate it.
3. Comparing the acquisition function of the neighbors with the acquisition value of the
candidate.
If it improved, then the candidate is replaced by the neighbor. And this candidate is
investigated again with two new neighbours.
If it did not improve, it is investigated with twice as many new neighbours
(at most self._vectorization_max_obtain neighbours).
The local search for a starting point is stopped if the number of evaluations is larger
than self._n_steps_plateau_walk.
Parameters
----------
start_points : list[Configuration]
Starting points for the search.
Returns
-------
list[tuple[float, Configuration]]
Candidates with their acquisition function value. (acq value, candidate)
"""
assert self._acquisition_function is not None
# Gather data structure for starting points
if isinstance(start_points, Configuration):
start_points = [start_points]
candidates = start_points
# Compute the acquisition value of the candidates
num_candidates = len(candidates)
acq_val_candidates_ = self._acquisition_function(candidates)
if num_candidates == 1:
acq_val_candidates = [acq_val_candidates_[0][0]]
else:
acq_val_candidates = [a[0] for a in acq_val_candidates_]
# Set up additional variables required to do vectorized local search:
# whether the i-th local search is still running
active = [True] * num_candidates
# number of plateau walks of the i-th local search. Reaching the maximum number is the stopping criterion of
# the local search.
n_no_plateau_walk = [0] * num_candidates
# tracking the number of steps for logging purposes
local_search_steps = [0] * num_candidates
# tracking the number of neighbors looked at for logging purposes
neighbors_looked_at = [0] * num_candidates
# tracking the number of neighbors generated for logging purposse
neighbors_generated = [0] * num_candidates
# how many neighbors were obtained for the i-th local search. Important to map the individual acquisition
# function values to the correct local search run
obtain_n = [self._vectorization_min_obtain] * num_candidates
# Tracking the time it takes to compute the acquisition function
times = []
# Set up the neighborhood generators
neighborhood_iterators = []
for i, inc in enumerate(candidates):
neighborhood_iterators.append(
# get_one_exchange_neighbourhood implementational details:
# https://github.com/automl/ConfigSpace/blob/05ab3da2a06c084ba920e8e4e3f62f2e87e81442/ConfigSpace/util.pyx#L95
# Return all configurations in a one-exchange neighborhood.
#
# The method is implemented as defined by:
# Frank Hutter, Holger H. Hoos and Kevin Leyton-Brown
# Sequential Model-Based Optimization for General Algorithm Configuration
# In Proceedings of the conference on Learning and Intelligent
# Optimization(LION 5)
get_one_exchange_neighbourhood(inc, seed=self._rng.randint(low=0, high=100000))
)
local_search_steps[i] += 1
# Keeping track of configurations with equal acquisition value for plateau walking
neighbors_w_equal_acq: list[list[Configuration]] = [[] for _ in range(num_candidates)]
num_iters = 0
while np.any(active):
num_iters += 1
# Whether the i-th local search improved. When a new neighborhood is generated, this is used to determine
# whether a step was made (improvement) or not (iterator exhausted)
improved = [False] * num_candidates
# Used to request a new neighborhood for the candidates of the i-th local search
new_neighborhood = [False] * num_candidates
# gather all neighbors
neighbors = []
for i, neighborhood_iterator in enumerate(neighborhood_iterators):
if active[i]:
neighbors_for_i = []
for j in range(obtain_n[i]):
try:
n = next(neighborhood_iterator)
neighbors_generated[i] += 1
neighbors_for_i.append(n)
except ValueError as e:
# `neighborhood_iterator` raises `ValueError` with some probability when it reaches
# an invalid configuration.
logger.debug(e)
new_neighborhood[i] = True
except StopIteration:
new_neighborhood[i] = True
break
obtain_n[i] = len(neighbors_for_i)
neighbors.extend(neighbors_for_i)
if len(neighbors) != 0:
start_time = time.time()
acq_val = self._acquisition_function(neighbors)
end_time = time.time()
times.append(end_time - start_time)
if np.ndim(acq_val.shape) == 0:
acq_val = np.asarray([acq_val])
# Comparing the acquisition function of the neighbors with the acquisition value of the candidate
acq_index = 0
# Iterating the all i local searches
for i in range(num_candidates):
if not active[i]:
continue
# And for each local search we know how many neighbors we obtained
for j in range(obtain_n[i]):
# The next line is only true if there was an improvement and we basically need to iterate to
# the i+1-th local search
if improved[i]:
acq_index += 1
else:
neighbors_looked_at[i] += 1
# Found a better configuration
if acq_val[acq_index] > acq_val_candidates[i]:
is_valid = False
try:
neighbors[acq_index].is_valid_configuration()
is_valid = True
except (ValueError, ForbiddenValueError) as e:
logger.debug("Local search %d: %s", i, e)
if is_valid:
# We comment this as it just spams the log
# logger.debug(
# "Local search %d: Switch to one of the neighbors (after %d configurations).",
# i,
# neighbors_looked_at[i],
# )
candidates[i] = neighbors[acq_index]
acq_val_candidates[i] = acq_val[acq_index]
new_neighborhood[i] = True
improved[i] = True
local_search_steps[i] += 1
neighbors_w_equal_acq[i] = []
obtain_n[i] = 1
# Found an equally well performing configuration, keeping it for plateau walking
elif acq_val[acq_index] == acq_val_candidates[i]:
neighbors_w_equal_acq[i].append(neighbors[acq_index])
acq_index += 1
# Now we check whether we need to create new neighborhoods and whether we need to increase the number of
# plateau walks for one of the local searches. Also disables local searches if the number of plateau walks
# is reached (and all being switched off is the termination criterion).
for i in range(num_candidates):
if not active[i]:
continue
if obtain_n[i] == 0 or improved[i]:
obtain_n[i] = 2
else:
obtain_n[i] = obtain_n[i] * 2
obtain_n[i] = min(obtain_n[i], self._vectorization_max_obtain)
if new_neighborhood[i]:
if not improved[i] and n_no_plateau_walk[i] < self._n_steps_plateau_walk:
if len(neighbors_w_equal_acq[i]) != 0:
candidates[i] = neighbors_w_equal_acq[i][0]
neighbors_w_equal_acq[i] = []
n_no_plateau_walk[i] += 1
if n_no_plateau_walk[i] >= self._n_steps_plateau_walk:
active[i] = False
continue
neighborhood_iterators[i] = get_one_exchange_neighbourhood(
candidates[i],
seed=self._rng.randint(low=0, high=100000),
)
logger.debug(
"Local searches took %s steps and looked at %s configurations. Computing the acquisition function in "
"vectorized for took %f seconds on average.",
local_search_steps,
neighbors_looked_at,
np.mean(times),
)
return [(a, i) for a, i in zip(acq_val_candidates, candidates)]