from __future__ import annotations
from typing import Any, Iterator
import math
from collections import defaultdict
import numpy as np
from ConfigSpace import Configuration
from smac.constants import MAXINT
from smac.intensifier.abstract_intensifier import AbstractIntensifier
from smac.runhistory import TrialInfo
from smac.runhistory.dataclasses import InstanceSeedBudgetKey
from smac.runhistory.errors import NotEvaluatedError
from smac.scenario import Scenario
from smac.utils.configspace import get_config_hash
from smac.utils.data_structures import batch
from smac.utils.logging import get_logger
from smac.utils.pareto_front import calculate_pareto_front, sort_by_crowding_distance
__copyright__ = "Copyright 2022, automl.org"
__license__ = "3-clause BSD"
logger = get_logger(__name__)
[docs]
class SuccessiveHalving(AbstractIntensifier):
    """
    Implementation of Successive Halving supporting multi-fidelity, multi-objective, and multi-processing.
    Internally, a tracker keeps track of configurations and their bracket and stage.
    The behaviour of this intensifier is as follows:
    - First, adds configurations from the runhistory to the tracker. The first stage is always filled-up. For example,
      the user provided 4 configs with the tell-method but the first stage requires 8 configs: 4 new configs are
      sampled and added together with the provided configs as a group to the tracker.
    - While loop:
      - If a trial in the tracker has not been yielded yet, yield it.
      - If we are running out of trials, we simply add a new batch of configurations to the first stage.
    Note
    ----
    The implementation natively supports brackets from Hyperband. However, in the case of Successive Halving,
    only one bracket is used.
    Parameters
    ----------
    eta : int, defaults to 3
        Input that controls the proportion of configurations discarded in each round of Successive Halving.
    n_seeds : int, defaults to 1
        How many seeds to use for each instance.
    instance_seed_order : str, defaults to "shuffle_once"
        How to order the instance-seed pairs. Can be set to:
        - `None`: No shuffling at all and use the instance-seed order provided by the user.
        - `shuffle_once`: Shuffle the instance-seed keys once and use the same order across all runs.
        - `shuffle`: Shuffles the instance-seed keys for each bracket individually.
    incumbent_selection : str, defaults to "highest_observed_budget"
        How to select the incumbent when using budgets. Can be set to:
        - `any_budget`: Incumbent is the best on any budget i.e., best performance regardless of budget.
        - `highest_observed_budget`: Incumbent is the best in the highest budget run so far.
        - `highest_budget`: Incumbent is selected only based on the highest budget.
    max_incumbents : int, defaults to 10
        How many incumbents to keep track of in the case of multi-objective.
    seed : int, defaults to None
        Internal seed used for random events like shuffle seeds.
    """
    def __init__(
        self,
        scenario: Scenario,
        eta: int = 3,
        n_seeds: int = 1,
        instance_seed_order: str | None = "shuffle_once",
        max_incumbents: int = 10,
        incumbent_selection: str = "highest_observed_budget",
        seed: int | None = None,
    ):
        # The generic intensifier settings are handled by the parent class
        super().__init__(scenario=scenario, n_seeds=n_seeds, max_incumbents=max_incumbents, seed=seed)

        # Successive Halving specific settings
        self._eta = eta
        self._instance_seed_order = instance_seed_order
        self._incumbent_selection = incumbent_selection

        # True for every selection mode except ``any_budget``
        self._highest_observed_budget_only = incumbent_selection != "any_budget"

        # Budget bounds as defined in the scenario
        self._max_budget = self._scenario.max_budget
        self._min_budget = self._scenario.min_budget
    @property
    def meta(self) -> dict[str, Any]:  # noqa: D102
        # Start from the parent's meta information and add the Successive Halving specific settings
        info = super().meta
        info["eta"] = self._eta
        info["instance_seed_order"] = self._instance_seed_order
        info["incumbent_selection"] = self._incumbent_selection

        return info
[docs]
    def reset(self) -> None:
        """Reset the internal variables of the intensifier including the tracker."""
        super().reset()

        # State: the tracker maps (bracket, stage) to the groups currently located in that stage.
        # Each group is a tuple of the seed used to shuffle the instance-seed keys (or None) and the
        # list of configurations belonging to the group.
        empty_tracker: dict[tuple[int, int], list[tuple[int | None, list[Configuration]]]] = defaultdict(list)
        self._tracker = empty_tracker
