Source code for dacbench.envs.luby

"""Luby environment from
"Dynamic Algorithm Configuration:Foundation of a New Meta-Algorithmic Framework"
by A. Biedenkapp and H. F. Bozkurt and T. Eimer and F. Hutter and M. Lindauer.
Original environment authors: André Biedenkapp, H. Furkan Bozkurt.
"""

from __future__ import annotations

from dataclasses import dataclass

import numpy as np

from dacbench import AbstractEnv


@dataclass
class LubyInstance:
    """Luby Instance."""

    start_shift: float
    sticky_shift: float
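
# A hypothetical instance for illustration (values assumed, not benchmark
# defaults):
#
#     instance = LubyInstance(start_shift=0.1, sticky_shift=0.05)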
class LubyEnv(AbstractEnv):
    """Environment to learn the Luby sequence."""

    def __init__(self, config) -> None:
        """Initialize Luby Env.

        Parameters
        ----------
        config : objdict
            Environment configuration
        """
        super().__init__(config)
        self._hist_len = config["hist_length"]
        self._ms = self.n_steps
        self._mi = config["min_steps"]
        self._state = np.array([-1 for _ in range(self._hist_len + 1)])
        self._r = 0
        self._genny = luby_gen(1)
        self._next_goal = next(self._genny)

        # Generate Luby sequence up to 2*max_steps + 2, as mode 1 could
        # potentially shift up to max_steps
        self._seq = np.log2(
            [next(luby_gen(i)) for i in range(1, 2 * config["cutoff"] + 2)]
        )
        self._jenny_i = 1
        self._start_dist = None
        self._sticky_dis = None
        self._sticky_shif = 0
        self._start_shift = 0
        self.__lower, self.__upper = 0, 0
        self.__error = 0
        self.done = None
        self.action = None

        self.get_reward = config.get("reward_function", self.get_default_reward)
        self.get_state = config.get("state_method", self.get_default_state)
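
    # Usage sketch (assumed; the packaged LubyBenchmark in dacbench.benchmarks
    # supplies a default config rather than building one by hand):
    #
    #     from dacbench.benchmarks import LubyBenchmark
    #
    #     env = LubyBenchmark().get_environment()
    #     state, info = env.reset()
    #     state, reward, terminated, truncated, info = env.step(0)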
    def step(self, action: int):
        """Execute environment step.

        Parameters
        ----------
        action : int
            action to execute

        Returns
        -------
        np.array, float, bool, bool, dict: state, reward, terminated, truncated, info
        """
        self.done = super().step_()
        self.prev_state = self._state.copy()
        self.action = action
        reward = self.get_reward(self)
        if (
            self.__error < self.__lower
        ):  # needed to avoid too long sequences of sticky actions
            self.__error += np.abs(self.__lower)
        elif self.__error > self.__upper:
            self.__error -= np.abs(self.__upper)
        self._jenny_i += 1
        self.__error += self._sticky_shif

        # The next target in the sequence at step luby_t is determined by the
        # current time step (jenny_i), the start_shift value and the sticky
        # error. The additive sticky error can round to the next time step and
        # thereby repeat actions. The check against lower/upper above resets
        # the sequence to the correct time step at t+1.
        luby_t = max(1, int(np.round(self._jenny_i + self._start_shift + self.__error)))
        self._next_goal = self._seq[luby_t - 1]
        return self.get_state(self), reward, False, self.done, {}
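
    # Worked example of the target computation above, with illustrative
    # (assumed) mid-episode values:
    #
    #     jenny_i, start_shift, error = 3, 0.3, 0.1
    #     luby_t = max(1, int(np.round(jenny_i + start_shift + error)))  # -> 3
    #     # target: _seq[2] = log2(luby(3)) = log2(2) = 1.0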
    def reset(self, seed=None, options=None):
        """Reset the environment.

        Returns
        -------
        np.array, dict: Environment state, meta-info
        """
        if options is None:
            options = {}
        super().reset_(seed)
        self._start_shift = self.instance.start_shift
        self._sticky_shif = self.instance.sticky_shift
        self._r = 0
        self.n_steps = self._mi
        self.__error = 0 + self._sticky_shif
        self._jenny_i = 1
        luby_t = max(1, int(np.round(self._jenny_i + self._start_shift + self.__error)))
        self._next_goal = self._seq[luby_t - 1]
        self.done = False
        return self.get_state(self), {}
    def get_default_reward(self, _):
        """The default reward function.

        Parameters
        ----------
        _ : None
            Empty parameter, which can be used when overriding

        Returns
        -------
        float: The calculated reward
        """
        if self.action == self._next_goal:
            # we don't want to allow exploiting large rewards
            # by tending towards long sequences
            self._r = 0
        else:
            self._r = -1
        self._r = max(self.reward_range[0], min(self.reward_range[1], self._r))
        return self._r
    def get_default_state(self, _):
        """Default state function.

        Parameters
        ----------
        _ : None
            Empty parameter, which can be used when overriding

        Returns
        -------
        np.array: The current state
        """
        if self.c_step == 0:
            self._state = [-1 for _ in range(self._hist_len + 1)]
        else:
            if self.c_step - 1 < self._hist_len:
                self._state[self.c_step - 1] = self.action
            else:
                self._state[:-2] = self._state[1:-1]
                self._state[-2] = self.action
            self._state[-1] = self.c_step - 1
        return np.array(self._state if not self.done else self.prev_state)
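
    # Illustrative state trace (assuming hist_length = 5; the first entries
    # hold the action history, the last the previous step index):
    #
    #     reset()            -> [-1, -1, -1, -1, -1, -1]
    #     step(0), c_step=1  -> [ 0, -1, -1, -1, -1,  0]
    #     step(1), c_step=2  -> [ 0,  1, -1, -1, -1,  1]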
    def close(self) -> bool:
        """Close Env.

        Returns
        -------
        bool: Closing confirmation
        """
        return True
    def render(self, mode: str = "human") -> None:
        """Render env in human mode.

        Parameters
        ----------
        mode : str
            Execution mode
        """
        if mode != "human":
            raise NotImplementedError
def luby_gen(i):
    """Generator for the Luby Sequence."""
    for k in range(1, 33):
        if i == ((1 << k) - 1):
            yield 1 << (k - 1)

    for k in range(1, 9999):
        if 1 << (k - 1) <= i < (1 << k) - 1:
            yield from luby_gen(i - (1 << (k - 1)) + 1)
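
# A minimal sanity-check sketch (not part of the benchmark API): running this
# module directly prints the first 15 Luby values.
if __name__ == "__main__":
    print([next(luby_gen(i)) for i in range(1, 16)])
    # -> [1, 1, 2, 1, 1, 2, 4, 1, 1, 2, 1, 1, 2, 4, 8]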