Source code for smac.utils.validate

from typing import Any, Dict, List, Optional, Sequence, Tuple, Union, cast

import logging
import os
from collections import namedtuple

import numpy as np
from joblib import Parallel, delayed

from smac.configspace import Configuration, convert_configurations_to_array
from smac.epm.random_forest.rf_with_instances import RandomForestWithInstances
from smac.epm.random_forest.rfr_imputator import RFRImputator
from smac.epm.utils import get_types
from smac.runhistory.runhistory import RunHistory, RunInfo, RunKey, RunValue, StatusType
from smac.runhistory.runhistory2epm import RunHistory2EPM4Cost
from smac.scenario.scenario import Scenario
from smac.stats.stats import Stats
from smac.tae.base import BaseRunner
from smac.tae.execute_ta_run_old import ExecuteTARunOld
from smac.utils.constants import MAXINT

__author__ = "Joshua Marben"
__copyright__ = "Copyright 2017, ML4AAD"
__license__ = "3-clause BSD"
__maintainer__ = "Joshua Marben"
__email__ = "joshua.marben@neptun.uni-freiburg.de"


def _unbound_tae_starter(
    tae: BaseRunner,
    runhistory: Optional[RunHistory],
    run_info: RunInfo,
    *args: Any,
    **kwargs: Any,
) -> RunValue:
    """Unbound function to be used by joblibs Parallel, since directly passing the TAE results in
    pickling-problems.

    Parameters
    ----------
    tae: BaseRunner
        tae to be used
    runhistory: RunHistory or None
        runhistory to add the run to (optional)
    run_info: RunInfo
        Config to be launched
    *args, **kwargs: Any
        additional arguments (accepted for signature compatibility, unused)

    Returns
    -------
    tae_results: RunValue
        result as returned by tae.run_wrapper
    """
    run_info, result = tae.run_wrapper(run_info)
    tae.stats.submitted_ta_runs += 1
    tae.stats.finished_ta_runs += 1
    tae.stats.ta_time_used += float(result.time)
    if runhistory:
        runhistory.add(
            config=run_info.config,
            cost=result.cost,
            time=result.time,
            status=result.status,
            instance_id=run_info.instance,
            seed=run_info.seed,
            budget=run_info.budget,
        )
        tae.stats.n_configs = len(runhistory.config_ids)

    return result
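
# Sketch (hypothetical names, not part of this module): because
# `_unbound_tae_starter` is a module-level function, joblib only has to
# pickle its arguments rather than a bound TAE method. `_validate_parallel`
# below dispatches it roughly like this:
#
#   results = Parallel(n_jobs=4, backend="threading")(
#       delayed(_unbound_tae_starter)(tae, runhistory, run_info)
#       for run_info in run_infos
#   )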


_Run = namedtuple("_Run", "config inst seed inst_specs")
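# Illustrative example (hypothetical values): one validation job pairing a
# configuration with an instance, a seed and instance-specifics, e.g.
#   _Run(config=some_config, inst="instance_1", seed=42, inst_specs="0")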


class Validator(object):
    """Validator for the output of SMAC-scenarios.

    Evaluates specified configurations on specified instances.

    Parameters
    ----------
    scenario: Scenario
        scenario object for cutoff, instances, features and specifics
    trajectory: trajectory-list
        trajectory to take incumbent(s) from
    rng: np.random.RandomState or int
        Random number generator or seed
    """

    def __init__(
        self,
        scenario: Scenario,
        trajectory: Optional[List],
        rng: Union[np.random.RandomState, int, None] = None,
    ) -> None:
        self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)

        self.traj = trajectory
        self.scen = scenario
        self.epm = None  # type: Optional[RandomForestWithInstances]

        if isinstance(rng, np.random.RandomState):
            self.rng = rng
        elif isinstance(rng, int):
            self.rng = np.random.RandomState(seed=rng)
        else:
            self.logger.debug("No seed given, using default seed of 1.")
            num_run = 1
            self.rng = np.random.RandomState(seed=num_run)

    def _save_results(
        self,
        rh: RunHistory,
        output_fn: Optional[str],
        backup_fn: Optional[str] = None,
    ) -> None:
        """Helper to save results to file.

        Parameters
        ----------
        rh: RunHistory
            runhistory to save
        output_fn: str
            if it ends with '.json': filename to save the history to;
            else: directory to save the runhistory to (filename is backup_fn)
        backup_fn: str
            if output_fn does not end with '.json', treat output_fn as a
            directory and append backup_fn as the filename (if output_fn ends
            with '.json', this argument is ignored)
        """
        if not output_fn:
            self.logger.info("No output specified, validated runhistory not saved.")
            return
        # Check if a folder or a file is specified as output
        if not output_fn.endswith(".json"):
            if backup_fn is None:
                raise ValueError("If output_fn does not end with .json, the argument backup_fn needs to be given.")
            output_dir = output_fn
            output_fn = os.path.join(output_dir, backup_fn)
            self.logger.debug('Output is "%s", changing to "%s"!', output_dir, output_fn)
        base = os.path.split(output_fn)[0]
        if not base == "" and not os.path.exists(base):
            self.logger.debug('Folder ("%s") doesn\'t exist, creating.', base)
            os.makedirs(base)
        rh.save_json(output_fn)
        self.logger.info("Saving validation-results in %s", output_fn)
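
    # Usage sketch (hypothetical names; assumes a Scenario `scen` and a
    # trajectory list `traj`, e.g. read via
    # smac.utils.io.traj_logging.TrajLogger.read_traj_aclib_format):
    #
    #   validator = Validator(scen, traj, rng=42)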

    def validate(
        self,
        config_mode: Union[str, List[Configuration]] = "def",
        instance_mode: Union[str, List[str]] = "test",
        repetitions: int = 1,
        n_jobs: int = 1,
        backend: str = "threading",
        runhistory: Optional[RunHistory] = None,
        tae: Optional[BaseRunner] = None,
        output_fn: Optional[str] = None,
    ) -> RunHistory:
        """Validate configs on instances and save the result in a runhistory.

        If a runhistory is provided as input, it is important that its runs
        were performed on the same/comparable hardware.

        Side effect: if output is specified, saves the runhistory to the
        specified output directory.

        Parameters
        ----------
        config_mode: str or list<Configuration>
            string or directly a list of Configuration.
            string from [def, inc, def+inc, wallclock_time, cpu_time, all].
            time evaluates at cpu- or wallclock-timesteps of:
            [max_time/2^0, max_time/2^1, max_time/2^2, ..., default]
            with max_time being the highest recorded time
        instance_mode: str or list<str>
            what instances to use for validation, either from
            [train, test, train+test] or directly a list of instances
        repetitions: int
            number of repetitions for nondeterministic algorithms
        n_jobs: int
            number of parallel processes used by joblib
        backend: str
            what backend joblib should use for parallel runs
        runhistory: RunHistory
            optional, RunHistory-object to reuse runs
        tae: BaseRunner
            tae to be used. if None, will initialize ExecuteTARunOld
        output_fn: str
            path the runhistory is saved to. if the suffix is not '.json', it
            is interpreted as a directory and the filename will be
            'validated_runhistory.json'

        Returns
        -------
        runhistory: RunHistory
            runhistory with validated runs
        """
        self.logger.debug(
            "Validating configs '%s' on instances '%s', repeating %d times"
            " with %d parallel runs on backend '%s'.",
            config_mode,
            instance_mode,
            repetitions,
            n_jobs,
            backend,
        )

        # Get all runs to be evaluated as list
        runs, validated_rh = self._get_runs(config_mode, instance_mode, repetitions, runhistory)

        # Create new Stats without limits
        inf_scen = Scenario(
            {
                "run_obj": self.scen.run_obj,
                "cutoff_time": self.scen.cutoff,  # type: ignore[attr-defined] # noqa F821
                "output_dir": "",
            }
        )
        inf_stats = Stats(inf_scen)
        inf_stats.start_timing()

        # Create TAE
        if not tae:
            tae = ExecuteTARunOld(
                ta=self.scen.ta,  # type: ignore[attr-defined] # noqa F821
                stats=inf_stats,
                run_obj=self.scen.run_obj,
                par_factor=self.scen.par_factor,  # type: ignore[attr-defined] # noqa F821
                cost_for_crash=self.scen.cost_for_crash,  # type: ignore[attr-defined] # noqa F821
            )
        else:
            # Inject endless-stats
            tae.stats = inf_stats

        # Validate!
        run_results = self._validate_parallel(tae, runs, n_jobs, backend, runhistory)
        assert len(run_results) == len(runs), (run_results, runs)

        # tae returns (status, cost, runtime, additional_info)
        # Add runs to RunHistory
        for run, result in zip(runs, run_results):
            validated_rh.add(
                config=run.config,
                cost=result.cost,
                time=result.time,
                status=result.status,
                instance_id=run.inst,
                seed=run.seed,
                additional_info=result.additional_info,
            )
        self._save_results(validated_rh, output_fn, backup_fn="validated_runhistory.json")
        return validated_rh
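
    # Usage sketch (hypothetical; `validator` as constructed above). Runs the
    # default and the final incumbent on all test instances, three seeds each:
    #
    #   rh = validator.validate(config_mode="def+inc", instance_mode="test",
    #                           repetitions=3, n_jobs=4,
    #                           output_fn="validated_runhistory.json")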

    def _validate_parallel(
        self,
        tae: BaseRunner,
        runs: List[_Run],
        n_jobs: int,
        backend: str,
        runhistory: Optional[RunHistory] = None,
    ) -> List[RunValue]:
        """Validate runs with joblib's Parallel-interface.

        Parameters
        ----------
        tae: BaseRunner
            tae to be used for validation
        runs: list<_Run>
            list with _Run-objects
            [_Run(config=CONFIG1, inst=INSTANCE1, seed=SEED1, inst_specs=INST_SPECIFICS1), ...]
        n_jobs: int
            number of cpus to use for validation (-1 to use all)
        backend: str
            what backend to use for parallelization
        runhistory: RunHistory
            optional, RunHistory-object to reuse runs

        Returns
        -------
        run_results: list<RunValue>
            results as returned by the tae
        """
        # Run in parallel
        run_results = Parallel(n_jobs=n_jobs, backend=backend)(
            delayed(_unbound_tae_starter)(
                tae,
                runhistory,
                RunInfo(
                    config=run.config,
                    instance=run.inst,
                    instance_specific="0",
                    seed=run.seed,
                    cutoff=self.scen.cutoff,  # type: ignore[attr-defined] # noqa F821
                    capped=False,
                    budget=0,
                ),
            )
            for run in runs
        )
        return run_results
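
    # Note: `validate` defaults to backend="threading", which runs the TAE in
    # threads of a single process and thereby avoids pickling the TAE at all;
    # process-based backends such as "loky" or "multiprocessing" require every
    # argument to be picklable. Minimal joblib sketch (illustrative):
    #
    #   from joblib import Parallel, delayed
    #   squares = Parallel(n_jobs=2, backend="threading")(
    #       delayed(pow)(i, 2) for i in range(4)
    #   )  # -> [0, 1, 4, 9]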

    def validate_epm(
        self,
        config_mode: Union[str, List[Configuration]] = "def",
        instance_mode: Union[str, List[str]] = "test",
        repetitions: int = 1,
        runhistory: Optional[RunHistory] = None,
        output_fn: Optional[str] = None,
        reuse_epm: bool = True,
    ) -> RunHistory:
        """Use an EPM to predict costs/runtimes for unknown config/inst-pairs.

        Side effect: if output is specified, saves the runhistory to the
        specified output directory.

        Parameters
        ----------
        config_mode: str or list<Configuration>
            string or directly a list of Configuration.
            string from [def, inc, def+inc, wallclock_time, cpu_time, all].
            time evaluates at cpu- or wallclock-timesteps of:
            [max_time/2^0, max_time/2^1, max_time/2^2, ..., default]
            with max_time being the highest recorded time
        instance_mode: str or list<str>
            what instances to use for validation, either from
            [train, test, train+test] or directly a list of instances
        repetitions: int
            number of repetitions for nondeterministic algorithms
        runhistory: RunHistory
            optional, RunHistory-object to reuse runs
        output_fn: str
            path the runhistory is saved to. if the suffix is not '.json', it
            is interpreted as a directory and the filename will be
            'validated_runhistory_EPM.json'
        reuse_epm: bool
            if true (and if `self.epm` is set), reuse the epm to validate runs

        Returns
        -------
        runhistory: RunHistory
            runhistory with predicted runs
        """
        if not isinstance(runhistory, RunHistory) and (self.epm is None or not reuse_epm):
            raise ValueError("No runhistory specified for validating with EPM!")
        elif not reuse_epm or self.epm is None:
            # Create RandomForest
            types, bounds = get_types(self.scen.cs, self.scen.feature_array)  # type: ignore[attr-defined] # noqa F821
            epm = RandomForestWithInstances(
                configspace=self.scen.cs,  # type: ignore[attr-defined] # noqa F821
                types=types,
                bounds=bounds,
                instance_features=self.scen.feature_array,
                seed=self.rng.randint(MAXINT),
                ratio_features=1.0,
            )
            # Use imputor if objective is runtime
            imputor = None
            impute_state = None
            impute_censored_data = False
            if self.scen.run_obj == "runtime":
                threshold = self.scen.cutoff * self.scen.par_factor  # type: ignore[attr-defined] # noqa F821
                imputor = RFRImputator(
                    rng=self.rng,
                    cutoff=self.scen.cutoff,  # type: ignore[attr-defined] # noqa F821
                    threshold=threshold,
                    model=epm,
                )
                impute_censored_data = True
                impute_state = [StatusType.CAPPED]
                success_states = [
                    StatusType.SUCCESS,
                ]
            else:
                success_states = [StatusType.SUCCESS, StatusType.CRASHED, StatusType.MEMOUT]

            # Transform training data (from given rh)
            rh2epm = RunHistory2EPM4Cost(
                num_params=len(self.scen.cs.get_hyperparameters()),  # type: ignore[attr-defined] # noqa F821
                scenario=self.scen,
                rng=self.rng,
                impute_censored_data=impute_censored_data,
                imputor=imputor,
                impute_state=impute_state,
                success_states=success_states,
            )
            assert runhistory is not None  # please mypy
            X, y = rh2epm.transform(runhistory)
            self.logger.debug("Training model with data of shape X: %s, y: %s", str(X.shape), str(y.shape))
            # Train random forest
            epm.train(X, y)
        else:
            epm = cast(RandomForestWithInstances, self.epm)

        # Predict desired runs
        runs, rh_epm = self._get_runs(config_mode, instance_mode, repetitions, runhistory)

        feature_array_size = len(self.scen.cs.get_hyperparameters())  # type: ignore[attr-defined] # noqa F821
        if self.scen.feature_array is not None:
            feature_array_size += self.scen.feature_array.shape[1]

        X_pred = np.empty((len(runs), feature_array_size))
        for idx, run in enumerate(runs):
            if self.scen.feature_array is not None and run.inst is not None:
                X_pred[idx] = np.hstack(
                    [
                        convert_configurations_to_array([run.config])[0],
                        self.scen.feature_dict[run.inst],  # type: ignore
                    ]
                )
            else:
                X_pred[idx] = convert_configurations_to_array([run.config])[0]
        self.logger.debug("Predicting desired %d runs, data has shape %s", len(runs), str(X_pred.shape))

        y_pred = epm.predict(X_pred)
        self.epm = epm

        # Add runs to runhistory
        for run, pred in zip(runs, y_pred[0]):
            rh_epm.add(
                config=run.config,
                cost=float(pred),
                time=float(pred),
                status=StatusType.SUCCESS,
                instance_id=run.inst,
                seed=-1,
                additional_info={"additional_info": "ESTIMATED USING EPM!"},
            )

        if output_fn:
            self._save_results(rh_epm, output_fn, backup_fn="validated_runhistory_EPM.json")
        return rh_epm
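
    # Usage sketch (hypothetical; requires a runhistory `old_rh` with enough
    # recorded runs to train the random-forest EPM):
    #
    #   rh_epm = validator.validate_epm(config_mode="all",
    #                                   instance_mode="train",
    #                                   runhistory=old_rh,
    #                                   output_fn="validated_runhistory_EPM.json")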

    def _get_runs(
        self,
        configs: Union[str, List[Configuration]],
        insts: Union[str, List[str]],
        repetitions: int = 1,
        runhistory: Optional[RunHistory] = None,
    ) -> Tuple[List[_Run], RunHistory]:
        """Generate list of SMAC-TAE runs to be executed.

        This means combinations of configs with all instances on a certain
        number of seeds.

        Side effect: adds runs that don't need to be reevaluated to the
        returned runhistory!

        Parameters
        ----------
        configs: str or list<Configuration>
            string or directly a list of Configuration.
            str from [def, inc, def+inc, wallclock_time, cpu_time, all].
            time evaluates at cpu- or wallclock-timesteps of:
            [max_time/2^0, max_time/2^1, max_time/2^2, ..., default]
            with max_time being the highest recorded time
        insts: str or list<str>
            what instances to use for validation, either from
            [train, test, train+test] or directly a list of instances
        repetitions: int
            number of seeds per instance/config-pair to be evaluated
        runhistory: RunHistory
            optional, try to reuse this runhistory and save some runs

        Returns
        -------
        runs: list<_Run>
            list with _Runs
            [_Run(config=CONFIG1, inst=INSTANCE1, seed=SEED1, inst_specs=INST_SPECIFICS1),
             _Run(config=CONFIG2, inst=INSTANCE2, seed=SEED2, inst_specs=INST_SPECIFICS2),
             ...]
        """
        # Get relevant configurations and instances
        if isinstance(configs, str):
            configs = self._get_configs(configs)
        if isinstance(insts, str):
            instances = sorted(self._get_instances(insts))  # type: Sequence[Union[str, None]]
        elif insts is not None:
            instances = sorted(insts)
        else:
            instances = [None]
        # If no instances are given, fix the instances to one "None" instance
        if not instances:
            instances = [None]

        # If algorithm is deterministic, fix repetitions to 1
        if self.scen.deterministic and repetitions != 1:  # type: ignore[attr-defined] # noqa F821
            self.logger.warning(
                "Specified %d repetitions, but fixing to 1, because algorithm is deterministic.",
                repetitions,
            )
            repetitions = 1

        # Extract relevant information from given runhistory
        inst_seed_config = self._process_runhistory(configs, instances, runhistory)

        # Now create the actual run-list
        runs = []
        # Counter for runs without the need of recalculation
        runs_from_rh = 0
        # If we reuse runs, we want to return them as well
        new_rh = RunHistory()

        for i in instances:
            for rep in range(repetitions):
                # First, find a seed and add all the data we can take from the
                # given runhistory to "our" validation runhistory.
                configs_evaluated = []  # type: List[Configuration]
                if runhistory and i in inst_seed_config:
                    # Choose seed based on most often evaluated inst-seed-pair
                    seed, configs_evaluated = inst_seed_config[i].pop(0)
                    # Delete inst if all seeds are used
                    if not inst_seed_config[i]:
                        inst_seed_config.pop(i)
                    # Add runs to runhistory
                    for c in configs_evaluated[:]:
                        runkey = RunKey(runhistory.config_ids[c], i, seed)
                        cost, time, status, start, end, additional_info = runhistory.data[runkey]
                        if status in [StatusType.CRASHED, StatusType.ABORT, StatusType.CAPPED]:
                            # Not properly executed target algorithm runs should be repeated
                            configs_evaluated.remove(c)
                            continue
                        new_rh.add(
                            c,
                            cost,
                            time,
                            status,
                            instance_id=i,
                            seed=seed,
                            starttime=start,
                            endtime=end,
                            additional_info=additional_info,
                        )
                        runs_from_rh += 1
                else:
                    # If no runhistory or no entries for instance, get new seed
                    seed = self.rng.randint(MAXINT)

                # We now have a seed and add all configs that are not already
                # evaluated on that seed to the runs-list. This way, we
                # guarantee the same inst-seed-pairs for all configs.
                for config in [c for c in configs if c not in configs_evaluated]:
                    # Only use specifics if specific exists, else use string "0"
                    specs = self.scen.instance_specific[i] if i and i in self.scen.instance_specific else "0"
                    runs.append(_Run(config=config, inst=i, seed=seed, inst_specs=specs))

        self.logger.info(
            "Collected %d runs from %d configurations on %d instances with %d "
            "repetitions. Reusing %d runs from given runhistory.",
            len(runs),
            len(configs),
            len(instances),
            repetitions,
            runs_from_rh,
        )

        return runs, new_rh

    def _process_runhistory(
        self,
        configs: List[Configuration],
        insts: Sequence[Optional[str]],
        runhistory: Optional[RunHistory],
    ) -> Dict[str, List[Tuple[int, List[Configuration]]]]:
        """Processes the runhistory from self._get_runs by extracting already
        evaluated (relevant) config-inst-seed tuples.

        Parameters
        ----------
        configs: list(Configuration)
            list of configs of interest
        insts: list(str)
            list of instances of interest
        runhistory: RunHistory
            runhistory to extract runs from

        Returns
        -------
        inst_seed_config: dict<str : list(tuple(int, list(Configuration)))>
            dictionary mapping instances to a list of tuples of already used
            seeds and the configs that this inst-seed-pair has been evaluated
            on, sorted by the number of configs
        """
        # We want to reuse seeds that have been used on most configurations.
        # To this end, we create a dictionary as {instances:{seed:[configs]}}.
        # Like this we can easily retrieve the most used instance-seed pairs
        # to minimize the number of runs to be evaluated.
        if runhistory:
            inst_seed_config = {}  # type: Dict[str, Dict[int, List[Configuration]]]
            relevant = dict()
            for key in runhistory.data:
                if runhistory.ids_config[key.config_id] in configs and key.instance_id in insts:
                    relevant[key] = runhistory.data[key]

            # Change data-structure to {instances:[(seed1, [configs]), (seed2, [configs]), ...]};
            # to make the most used seed easily accessible, we sort by the number of configs
            for key in relevant:
                inst, seed = key.instance_id, key.seed
                config = runhistory.ids_config[key.config_id]
                if inst in inst_seed_config:
                    if seed in inst_seed_config[inst]:
                        inst_seed_config[inst][seed].append(config)
                    else:
                        inst_seed_config[inst][seed] = [config]
                else:
                    inst_seed_config[inst] = {seed: [config]}

            return {
                i: sorted(
                    [(seed, list(inst_seed_config[i][seed])) for seed in inst_seed_config[i]],
                    key=lambda x: len(x[1]),
                )
                for i in inst_seed_config
            }
        else:
            rval = {}  # type: Dict[str, List[Tuple[int, List[Configuration]]]]
            return rval

    def _get_configs(self, mode: str) -> List[Configuration]:
        """Return desired configs.

        Parameters
        ----------
        mode: str
            str from [def, inc, def+inc, wallclock_time, cpu_time, all].
            time evaluates at cpu- or wallclock-timesteps of:
            [max_time/2^0, max_time/2^1, max_time/2^2, ..., default]
            with max_time being the highest recorded time

        Returns
        -------
        configs: list<Configuration>
            list with desired configurations
        """
        # Get trajectory and make sure it's not None to please mypy
        traj = self.traj
        assert traj is not None  # please mypy

        # Add desired configs
        configs = []
        mode = mode.lower()
        if mode not in ["def", "inc", "def+inc", "wallclock_time", "cpu_time", "all"]:
            raise ValueError("%s not a valid option for config_mode in validation." % mode)
        if mode == "def" or mode == "def+inc":
            configs.append(self.scen.cs.get_default_configuration())  # type: ignore[attr-defined] # noqa F821
        if mode == "inc" or mode == "def+inc":
            configs.append(traj[-1]["incumbent"])
        if mode in ["wallclock_time", "cpu_time"]:
            # Get highest time-entry and add entries from there,
            # not using wallclock_limit in case it's inf
            if mode == "wallclock_time" and np.isfinite(self.scen.wallclock_limit):
                max_time = self.scen.wallclock_limit
            elif mode == "cpu_time" and np.isfinite(self.scen.algo_runs_timelimit):
                max_time = self.scen.algo_runs_timelimit
            else:
                max_time = traj[-1][mode]
            counter = 2**0
            for entry in traj[::-1]:
                if entry[mode] <= max_time / counter and entry["incumbent"] not in configs:
                    configs.append(entry["incumbent"])
                    counter *= 2
            if traj[0]["incumbent"] not in configs:
                configs.append(traj[0]["incumbent"])  # add first
        if mode == "all":
            for entry in traj:
                if entry["incumbent"] not in configs:
                    configs.append(entry["incumbent"])
        self.logger.debug("Gathered %d configurations for mode %s.", len(configs), mode)
        return configs

    def _get_instances(self, mode: str) -> List[str]:
        """Get desired instances.

        Parameters
        ----------
        mode: str
            what instances to use for validation, from [train, test, train+test]

        Returns
        -------
        instances: list<str>
            instances to be used
        """
        instance_mode = mode.lower()
        if instance_mode not in ["train", "test", "train+test"]:
            raise ValueError("%s not a valid option for instance_mode in validation." % mode)

        # If instances matter, they should actually be specified in the scenario
        if (instance_mode == "train" and self.scen.train_insts == [None]) or (
            instance_mode == "test" and self.scen.test_insts == [None]
        ):
            self.logger.warning(
                "Instance mode is set to %s, but there are no %s-instances "
                'specified in the scenario. Setting instance mode to "train+test"!',
                instance_mode,
                instance_mode,
            )
            instance_mode = "train+test"

        instances = []  # type: List[str]
        if (instance_mode == "train" or instance_mode == "train+test") and not self.scen.train_insts == [None]:
            instances.extend(self.scen.train_insts)
        if (instance_mode == "test" or instance_mode == "train+test") and not self.scen.test_insts == [None]:
            instances.extend(self.scen.test_insts)
        return instances
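
# End-to-end sketch (illustrative only; the file names below are assumptions,
# not part of this module):
#
#   from smac.scenario.scenario import Scenario
#   from smac.utils.io.traj_logging import TrajLogger
#   from smac.utils.validate import Validator
#
#   scen = Scenario("scenario.txt")
#   traj = TrajLogger.read_traj_aclib_format(fn="traj_aclib2.json", cs=scen.cs)
#   validator = Validator(scen, traj, rng=1)
#   rh = validator.validate(config_mode="inc", instance_mode="train+test")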