# Source code for smac.facade.experimental.hydra_facade

# type: ignore
# mypy: ignore-errors

import typing

import copy
import datetime
import logging
import os
import pickle
import time
from collections import defaultdict

import numpy as np
from ConfigSpace.configuration_space import Configuration

from smac.epm.utils import get_rng
from smac.facade.psmac_facade import PSMAC
from smac.facade.smac_ac_facade import SMAC4AC
from smac.optimizer.pSMAC import read
from smac.runhistory.runhistory import RunHistory
from smac.scenario.scenario import Scenario
from smac.tae.base import BaseRunner
from smac.tae.execute_ta_run_hydra import ExecuteTARunHydra, ExecuteTARunOld
from smac.utils.constants import MAXINT
from smac.utils.io.output_directory import create_output_directory

__author__ = "Marius Lindauer"
__copyright__ = "Copyright 2017, ML4AAD"
__license__ = "3-clause BSD"


class Hydra(object):
    """
    Facade to use Hydra default mode

    Hydra builds a portfolio of configurations iteratively: every iteration
    runs a (parallel) PSMAC optimization, keeps the best ``incs_per_round``
    incumbents and adds them to the portfolio until the portfolio cost stops
    improving or ``n_iterations`` is reached.

    Parameters
    ----------
    scenario : ~smac.scenario.scenario.Scenario
        Scenario object
    n_iterations: int,
        number of Hydra iterations
    val_set: str
        Set to validate incumbent(s) on. [none, train, valX].
        none => no validation,
        train => whole training set,
        valX => X percent of the training set, where X in (0, 100)
    incs_per_round: int
        Number of incumbents to keep per round (values <= 0 are replaced by 1)
    n_optimizers: int
        Number of optimizers to run in parallel per round (values <= 0 are replaced by 1)
    rng: int/np.random.RandomState
        The randomState/seed to pass to each smac run
    run_id: int
        run_id for this hydra run
    tae: BaseRunner
        Target Algorithm Runner (supports old and aclib format as well as AbstractTAFunc)
    tae_kwargs: Optional[dict]
        arguments passed to constructor of '~tae'
    **kwargs
        Additional keyword arguments forwarded to SMAC4AC and PSMAC

    Attributes
    ----------
    logger
    solver : SMAC4AC
        parent SMAC instance created in ``optimize``
    optimizer : PSMAC
        optimizer of the most recent Hydra iteration
    rh : RunHistory
        List with information about previous runs
    portfolio : list
        List of all incumbents
    cost_per_inst : dict
        Best known cost per instance of the current portfolio
    portfolio_cost : float
        Cost of the current portfolio as computed by ``_update_portfolio``
    """

    def __init__(
        self,
        scenario: Scenario,
        n_iterations: int,
        val_set: str = "train",
        incs_per_round: int = 1,
        n_optimizers: int = 1,
        rng: typing.Optional[typing.Union[np.random.RandomState, int]] = None,
        run_id: int = 1,
        tae: typing.Type[BaseRunner] = ExecuteTARunOld,
        tae_kwargs: typing.Union[dict, None] = None,
        **kwargs,
    ):
        self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)

        self.n_iterations = n_iterations
        self.scenario = scenario
        self.run_id, self.rng = get_rng(rng, run_id, self.logger)
        self.kwargs = kwargs
        self.output_dir = None
        self.top_dir = None
        self.solver = None
        self.portfolio = None
        self.rh = RunHistory()
        self._tae = tae
        self._tae_kwargs = tae_kwargs

        # Non-positive counts make no sense; warn and fall back to 1.
        if incs_per_round <= 0:
            self.logger.warning("Invalid value in %s: %d. Setting to 1", "incs_per_round", incs_per_round)
        self.incs_per_round = max(incs_per_round, 1)
        if n_optimizers <= 0:
            self.logger.warning("Invalid value in %s: %d. Setting to 1", "n_optimizers", n_optimizers)
        self.n_optimizers = max(n_optimizers, 1)

        # Must run after self.rng and self.scenario are set.
        self.val_set = self._get_validation_set(val_set)
        self.cost_per_inst = {}
        self.optimizer = None
        self.portfolio_cost = None

    def _get_validation_set(self, val_set: str, delete: bool = True) -> typing.Optional[typing.List[str]]:
        """
        Create small validation set for hydra to determine incumbent performance

        Parameters
        ----------
        val_set: str
            Set to validate incumbent(s) on. [none, train, valX].
            none => no validation set,
            train => whole training set,
            valX => X percent of the training set, where X in (0, 100)
            (but never fewer than 3 instances)
        delete: bool
            Flag to delete all validation instances from the training set

        Returns
        -------
        val: typing.Optional[typing.List[str]]
            List of instance-ids to validate on, or None if ``val_set`` is "none"

        Raises
        ------
        ValueError
            If X in valX is not an integer in (0, 100), or if the training set
            holds fewer instances than the validation set requires.
        """
        if val_set == "none":
            return None
        if val_set == "train":
            return self.scenario.train_insts
        if val_set[:3] != "val":
            self.logger.warning("Can not determine validation set size. Using full training-set!")
            return self.scenario.train_insts

        fraction = int(val_set[3:]) / 100
        if fraction <= 0 or fraction >= 1:
            raise ValueError("X invalid in valX, should be between 0 and 100")

        insts = np.array(self.scenario.train_insts)
        n_insts = insts.shape[0]
        # just to make sure this also works with the small example we have to round up to 3
        n_val = max(int(np.floor(n_insts * fraction)), 3)
        if n_val > n_insts:
            raise ValueError(
                "Can not draw %d validation instances from only %d training instances" % (n_val, n_insts)
            )

        # Draw from the facade's own random state (not the global numpy state)
        # so that the split is reproducible for a given seed.
        # NOTE(review): assumes get_rng returns a np.random.RandomState -- confirm.
        ids = self.rng.choice(n_insts, n_val, replace=False)
        val = insts[ids].tolist()
        if delete:
            self.scenario.train_insts = np.delete(insts, ids).tolist()
        return val

    @staticmethod
    def _timestamped(prefix: str) -> str:
        """Return ``prefix`` followed by the current local time (microsecond resolution)."""
        stamp = datetime.datetime.fromtimestamp(time.time()).strftime("%Y-%m-%d_%H:%M:%S_%f")
        return "%s%s" % (prefix, stamp)

    def _create_psmac_output_dir(self) -> None:
        """Point the scenario to a fresh psmac3 directory below ``top_dir`` and create it."""
        self.scenario.output_dir = os.path.join(self.top_dir, self._timestamped("psmac3-output_"))
        self.output_dir = create_output_directory(self.scenario, run_id=self.run_id, logger=self.logger)

    def optimize(self) -> typing.List[Configuration]:
        """
        Optimizes the algorithm provided in scenario (given in constructor)

        Besides returning the portfolio, this writes
        ``all_validated_runs_runhistory.json`` and ``portfolio.pkl`` into the
        top-level hydra output directory.

        Returns
        -------
        portfolio : typing.List[Configuration]
            Portfolio of found configurations
        """
        # Setup output directory
        self.portfolio = []
        portfolio_cost = np.inf
        if self.output_dir is None:
            self.top_dir = self._timestamped("hydra-output_")
            self._create_psmac_output_dir()

        scen = copy.deepcopy(self.scenario)
        scen.output_dir_for_this_run = None
        scen.output_dir = None
        # parent process SMAC only used for validation purposes
        self.solver = SMAC4AC(
            scenario=scen,
            tae_runner=self._tae,
            rng=self.rng,
            run_id=self.run_id,
            **self.kwargs,
            tae_runner_kwargs=self._tae_kwargs,
        )

        for i in range(self.n_iterations):
            self.logger.info("=" * 120)
            self.logger.info("Hydra Iteration: %d", (i + 1))

            if i == 0:
                # First round: plain target algorithm, nothing to compare against yet.
                tae = self._tae
                tae_kwargs = self._tae_kwargs
            else:
                # Later rounds: wrap the runner so it sees the portfolio's per-instance cost.
                tae = ExecuteTARunHydra
                # Work on a copy: the user-supplied dict must not receive the
                # hydra-only "cost_oracle" entry (it is shared with self.solver).
                tae_kwargs = dict(self._tae_kwargs) if self._tae_kwargs else {}
                tae_kwargs["cost_oracle"] = self.cost_per_inst

            self.optimizer = PSMAC(
                scenario=self.scenario,
                run_id=self.run_id,
                rng=self.rng,
                tae=tae,
                tae_kwargs=tae_kwargs,
                shared_model=False,
                validate=bool(self.val_set),
                n_optimizers=self.n_optimizers,
                val_set=self.val_set,
                n_incs=self.n_optimizers,  # return all configurations (unvalidated)
                **self.kwargs,
            )
            self.optimizer.output_dir = self.output_dir
            incs = self.optimizer.optimize()
            (
                cost_per_conf_v,
                val_ids,
                cost_per_conf_e,
                est_ids,
            ) = self.optimizer.get_best_incumbents_ids(incs)

            # Prefer the ranking on validated costs; fall back to estimated costs.
            if self.val_set:
                to_keep_ids = val_ids[: self.incs_per_round]
                cost_per_conf = cost_per_conf_v
            else:
                to_keep_ids = est_ids[: self.incs_per_round]
                cost_per_conf = cost_per_conf_e

            config_cost_per_inst = {}
            incs = incs[to_keep_ids]
            self.logger.info("Kept incumbents")
            for inc in incs:
                self.logger.info(inc)
                config_cost_per_inst[inc] = cost_per_conf[inc]

            cur_portfolio_cost = self._update_portfolio(incs, config_cost_per_inst)
            if portfolio_cost <= cur_portfolio_cost:
                self.logger.info("No further progress (%f) --- terminate hydra", portfolio_cost)
                break

            portfolio_cost = cur_portfolio_cost
            self.logger.info("Current pertfolio cost: %f", portfolio_cost)

            # Every iteration gets its own psmac3 output directory.
            self._create_psmac_output_dir()

        # Collect the validated runs of all iterations into one runhistory.
        read(
            self.rh,
            os.path.join(self.top_dir, "psmac3*", "run_" + str(MAXINT)),
            self.scenario.cs,
            self.logger,
        )
        self.rh.save_json(fn=os.path.join(self.top_dir, "all_validated_runs_runhistory.json"), save_external=True)
        with open(os.path.join(self.top_dir, "portfolio.pkl"), "wb") as fh:
            pickle.dump(self.portfolio, fh)

        self.logger.info("~" * 120)
        self.logger.info("Resulting Portfolio:")
        for configuration in self.portfolio:
            self.logger.info(str(configuration))
        self.logger.info("~" * 120)

        return self.portfolio

    def _update_portfolio(self, incs: np.ndarray, config_cost_per_inst: typing.Dict) -> float:
        """
        Adds the configurations in incs to the portfolio and updates the portfolio cost

        With a validation set, the per-instance cost of the portfolio is the
        minimum over all portfolio members and the portfolio cost is its mean.
        Without validation data, the portfolio cost is approximated by the
        mean of the members' mean costs taken from the optimizer's runhistory.

        Parameters
        ----------
        incs: np.ndarray
            List of Configurations
        config_cost_per_inst: typing.Dict
            Maps each configuration in incs to its cost per instance
            (only read when a validation set is used)

        Returns
        -------
        cur_cost: float
            The current cost of the portfolio

        Raises
        ------
        ValueError
            If a configuration was validated on a different number of
            instances than the portfolio so far.
        """
        if self.val_set:  # we have validated data
            for kept in incs:
                if kept in self.portfolio:
                    continue
                self.portfolio.append(kept)
                cost_per_inst = config_cost_per_inst[kept]
                if not self.cost_per_inst:
                    # Copy, so that later in-place minimum updates do not
                    # overwrite the costs recorded for this configuration.
                    self.cost_per_inst = dict(cost_per_inst)
                    continue
                if len(self.cost_per_inst) != len(cost_per_inst):
                    raise ValueError("Num validated Instances mismatch!")
                for key in cost_per_inst:
                    self.cost_per_inst[key] = min(self.cost_per_inst[key], cost_per_inst[key])
            cur_cost = np.mean(list(self.cost_per_inst.values()))  # type: float
        else:  # No validated data. Set the mean to the approximated mean
            # can contain nans as not every instance was evaluated thus we should use nanmean to approximate
            means = []
            for kept in incs:
                means.append(np.nanmean(list(self.optimizer.rh.get_instance_costs_for_config(kept).values())))
                self.portfolio.append(kept)

            if self.portfolio_cost is not None:
                # Running mean over all portfolio members: the previous mean
                # weighted by the number of old members plus the new members'
                # means, divided by the new portfolio size.
                n_total = len(self.portfolio)
                n_old = n_total - len(incs)
                new_mean = (self.portfolio_cost * n_old + np.nansum(means)) / n_total
            else:
                new_mean = np.mean(means)
            # Without per-instance data every instance is assigned the approximated mean.
            self.cost_per_inst = defaultdict(lambda: new_mean)
            cur_cost = new_mean

        self.portfolio_cost = cur_cost
        return cur_cost