[docs]
    def __post_init__(self) -> None:
        """Post initialization steps after the runhistory has been set.

        Validates the budget settings and pre-computes, for the single bracket used by Successive Halving,
        the number of configurations and the budget of every stage.

        Raises
        ------
        ValueError
            If the max budget is smaller than the min budget, if the min budget is not larger than 0,
            if float budgets are combined with instances, if the max budget exceeds the number of
            instance-seed keys, if a budget is missing in the budget-based setting, or if more than one
            instance-seed key is used together with budgets.
        """
        super().__post_init__()

        # We generate our instance seed pairs once
        is_keys = self.get_instance_seed_keys_of_interest()

        # Budgets, followed by lots of sanity-checking
        eta = self._eta
        min_budget = self._min_budget
        max_budget = self._max_budget

        if max_budget is not None and min_budget is not None and max_budget < min_budget:
            raise ValueError("Max budget has to be larger than min budget.")

        if self.uses_instances:
            if isinstance(min_budget, float) or isinstance(max_budget, float):
                raise ValueError("Successive Halving requires integer budgets when using instances.")

            # Missing bounds default to "one instance-seed key" and "all instance-seed keys"
            min_budget = min_budget if min_budget is not None else 1
            max_budget = max_budget if max_budget is not None else len(is_keys)

            if max_budget > len(is_keys):
                raise ValueError(
                    f"Max budget of {max_budget} can not be greater than the number of instance-seed "
                    f"keys ({len(is_keys)})."
                )

            if max_budget < len(is_keys):
                logger.warning(
                    f"Max budget {max_budget} does not include all instance seed  " f"pairs ({len(is_keys)})."
                )
        else:
            if min_budget is None or max_budget is None:
                raise ValueError(
                    "Successive Halving requires the parameters min_budget and max_budget defined in the scenario."
                )

            if len(is_keys) != 1:
                raise ValueError("Successive Halving supports only one seed when using budgets.")

        if min_budget is None or min_budget <= 0:
            raise ValueError("Min budget has to be larger than 0.")

        # The comparison at the top is skipped whenever one of the bounds is still None. Once the defaults
        # are filled in (instance setting), the bounds can still be inverted, e.g. a min budget larger than
        # the number of instance-seed keys. Without this check, the stage computation below fails with an
        # unrelated arithmetic error.
        if max_budget < min_budget:
            raise ValueError("Max budget has to be larger than min budget.")

        budget_type = "INSTANCES" if self.uses_instances else "BUDGETS"
        logger.info(
            f"Successive Halving uses budget type {budget_type} with eta {eta}, "
            f"min budget {min_budget}, and max budget {max_budget}."
        )

        # Pre-computing Successive Halving variables
        max_iter = self._get_max_iterations(eta, max_budget, min_budget)
        budgets, n_configs = self._compute_configs_and_budgets_for_stages(eta, max_budget, max_iter)

        # Global variables
        self._min_budget = min_budget
        self._max_budget = max_budget

        # Stage variables, depending on the bracket (0 is the bracket here since SH only has one bracket)
        self._max_iterations: dict[int, int] = {0: max_iter + 1}
        self._n_configs_in_stage: dict[int, list] = {0: n_configs}
        self._budgets_in_stage: dict[int, list] = {0: budgets}
    @staticmethod
    def _get_max_iterations(eta: int, max_budget: float | int, min_budget: float | int) -> int:
        return int(np.floor(np.log(max_budget / min_budget) / np.log(eta)))
    @staticmethod
    def _compute_configs_and_budgets_for_stages(
        eta: int, max_budget: float | int, max_iter: int, s_max: int | None = None
    ) -> tuple[list[int], list[int]]:
        if s_max is None:
            s_max = max_iter
        n_initial_challengers = math.ceil((eta**max_iter) * (s_max + 1) / (max_iter + 1))
        # How many configs in each stage
        lin_space = -np.linspace(0, max_iter, max_iter + 1)
        n_configs_ = np.floor(n_initial_challengers * np.power(eta, lin_space))
        n_configs = np.array(np.round(n_configs_), dtype=int).tolist()
        # How many budgets in each stage
        lin_space = -np.linspace(max_iter, 0, max_iter + 1)
        budgets = (max_budget * np.power(eta, lin_space)).tolist()
        return budgets, n_configs
[docs]
    def get_state(self) -> dict[str, Any]:  # noqa: D102
        # Replace config by dict
        tracker: dict[str, list[tuple[int | None, list[dict]]]] = defaultdict(list)
        for key in list(self._tracker.keys()):
            for seed, configs in self._tracker[key]:
                # We have to make key serializable
                new_key = f"{key[0]},{key[1]}"
                tracker[new_key].append((seed, [dict(config) for config in configs]))
        return {"tracker": tracker} 
