"""Theory Environment."""
from __future__ import annotations
import logging
import uuid
from collections import deque
from copy import deepcopy
import gymnasium as gym
import numpy as np
from dacbench import AbstractEnv
class BinaryProblem:
"""An abstract class for an individual in binary representation."""
def __init__(self, n, rng=None):
"""Init problem."""
if rng is None:
rng = np.random.default_rng()
self.data = rng.choice([True, False], size=n)
self.n = n
self.fitness = self.eval()
def initialise_with_fixed_number_of_bits(self, k, rng=None):
"""Init with given number of bits."""
if rng is None:
rng = np.random.default_rng()
nbits = self.data.sum()
if nbits < k:
ids = rng.choice(
                np.where(~self.data)[0], size=k - nbits, replace=False
)
self.data[ids] = True
self.eval()
def is_optimal(self):
"""Get is_optimal flag."""
def get_optimal(self):
"""Get optimum."""
def eval(self):
"""Evaluate fitness."""
def get_fitness_after_flipping(self, locs):
"""Calculate the change in fitness after flipping the bits at positions locs.
Parameters
----------
locs: 1d-array
positions where bits are flipped
Returns:
-------
objective after flipping
"""
raise NotImplementedError
def get_fitness_after_crossover(self, xprime, locs_x, locs_xprime):
"""Calculate fitness of the child aftering being crossovered with xprime.
Parameters
----------
xprime: 1d boolean array
the individual to crossover with
locs_x: 1d boolean/integer array
positions where we keep current bits of self
        locs_xprime: 1d boolean/integer array
positions where we change to xprime's bits
Returns:
-------
fitness of the new individual after crossover
"""
raise NotImplementedError
def flip(self, locs):
"""Flip the bits at position indicated by locs.
Parameters
----------
locs: 1d-array
positions where bits are flipped
Returns:
-------
the new individual after the flip
"""
child = deepcopy(self)
child.data[locs] = ~child.data[locs]
child.eval()
return child
def combine(self, xprime, locs_xprime):
"""Combine (crossover) self and xprime by taking xprime's bits at locs_xprime
and self's bits at other positions.
Parameters
----------
xprime: 1d boolean array
the individual to crossover with
        locs_xprime: 1d boolean/integer array
            positions where we change to xprime's bits
Returns:
-------
the new individual after the crossover
"""
child = deepcopy(self)
child.data[locs_xprime] = xprime.data[locs_xprime]
child.eval()
return child
def mutate(self, p, n_childs, rng=None):
"""Draw l ~ binomial(n, p), l>0.
Generate n_childs children by flipping exactly l bits
Returns:
-------
the best child (maximum fitness), its fitness and number of evaluations used
"""
if rng is None:
rng = np.random.default_rng()
assert p >= 0
if p == 0:
return self, self.fitness, 0
length = 0
while length == 0:
length = rng.binomial(self.n, p)
best_obj = -1
best_locs = None
for _i in range(n_childs):
locs = rng.choice(self.n, size=length, replace=False)
obj = self.get_fitness_after_flipping(locs)
if obj > best_obj:
best_locs = locs
best_obj = obj
best_child = self.flip(best_locs)
return best_child, best_child.fitness, n_childs
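    # Illustrative sketch of how mutate() behaves (hypothetical values, kept as a
    # comment so nothing is executed on import):
    #
    #     rng = np.random.default_rng(42)
    #     parent = LeadingOne(n=50, rng=rng, initObj=10)     # f(parent) == 10
    #     child, f_child, evals = parent.mutate(p=1.0 / 50, n_childs=5, rng=rng)
    #
    # A single flip length l ~ Binomial(50, 1/50) is resampled until l > 0 and is
    # shared by all 5 children (each with its own flip positions); the fittest
    # child is returned and evals == n_childs == 5.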
def mutate_rls(self, length, rng=None):
"""Generate a child by flipping exactly l bits.
Returns:
-------
child, its fitness
"""
if rng is None:
rng = np.random.default_rng()
assert length >= 0
if length == 0:
return self, self.fitness, 0
locs = rng.choice(self.n, size=length, replace=False)
child = self.flip(locs)
return child, child.fitness, 1
def crossover(
self,
xprime,
p,
n_childs,
include_xprime=True,
count_different_inds_only=True,
rng=None,
):
"""Crossover operation in population.
        Crossover operator: for each bit, take the value from xprime with
        probability p and from self with probability 1 - p.
Parameters
----------
xprime
the individual to crossover with
p : float
probability in [0,1]
n_childs : int
number of child individuals
        include_xprime : bool
            whether to include xprime as a candidate for the returned child
count_different_inds_only : bool
whether to only count different individuals
rng:
random number generator
"""
if rng is None:
rng = np.random.default_rng()
assert p <= 1
if p == 0:
if include_xprime:
return xprime, xprime.fitness, 0
return self, self.fitness, 0
best_obj = xprime.fitness if include_xprime else -1
best_locs = None
n_evals = 0
ls = rng.binomial(self.n, p, size=n_childs)
for l in ls: # noqa: E741
locs_xprime = rng.choice(self.n, l, replace=False)
locs_x = np.full(self.n, True)
locs_x[locs_xprime] = False
obj = self.get_fitness_after_crossover(xprime, locs_x, locs_xprime)
if (
obj not in (self.fitness, xprime.fitness)
or (
not np.array_equal(xprime.data[locs_xprime], self.data[locs_xprime])
)
and (not np.array_equal(self.data[locs_x], xprime.data[locs_x]))
):
n_evals += 1
if obj > best_obj:
best_obj = obj
best_locs = locs_xprime
child = self.combine(xprime, best_locs) if best_locs is not None else xprime
if not count_different_inds_only:
n_evals = n_childs
return child, child.fitness, n_evals
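    # Illustrative sketch of the biased uniform crossover (hypothetical values,
    # kept as a comment so nothing is executed on import):
    #
    #     rng = np.random.default_rng(0)
    #     x = LeadingOne(n=20, rng=rng, initObj=5)
    #     xprime = LeadingOne(n=20, rng=rng, initObj=8)
    #     child, f_child, evals = x.crossover(xprime, p=0.5, n_childs=3, rng=rng)
    #
    # Each child copies a Binomial(20, 0.5)-sized random subset of xprime's bits
    # and keeps x's bits elsewhere; with count_different_inds_only=True, evals
    # only counts children that actually differ from both parents.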
class LeadingOne(BinaryProblem):
"""An individual for LeadingOne problem.
    The aim is to maximise the number of leading (and consecutive) 1-bits in the string.
"""
def __init__(self, n, rng=None, initObj=None): # noqa: N803
"""Make individual."""
if rng is None:
rng = np.random.default_rng()
if initObj is None:
super().__init__(n=n, rng=rng)
else:
self.data = rng.choice([True, False], size=n)
self.data[: int(initObj)] = True
self.data[int(initObj)] = False
self.n = n
self.fitness = self.eval()
def eval(self):
"""Evaluate fitness."""
        # index of the first False bit (argmin returns 0 if all bits are True)
        k = self.data.argmin()
if self.data[k]:
self.fitness = self.n
else:
self.fitness = k
return self.fitness
def is_optimal(self):
"""Return is_optimal flag."""
return self.data.all()
def get_optimal(self):
"""Return optimum."""
return self.n
def get_fitness_after_flipping(self, locs):
"""Return fitness after flipping."""
min_loc = locs.min()
if min_loc < self.fitness:
return min_loc
if min_loc > self.fitness:
return self.fitness
old_fitness = self.fitness
self.data[locs] = ~self.data[locs]
new_fitness = self.eval()
self.data[locs] = ~self.data[locs]
self.fitness = old_fitness
return new_fitness
def get_fitness_after_crossover(self, xprime, locs_x, locs_xprime):
"""Return fitness after crossover."""
child = self.combine(xprime, locs_xprime)
child.eval()
return child.fitness
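    # Illustrative usage of LeadingOne (hypothetical values, kept as a comment so
    # nothing is executed on import):
    #
    #     rng = np.random.default_rng(0)
    #     ind = LeadingOne(n=10, rng=rng, initObj=3)   # bits 0..2 True, bit 3 False
    #     ind.fitness                                  # -> 3 leading ones
    #     child, f_child, evals = ind.mutate_rls(1, rng)
    #
    # Flipping bit 3 would raise the fitness to at least 4, flipping one of
    # bits 0..2 would lower it below 3, and any other flip leaves it at 3.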
MAX_INT = 1e8
HISTORY_LENGTH = 5
class TheoryEnv(AbstractEnv):
"""Environment for RLS with step size.
Current assumption: we only consider (1+1)-RLS,
    so there's only one parameter to tune (r).
"""
def __init__(self, config, test_env=False) -> None:
"""Initialize TheoryEnv.
Parameters
----------
config : objdict
Environment configuration
test_env : bool
whether to use test mode
"""
super().__init__(config)
self.logger = logging.getLogger(self.__str__())
self.test_env = test_env
self.name = config.name
# name of reward function
assert config.reward_choice in [
"imp_div_evals",
"imp_div_evals_new",
"imp_minus_evals",
"minus_evals",
"imp",
"minus_evals_normalised",
"imp_minus_evals_normalised",
]
self.reward_choice = config.reward_choice
# print("Reward choice: " + self.reward_choice)
# get problem
self.problem = globals()[config.problem]
# read names of all observation variables
self.obs_description = config.observation_description
self.obs_var_names = [
s.strip() for s in config.observation_description.split(",")
]
# functions to get values of the current state from histories
# (see reset() function for those history variables)
self.state_functions = []
for var_name in self.obs_var_names:
if var_name == "n":
self.state_functions.append(lambda: self.n)
elif var_name in ["r"]:
self.state_functions.append(
lambda his="history_" + var_name: vars(self)[his][-1]
)
elif (
"_{t-" in var_name
            ):  # TODO: this implementation only allows accessing the history of r,
                # but not of delta_f(x), optimal_r, etc.
k = int(
var_name.split("_{t-")[1][:-1]
) # get the number in _{t-<number>}
name = var_name.split("_{t-")[0] # get the variable name (r, f(x), etc)
self.state_functions.append(
lambda k=k, his="history_" + name: vars(self)[his][-(k + 1)]
) # the last element is the value at the current time step,
# so we have to go one step back to access the history
elif var_name == "f(x)":
self.state_functions.append(lambda: self.history_fx[-1])
elif var_name == "delta_f(x)":
self.state_functions.append(
lambda: self.history_fx[-1] - self.history_fx[-2]
)
elif var_name == "optimal_r":
self.state_functions.append(
lambda: int(self.n / (self.history_fx[-1] + 1))
)
else:
raise Exception("Error: invalid state variable name: " + var_name)
# the random generator used by RLS
seed = config.seed if "seed" in config else None
if "seed" in self.instance:
seed = self.instance.seed
self.seed(seed)
# for logging
self.outdir = None
if "outdir" in config:
self.outdir = config.outdir + "/" + str(uuid.uuid4())
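    # Example of a supported observation description (illustrative):
    #
    #     config.observation_description = "n, r, f(x), delta_f(x), r_{t-1}"
    #
    # With this setting get_state() returns, in order: the problem size n, the
    # most recent r, the current fitness f(x), the fitness improvement of the
    # last step, and the r chosen one step earlier (read from history_r).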
@staticmethod
def get_obs_domain_from_name(var_name): # noqa: ARG004
"""Get default lower and upperbound of a observation variable based on its name.
The observation space will then be created
Returns:
--------
Two int values, e.g., 1, np.inf
"""
return 0, np.inf
def reset(self, seed=None, options=None):
"""Resets env.
Returns:
--------
numpy.array: Environment state
"""
if options is None:
options = {}
super().reset_(seed)
# current problem size (n) & evaluation limit (max_evals)
self.n = self.instance.size
if self.test_env:
self.max_evals = self.n_steps
else:
self.max_evals = int(0.8 * self.n * self.n)
self.logger.info("n:%d, max_evals:%d" % (self.n, self.max_evals))
# set random seed
if "seed" in self.instance:
self.seed(self.instance.seed)
# create an initial solution
if self.instance.initObj == "random":
self.x = self.problem(n=self.instance.size, rng=self.np_random)
else:
self.x = self.problem(
n=self.instance.size, rng=self.np_random, initObj=self.instance.initObj
)
# total number of evaluations so far
self.total_evals = 1
# reset histories
self.history_r = deque([0] * HISTORY_LENGTH, maxlen=HISTORY_LENGTH)
self.history_fx = deque(
[self.x.fitness] * HISTORY_LENGTH, maxlen=HISTORY_LENGTH
)
# for debug only
self.log_r = []
self.log_reward = []
self.log_fx = []
self.init_obj = self.x.fitness
return self.get_state(), {}
def get_state(self):
"""Return state."""
return np.asarray([f() for f in self.state_functions])
def step(self, action):
"""Execute environment step.
Parameters
----------
action : Box
action to execute
Returns:
--------
np.array, float, bool, bool, dict: state, reward, terminated, truncated, info
"""
truncated = super().step_()
fitness_before_update = self.x.fitness
# get r
if isinstance(action, list | np.ndarray):
assert len(action) == 1
r = action[0]
else:
r = action
# if r is out of range
stop = False
if r < 1 or r > self.n:
self.logger.info(f"WARNING: r={r} is out of bound")
# if we're in the training phase, we return a large negative reward
# and stop the episode
if self.test_env is False:
terminated = True
n_evals = 0
reward = -MAX_INT
stop = True
# if we're in the test phase, just clip r back to the range and continue
else:
r = np.clip(r, 1, self.n)
if stop is False:
# flip r bits
r = int(r)
y, f_y, n_evals = self.x.mutate_rls(r, self.np_random)
# update x
if self.x.fitness <= y.fitness:
self.x = y
# update total number of evaluations
self.total_evals += n_evals
# check stopping criteria
terminated = (self.total_evals >= self.max_evals) or (self.x.is_optimal())
# calculate reward
if self.reward_choice == "imp_div_evals":
reward = (self.x.fitness - fitness_before_update - 0.5) / n_evals
elif self.reward_choice == "imp_minus_evals":
reward = self.x.fitness - fitness_before_update - n_evals
elif self.reward_choice == "minus_evals":
reward = -n_evals
elif self.reward_choice == "minus_evals_normalised":
reward = -n_evals / self.max_evals
elif self.reward_choice == "imp_minus_evals_normalised":
reward = (
self.x.fitness - fitness_before_update - n_evals
) / self.max_evals
elif self.reward_choice == "imp":
reward = self.x.fitness - fitness_before_update - 0.5
# update histories
self.history_fx.append(self.x.fitness)
self.history_r.append(r)
# update logs
self.log_r.append(r)
self.log_fx.append(self.x.fitness)
self.log_reward.append(reward)
returned_info = {"msg": "", "values": {}}
if terminated or truncated:
msg = "Env " + self.env_type + ". " if hasattr(self, "env_type") else ""
msg += (
"Episode done: n=%d; obj=%d; init_obj=%d; evals=%d; max_evals=%d; "
"steps=%d; r_min=%.1f; r_max=%.1f; r_mean=%.1f; R=%.4f"
) % (
self.n,
self.x.fitness,
self.init_obj,
self.total_evals,
self.max_evals,
self.c_step,
min(self.log_r),
max(self.log_r),
sum(self.log_r) / len(self.log_r),
sum(self.log_reward),
)
# self.logger.info(msg)
returned_info["msg"] = msg
returned_info["values"] = {
"n": int(self.n),
"obj": int(self.x.fitness),
"init_obj": int(self.init_obj),
"evals": int(self.total_evals),
"max_evals": int(self.max_evals),
"steps": int(self.c_step),
"r_min": float(min(self.log_r)),
"r_max": float(max(self.log_r)),
"r_mean": float(sum(self.log_r) / len(self.log_r)),
"R": float(sum(self.log_reward)),
"log_r": [int(x) for x in self.log_r],
"log_fx": [int(x) for x in self.log_fx],
"log_reward": [float(x) for x in self.log_reward],
}
        return self.get_state(), reward, terminated, truncated, returned_info
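    # Worked example of a single step (illustrative values): if f(x) = 40 before
    # the step, the agent picks r = 2, mutate_rls flips 2 bits in one evaluation
    # (n_evals = 1) and the offspring reaches f(y) = 43, then x is replaced and
    # reward = 43 - 40 - 1 = 2 for reward_choice = "imp_minus_evals", or
    # reward = (43 - 40 - 0.5) / 1 = 2.5 for "imp_div_evals".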
def close(self) -> bool:
"""Close Env.
No additional cleanup necessary
Returns:
--------
bool: Closing confirmation
"""
return True
class TheoryEnvDiscrete(TheoryEnv):
"""RLS environment where the choices of r is discretised."""
def __init__(self, config, test_env=False):
"""Init env."""
super().__init__(config, test_env)
assert (
"action_choices" in config
), "Error: action_choices must be specified in benchmark's config"
assert isinstance(
self.action_space, gym.spaces.Discrete
), "Error: action space must be discrete"
assert self.action_space.n == len(config["action_choices"]), (
"Error: action space's size (%d) must be equal "
"to the len(action_choices) (%d)"
) % (self.action_space.n, len(config["action_choices"]))
self.action_choices = config["action_choices"]
def step(self, action):
"""Take step."""
if isinstance(action, list | np.ndarray):
assert len(action) == 1
action = action[0]
return super().step(self.action_choices[action])
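# Usage sketch. Names outside this module (TheoryBenchmark, get_environment) are
# part of the wider dacbench package and are assumed here for illustration only:
#
#     from dacbench.benchmarks import TheoryBenchmark
#
#     env = TheoryBenchmark().get_environment()
#     obs, info = env.reset()
#     obs, reward, terminated, truncated, info = env.step(env.action_space.sample())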