Source code for smac.facade.psmac_facade

# type: ignore
# mypy: ignore-errors

from typing import Any, Dict, List, Optional, Type, Union

import copy
import datetime
import logging
import os
import time
from pathlib import Path

import joblib
import numpy as np
from ConfigSpace.configuration_space import Configuration

from smac.epm.utils import get_rng
from smac.facade.smac_ac_facade import SMAC4AC
from smac.runhistory.runhistory import RunHistory
from smac.scenario.scenario import Scenario
from smac.tae.base import BaseRunner
from smac.tae.execute_ta_run_hydra import ExecuteTARunOld
from smac.utils.io.output_directory import create_output_directory
from smac.utils.io.result_merging import ResultMerger

__author__ = "Andre Biedenkapp"
__copyright__ = "Copyright 2018, ML4AAD"
__license__ = "3-clause BSD"


[docs]def optimize( scenario: Scenario, tae_runner: Type[BaseRunner], tae_runner_kwargs: Dict, rng: Union[np.random.RandomState, int], output_dir: str, facade_class: Optional[Type[SMAC4AC]] = None, **kwargs, ) -> Configuration: """ Unbound method to be called in a subprocess Parameters ---------- scenario: Scenario smac.Scenario to initialize SMAC tae_runner: BaseRunner Target Algorithm Runner (supports old and aclib format) tae_runner_kwargs: Optional[dict] arguments passed to constructor of '~tae' rng: int/np.random.RandomState The randomState/seed to pass to each smac run output_dir: str The directory in which each smac run should write its results Returns ------- incumbent: Configuration The incumbent configuration of this run """ if facade_class is None: facade_class = SMAC4AC solver = facade_class( scenario=scenario, tae_runner=tae_runner, tae_runner_kwargs=tae_runner_kwargs, rng=rng, **kwargs ) solver.stats.start_timing() solver.stats.print_stats() incumbent = solver.solver.run() solver.stats.print_stats() if output_dir is not None: solver.solver.runhistory.save_json(fn=os.path.join(solver.output_dir, "runhistory.json")) return incumbent
[docs]class PSMAC(object): """ Facade to use pSMAC [1]_ With pSMAC you can either run n distinct SMAC optimizations in parallel (`shared_model=False`) or you can parallelize the target algorithm evaluations (`shared_model=True`). In the latter case all SMAC workers share one file directory and communicate via the logfiles. You can specify the number of SMAC workers/optimizers with the argument `n_workers`. You can pass all other kwargs for the SMAC4AC facade. In addition, you can access the facade's attributes normally (e.g. smac.stats), however each time a new SMAC object is built, reading the information from the file system. .. [1] Ramage, S. E. A. (2015). Advances in meta-algorithmic software libraries for distributed automated algorithm configuration (T). University of British Columbia. Retrieved from https://open.library.ubc.ca/collections/ubctheses/24/items/1.0167184. Parameters ---------- scenario : ~smac.scenario.scenario.Scenario Scenario object. Note that the budget/number of evaluations (runcount-limit) is used for each worker. So if you specify 40 evaluations and 3 workers, 120 configurations will be evaluated in total. n_workers: int Number of optimizers to run in parallel per round rng: int/np.random.RandomState The randomState/seed to pass to each smac run run_id: int run_id for this hydra run tae_runner: BaseRunner Target Algorithm Runner (supports old and aclib format as well as AbstractTAFunc) tae_runner_kwargs: Optional[dict] arguments passed to constructor of '~tae_runner' shared_model: bool Flag to indicate whether information is shared between SMAC runs or not validate: bool / None Flag to indicate whether to validate the found configurations or to use the SMAC estimates None => neither and return the full portfolio val_set: List[str] List of instance-ids to validate on **kwargs Keyword arguments for the SMAC4AC facade Attributes ---------- # TODO update attributes logger stats : Stats loggs information about used resources solver : SMBO handles the actual algorithm calls rh : RunHistory List with information about previous runs portfolio : list List of all incumbents """ def __init__( self, scenario: Scenario, rng: Optional[Union[np.random.RandomState, int]] = None, run_id: int = 1, tae_runner: Type[BaseRunner] = ExecuteTARunOld, tae_runner_kwargs: Union[dict, None] = None, shared_model: bool = True, facade_class: Optional[Type[SMAC4AC]] = None, validate: bool = True, n_workers: int = 2, val_set: Union[List[str], None] = None, **kwargs, ): self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__) self.scenario = scenario self.run_id, self.rng = get_rng(rng, run_id, logger=self.logger) self.kwargs = kwargs self.output_dir = None if facade_class is None: facade_class = SMAC4AC self.facade_class = facade_class self.rh = RunHistory() self._tae_runner = tae_runner self._tae_runner_kwargs = tae_runner_kwargs if n_workers <= 1: self.logger.warning("Invalid value in %s: %d. Setting to 2", "n_workers", n_workers) self.n_workers = max(n_workers, 2) self.seeds = np.arange(0, self.n_workers, dtype=int) # seeds for the parallel runs self.validate = validate self.shared_model = shared_model if val_set is None: self.val_set = scenario.train_insts else: self.val_set = val_set self.result_merger: Optional[ResultMerger] = None self.n_incs: int = 1
[docs] def optimize(self) -> Configuration: """ Optimizes the algorithm provided in scenario (given in constructor) Returns ------- incumbent : Configuration Best configuration across all workers. """ # Setup output directory if self.output_dir is None: self.scenario.output_dir = "psmac3-output_%s" % ( datetime.datetime.fromtimestamp(time.time()).strftime("%Y-%m-%d_%H:%M:%S_%f") ) self.output_dir = create_output_directory(self.scenario, run_id=self.run_id, logger=self.logger) if self.shared_model: self.scenario.shared_model = self.shared_model if self.scenario.input_psmac_dirs is None: self.scenario.input_psmac_dirs = os.path.sep.join((self.scenario.output_dir, "run_*")) scen = copy.deepcopy(self.scenario) scen.output_dir_for_this_run = None scen.output_dir = None self.logger.info("+" * 120) self.logger.info("PSMAC run") incs = joblib.Parallel(n_jobs=self.n_workers)( joblib.delayed(optimize)( scenario=self.scenario, # Scenario object tae_runner=self._tae_runner, # type of tae_runner to run target with tae_runner_kwargs=self._tae_runner_kwargs, rng=int(seed), # seed for the rng/run_id output_dir=self.output_dir, # directory to create outputs in facade_class=self.facade_class, **self.kwargs, ) for seed in self.seeds ) inc = self.get_best_incumbent(incs=incs) return inc
[docs] def get_best_incumbent(self, incs: List[Configuration]) -> Configuration: """ Determine ID and cost of best configuration (incumbent). Parameters ---------- incs : List[Configuration] List of incumbents from the workers. Returns ------- Configuration Best incumbent from all workers. """ if self.validate is True: mean_costs_conf_valid, cost_per_config_valid = self.validate_incs(incs) val_id = list(map(lambda x: x[0], sorted(enumerate(mean_costs_conf_valid), key=lambda y: y[1])))[0] inc = incs[val_id] else: mean_costs_conf_estimate, cost_per_config_estimate = self._get_mean_costs(incs, self.rh) est_id = list(map(lambda x: x[0], sorted(enumerate(mean_costs_conf_estimate), key=lambda y: y[1])))[0] inc = incs[est_id] return inc
def _get_mean_costs(self, incs: List[Configuration], new_rh: RunHistory): """ Compute mean cost per instance Parameters ---------- incs : List[Configuration] incumbents determined by all parallel SMAC runs new_rh : RunHistory runhistory to determine mean performance Returns ------- List[float] means Dict(Config -> Dict(inst_id(str) -> float)) """ config_cost_per_inst = {} results = [] for incumbent in incs: cost_per_inst = new_rh.get_instance_costs_for_config(config=incumbent) config_cost_per_inst[incumbent] = cost_per_inst values = list(cost_per_inst.values()) if values: results.append(np.mean(values)) else: results.append(np.nan) return results, config_cost_per_inst def _get_solver(self): # TODO: specify one output dir or no output dir solver = self.facade_class(scenario=self.scenario, rng=self.rng, run_id=None, **self.kwargs) return solver
[docs] def validate_incs(self, incs: List[Configuration]): """Validation of the incumbents.""" solver = self._get_solver() self.logger.info("*" * 120) self.logger.info("Validating") new_rh = solver.validate( config_mode=incs, instance_mode=self.val_set, repetitions=1, use_epm=False, n_jobs=self.n_workers, ) return self._get_mean_costs(incs, new_rh)
[docs] def write_run(self) -> None: """ Write all SMAC files to pSMAC dir Returns ------- None """ # write runhistory # write configspace file .pcs .json # write trajectory traj.json # write scenario .txt # write stats raise NotImplementedError
def _check_result_merger(self): if self.result_merger is None: if self.output_dir is None: raise ValueError( "Cannot instantiate `ResultMerger` because `output_dir` " "is None. In pSMAC `output_dir` is set after " "`optimize()` has been called. If you already have " "a pSMAC run or rundirs, please directly use " "`smac.utils.io.result_merging.ResultMerger`." ) self.result_merger = ResultMerger(output_dir=Path(self.output_dir).parent)
[docs] def get_runhistory(self) -> Optional[RunHistory]: """ Get merged runhistory from pSMAC workers Returns ------- Optional[RunHistory] """ self._check_result_merger() return self.result_merger.get_runhistory()
[docs] def get_trajectory(self) -> Optional[List[Dict[str, Any]]]: """ Get trajectory from merged runhistory Returns ------- Optional[List[Dict[str, Any]]] """ self._check_result_merger() return self.result_merger.get_trajectory()