[docs]
    def set_state(self, state: dict[str, Any]) -> None:  # noqa: D102
        self._tracker = defaultdict(list)
        tracker = state["tracker"]
        for old_key in list(tracker.keys()):
            keys = [k for k in old_key.split(",")]
            new_key = (int(keys[0]), int(keys[1]))
            for seed, config_dicts in tracker[old_key]:
                seed = None if seed is None else int(seed)
                self._tracker[new_key].append(
                    (
                        seed,
                        [Configuration(self._scenario.configspace, config_dict) for config_dict in config_dicts],
                    )
                ) 
    @property
    def uses_seeds(self) -> bool:  # noqa: D102
        return True
    @property
    def uses_budgets(self) -> bool:  # noqa: D102
        if self._scenario.instances is None:
            return True
        return False
    @property
    def uses_instances(self) -> bool:  # noqa: D102
        if self._scenario.instances is None:
            return False
        return True
[docs]
    def print_tracker(self) -> None:
        """Prints the number of configurations in each bracket/stage."""
        messages = []
        for (bracket, stage), others in self._tracker.items():
            counter = 0
            for _, config_ids in others:
                counter += len(config_ids)
            if counter > 0:
                messages.append(f"--- Bracket {bracket} / Stage {stage}: {counter} configs")
        if len(messages) > 0:
            logger.debug(f"{self.__class__.__name__} statistics:")
        for message in messages:
            logger.debug(message) 
[docs]
    def get_trials_of_interest(
        self,
        config: Configuration,
        *,
        validate: bool = False,
        seed: int | None = None,
    ) -> list[TrialInfo]:  # noqa: D102
        is_keys = self.get_instance_seed_keys_of_interest(validate=validate, seed=seed)

        # In the budget setting, only the highest budget is of interest. Otherwise, no budget is attached.
        budget = self._max_budget if self.uses_budgets else None

        return [TrialInfo(config=config, instance=key.instance, seed=key.seed, budget=budget) for key in is_keys]
[docs]
    def get_instance_seed_budget_keys(
        self, config: Configuration, compare: bool = False
    ) -> list[InstanceSeedBudgetKey]:
        """Return the instance-seed-budget keys of a configuration as stored in the runhistory.

        With ``incumbent_selection="highest_budget"`` (budget setting only), keys below the highest budget
        are dropped. As a consequence, the incumbents in ``update_incumbents`` are only changed if the costs
        on the highest budget are lower.

        Parameters
        ----------
        config: Configuration
            The Configuration to be queried
        compare : bool, defaults to False
            Strip the budget from the keys and drop duplicates. Used to compare whether configurations
            were evaluated on the same instance-seed keys.

        Returns
        -------
        isb_keys : list[InstanceSeedBudgetKey]
            The (possibly filtered and budget-free) keys of the configuration.
        """
        keys = self.runhistory.get_instance_seed_budget_keys(
            config, highest_observed_budget_only=self._highest_observed_budget_only
        )

        # The incumbent may only change on the highest budget: kick out everything below
        restrict_to_max_budget = self.uses_budgets and self._incumbent_selection == "highest_budget"
        if restrict_to_max_budget:
            keys = [key for key in keys if key.budget == self._max_budget]

        if not compare:
            return keys

        # Budget-free keys; the set gets rid of duplicates
        budget_free = (InstanceSeedBudgetKey(instance=key.instance, seed=key.seed, budget=None) for key in keys)
        return list(set(budget_free))
    def __iter__(self) -> Iterator[TrialInfo]:  # noqa: D102
        # Generator yielding the next trials to run. Unless the tracker was restored, it is first seeded with
        # the configs already present in the runhistory. Afterwards, the generator alternates between yielding
        # missing trials, promoting the best configs of fully evaluated groups to the next stage, and adding
        # new groups to the first stage. It stops once the config generator is exhausted.
        self.__post_init__()

        # Log brackets/stages
        logger.info("Number of configs in stage:")
        for bracket, n in self._n_configs_in_stage.items():
            logger.info(f"--- Bracket {bracket}: {n}")

        logger.info("Budgets in stage:")
        for bracket, budgets in self._budgets_in_stage.items():
            logger.info(f"--- Bracket {bracket}: {budgets}")

        rh = self.runhistory

        # We have to add already existing trials from the runhistory
        # Idea: We simply add existing configs to the tracker (first stage) but assign a random instance shuffle seed.
        # In the best case, trials (added from the users) are included in the seed and it has not re-computed again.
        # Note: If the intensifier was restored, we don't want to go in here
        if len(self._tracker) == 0:
            bracket = 0
            stage = 0

            # Print ignored budgets
            # Only meaningful in the budget setting: in the instance setting, the trials created by this
            # intensifier carry no budget (None) and the per-stage "budgets" are numbers of instance-seed keys,
            # so the comparison would flag every existing trial.
            if self.uses_budgets:
                ignored_budgets = []
                for k in rh.keys():
                    if k.budget not in self._budgets_in_stage[0] and k.budget not in ignored_budgets:
                        ignored_budgets.append(k.budget)

                if len(ignored_budgets) > 0:
                    logger.warning(
                        f"Trials with budgets {ignored_budgets} will been ignored. Consider adding trials with budgets "
                        f"{self._budgets_in_stage[0]}."
                    )

            # We batch the configs because we need n_configs in each iteration
            # If we don't have n_configs, we sample new ones
            # We take the number of configs from the first bracket and the first stage
            n_configs = self._n_configs_in_stage[bracket][stage]
            for configs in batch(rh.get_configs(), n_configs):
                n_rh_configs = len(configs)

                # Fill the group up until the stage holds the required number of configs. A single sample is
                # not enough: the last batch can be short by more than one config.
                while len(configs) < n_configs:
                    try:
                        configs.append(next(self.config_generator))
                    except StopIteration:
                        # We stop if we don't find any configuration anymore
                        return

                seed = self._get_next_order_seed()
                self._tracker[(bracket, stage)].append((seed, configs))
                logger.info(
                    f"Added {n_rh_configs} configs from runhistory and {n_configs - n_rh_configs} new configs to "
                    f"Successive Halving's first bracket and first stage with order seed {seed}."
                )

        while True:
            # If we don't yield trials anymore, we have to update
            # Otherwise, we can just keep yielding trials from the tracker
            update = False

            # We iterate over the tracker to do two things:
            # 1) Yield trials of configs that are not yet evaluated/running
            # 2) Update tracker and move better configs to the next stage
            # The bracket/stage pairs are processed in the order in which they were added to the tracker
            logger.debug("Updating tracker:")

            # TODO: Process stages ascending or descending?
            # The keys are snapshotted because promotions add new (bracket, stage) entries while we iterate
            for bracket, stage in list(self._tracker.keys()):
                pairs = self._tracker[(bracket, stage)].copy()
                for seed, configs in pairs:
                    isb_keys = self._get_instance_seed_budget_keys_by_stage(bracket=bracket, stage=stage, seed=seed)

                    # We iterate over the configs and yield trials which are not running/evaluated yet
                    for config in configs:
                        config_hash = get_config_hash(config)
                        trials = self._get_next_trials(config, from_keys=isb_keys)
                        logger.debug(
                            f"--- Yielding {len(trials)}/{len(isb_keys)} for config {config_hash} in "
                            f"stage {stage} with seed {seed}..."
                        )

                        for trial in trials:
                            yield trial
                            update = True

                    # If all configs were evaluated on ``n_configs_required``, we finally can compare
                    try:
                        successful_configs = self._get_best_configs(configs, bracket, stage, isb_keys)
                    except NotEvaluatedError:
                        # We can't compare anything, so we just continue with the next pairs
                        logger.debug("--- Could not compare configs because not all trials have been evaluated yet.")
                        continue

                    # Update tracker
                    # Remove current shuffle index / config pair
                    self._tracker[(bracket, stage)].remove((seed, configs))

                    # Add successful to the next stage
                    if stage < self._max_iterations[bracket] - 1:
                        self._tracker[(bracket, stage + 1)].append((seed, successful_configs))
                        logger.debug(
                            f"--- Promoted {len(successful_configs)} configs from stage {stage} to "
                            f"stage {stage + 1} in bracket {bracket}."
                        )
                    else:
                        logger.debug(
                            f"--- Removed {len(successful_configs)} configs to last stage in bracket {bracket}."
                        )

                    # Log how many configs are in each stage
                    self.print_tracker()

            # Since we yielded something before, we want to go back as long as we do not find any trials anymore
            if update:
                continue

            # TODO: Aggressive progressing without knowing how well trials performed
            # Idea: Don't add constantly new batches (see ASHA)
            # If we are running out of trials, we want to add configs to the first stage
            # We simply add as many configs to the stage as required (_n_configs_in_stage[0])
            configs = []
            next_bracket = self._get_next_bracket()
            for _ in range(self._n_configs_in_stage[next_bracket][0]):
                try:
                    config = next(self.config_generator)
                    configs.append(config)
                except StopIteration:
                    # We stop if we don't find any configuration anymore
                    logger.warning(
                        "If you assume your configspace was not yet exhausted, try to "
                        "increase the number of retries in the config selector."
                    )
                    return

            # We keep track of the seed so we always evaluate on the same instances
            next_seed = self._get_next_order_seed()
            self._tracker[(next_bracket, 0)].append((next_seed, configs))
            logger.debug(
                f"Added {len(configs)} new configs to bracket {next_bracket} stage 0 with shuffle seed {next_seed}."
            )
    def _get_instance_seed_budget_keys_by_stage(
        self,
        bracket: int,
        stage: int,
        seed: int | None = None,
    ) -> list[InstanceSeedBudgetKey]:
        """Return the instance-seed-budget keys (isb keys) a config has to be evaluated on in the given stage.

        Each stage is associated with a budget (N), which is interpreted in one of two ways:
        1) Instance based: N isb keys are returned. If a seed is specified, the instance-seed keys are
        reordered with that seed before the first N of them are taken. The budget of the keys is None.
        2) Budget based: Exactly one isb key is returned and its budget is set to N.
        """
        is_keys = self.get_instance_seed_keys_of_interest()
        stage_budget = self._budgets_in_stage[bracket][stage] if not self.uses_instances else None

        if stage_budget is None:
            # Instance setting: the group-specific seed determines the order of the instance-seed pairs
            if seed is not None:
                is_keys = self._reorder_instance_seed_keys(is_keys, seed=seed)

            # The stage defines how many instance-seed pairs are used
            n_keys = int(self._budgets_in_stage[bracket][stage])
            is_keys = is_keys[:n_keys]
        else:
            # Budget setting: there is only one instance-seed pair, hence nothing has to be shuffled
            assert len(is_keys) == 1

        return [
            InstanceSeedBudgetKey(instance=is_key.instance, seed=is_key.seed, budget=stage_budget)
            for is_key in is_keys
        ]
    def _get_next_trials(
        self,
        config: Configuration,
        from_keys: list[InstanceSeedBudgetKey],
    ) -> list[TrialInfo]:
        """Build the trials of ``config`` for the given instance-seed-budget keys and return those which are
        neither evaluated nor running yet. The order of ``from_keys`` is kept.
        """
        rh = self.runhistory
        already_evaluated = rh.get_trials(config, highest_observed_budget_only=False)
        currently_running = rh.get_running_trials(config)

        candidates = (
            TrialInfo(config=config, instance=key.instance, seed=key.seed, budget=key.budget) for key in from_keys
        )
        pending: list[TrialInfo] = [
            trial for trial in candidates if trial not in already_evaluated and trial not in currently_running
        ]

        return pending
    def _get_best_configs(
        self,
        configs: list[Configuration],
        bracket: int,
        stage: int,
        from_keys: list[InstanceSeedBudgetKey],
    ) -> list[Configuration]:
        """Returns the best configurations. The number of configurations is depending on the stage. Raises
        ``NotEvaluatedError`` if not all trials have been evaluated.
        """
        try:
            n_configs = self._n_configs_in_stage[bracket][stage + 1]
        except IndexError:
            return []
        rh = self.runhistory
        configs = configs.copy()
        for config in configs:
            isb_keys = rh.get_instance_seed_budget_keys(config)
            if not all(isb_key in isb_keys for isb_key in from_keys):
                raise NotEvaluatedError
        selected_configs: list[Configuration] = []
        while len(selected_configs) < n_configs:
            # We calculate the pareto front for the given configs
            # We use the same isb keys for all the configs
            all_keys = [from_keys for _ in configs]
            incumbents = calculate_pareto_front(rh, configs, all_keys)
            # Idea: We recursively calculate the pareto front in every iteration
            for incumbent in incumbents:
                configs.remove(incumbent)
                selected_configs.append(incumbent)
        # If we have more selected configs, we remove the ones with the smallest crowding distance
        if len(selected_configs) > n_configs:
            all_keys = [from_keys for _ in selected_configs]
            selected_configs = sort_by_crowding_distance(rh, selected_configs, all_keys)[:n_configs]
            logger.debug("Found more configs than required. Removed configs with smallest crowding distance.")
        return selected_configs
    def _get_next_order_seed(self) -> int | None:
        """Next instances shuffle seed to use."""
        # Here we have the option to shuffle the trials when specified by the user
        if self._instance_seed_order == "shuffle":
            seed = self._rng.randint(0, MAXINT)
        elif self._instance_seed_order == "shuffle_once":
            seed = 0
        else:
            seed = None
        return seed
    def _get_next_bracket(self) -> int:
        """Successive Halving only uses one bracket. Therefore, we always return 0 here."""
        return 0