
mighty.mighty_meta.plr #

Curriculum Learning via Prioritized Level Replay. This is adapted from: github.com/facebookresearch/level-replay/blob/main/level_replay/level_sampler.py.

PrioritizedLevelReplay #

PrioritizedLevelReplay(
    alpha=1.0,
    rho=0.2,
    staleness_coeff=0,
    sample_strategy="value_l1",
    score_transform="power",
    temperature=1.0,
    staleness_transform="power",
    staleness_temperature=1.0,
    eps=0.001,
)

Bases: MightyMetaComponent

Curriculum Learning via Prioritized Level Replay.

:param alpha: Decay factor for scores
:param rho: Minimum proportion of instances that has to be seen before re-sampling seen ones
:param staleness_coeff: Staleness coefficient
:param sample_strategy: Strategy for level sampling. One of: random, sequential, policy_entropy, least_confidence, min_margin, gae, value_l1, one_step_td_error
:param score_transform: Transformation for the score. One of: max, constant, eps_greedy, rank, power, softmax
:param temperature: Temperature for score transformation
:param staleness_transform: Transformation for staleness. One of: max, constant, eps_greedy, rank, power, softmax
:param staleness_temperature: Temperature for staleness transformation

Source code in mighty/mighty_meta/plr.py
def __init__(
    self,
    alpha=1.0,
    rho=0.2,
    staleness_coeff=0,
    sample_strategy="value_l1",
    score_transform="power",
    temperature=1.0,
    staleness_transform="power",
    staleness_temperature=1.0,
    eps=1e-3,
) -> None:
    """PLR initialization.

    :param alpha: Decay factor for scores
    :param rho: Minimum proportion of instances that has to be
        seen before re-sampling seen ones
    :param staleness_coeff: Staleness coefficient
    :param sample_strategy: Strategy for level sampling.
        One of: random, sequential, policy_entropy, least_confidence,
        min_margin, gae, value_l1, one_step_td_error
    :param score_transform: Transformation for the score.
        One of: max, constant, eps_greedy, rank, power, softmax
    :param temperature: Temperature for score transformation
    :param staleness_transform: Transformation for staleness.
        One of: max, constant, eps_greedy, rank, power, softmax
    :param staleness_temperature: Temperature for staleness transformation
    :return:
    """
    super().__init__()
    self.rng = np.random.default_rng()
    self.alpha = alpha
    self.rho = rho
    self.staleness_coef = staleness_coeff
    self.sample_strategy = sample_strategy
    self.eps = eps
    self.instance_scores = {}
    self.staleness = {}
    self.all_instances = None
    self.index = 0
    self.num_actions = None
    self.score_transform = score_transform
    self.temperature = temperature
    self.staleness_transform = staleness_transform
    self.staleness_temperature = staleness_temperature

    self.pre_episode_methods = [self.get_instance]
    self.post_episode_methods = [self.add_rollout]
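
Example usage (a minimal sketch; the configuration values below are illustrative, not recommended settings):

from mighty.mighty_meta.plr import PrioritizedLevelReplay

# Illustrative configuration: prioritize levels with high value loss and
# mix in a small amount of staleness-based sampling.
plr = PrioritizedLevelReplay(
    alpha=1.0,
    rho=0.2,
    staleness_coeff=0.1,
    sample_strategy="value_l1",
    score_transform="rank",
    temperature=0.1,
)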

add_rollout #

add_rollout(metrics)

Save rollout stats.

:param metrics: Current metrics dict

Source code in mighty/mighty_meta/plr.py
def add_rollout(self, metrics):
    """Save rollout stats.

    :param metrics: Current metrics dict
    :return:
    """
    instance_ids = metrics["env"].inst_ids
    episode_reward = metrics["episode_reward"]
    rollout_values = metrics["rollout_values"]
    rollout_logits = [None] * len(instance_ids)
    if "rollout_logits" in metrics:
        rollout_logits = metrics["rollout_logits"]

    if self.all_instances is None:
        self.all_instances = metrics["env"].instance_id_list
        self.num_instances = len(metrics["env"].inst_ids)
        for i in self.all_instances:
            if i not in self.instance_scores:
                self.instance_scores[i] = 0
            if i not in self.staleness:
                self.staleness[i] = 0
        if isinstance(metrics["env"].action_space, gym.spaces.Discrete):
            self.num_actions = metrics["env"].action_space.n

    for instance_id, ep_rew, rollouts, logits in zip(
        instance_ids, episode_reward, rollout_values, rollout_logits
    ):
        score = self.score_function(ep_rew, rollouts, logits)
        if instance_id not in self.instance_scores:
            self.instance_scores[instance_id] = 0
        old_score = self.instance_scores[instance_id]
        self.instance_scores[instance_id] = (
            1 - self.alpha
        ) * old_score + self.alpha * score
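
add_rollout reads the environment and the per-instance rollout statistics from the metrics dict. A schematic sketch of the expected layout (all values below are placeholders, not part of the API):

# Schematic metrics layout consumed by add_rollout; vec_env, values_i and
# logits_i are placeholders for the real environment and rollout tensors.
metrics = {
    "env": vec_env,                          # exposes .inst_ids, .instance_id_list, .action_space
    "episode_reward": [12.3, 4.5],           # one entry per instance in env.inst_ids
    "rollout_values": [values_0, values_1],
    "rollout_logits": [logits_0, logits_1],  # optional; needed for entropy/confidence/margin strategies
}
plr.add_rollout(metrics)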

get_instance #

get_instance(metrics=None)

Get training instances on episode start.

:param metrics: Current metrics dict

Source code in mighty/mighty_meta/plr.py
def get_instance(self, metrics=None):
    """Get Training instance on episode start.

    :param metrics: Current metrics dict
    :return:
    """
    if self.sample_strategy == "random":
        instances = self.rng.choice(self.all_instances, size=self.num_instances)
        metrics["env"].inst_ids = list(instances)
        return

    if self.sample_strategy == "sequential":
        instances = []
        for _ in range(self.num_instances):
            instances.append(self.all_instances[self.index])
            self.index = (self.index + 1) % len(self.all_instances)
        metrics["env"].inst_ids = instances
        return

    num_unseen = len(self.all_instances) - len(list(self.instance_scores.keys()))
    proportion_seen = (len(self.all_instances) - num_unseen) / len(
        self.all_instances
    )
    instances = []
    for _ in range(self.num_instances):
        if proportion_seen >= self.rho and self.rng.random() < proportion_seen:
            instances.append(self._sample_replay_level())
        else:
            instances.append(self._sample_unseen_level())
    metrics["env"].inst_ids = instances

post_episode #

post_episode(metrics)

Execute methods at the end of an episode.

:param metrics: Current metrics dict

Source code in mighty/mighty_meta/mighty_component.py
def post_episode(self, metrics):
    """Execute methods at the end of an episode.

    :param metrics: Current metrics dict
    :return:
    """
    for m in self.post_episode_methods:
        m(metrics)
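
For PrioritizedLevelReplay, these inherited hooks dispatch to get_instance (pre-episode) and add_rollout (post-episode). A sketch of how a training loop outside this module might drive them (run_episode is a placeholder, not part of this package's API):

# Hypothetical training-loop integration: the hook lists registered in
# __init__ decide what actually runs at each hook point.
plr.pre_episode(metrics)    # calls get_instance -> selects the next instances
run_episode(metrics)        # placeholder for the agent's rollout
plr.post_episode(metrics)   # calls add_rollout -> updates instance scores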

post_step #

post_step(metrics)

Execute methods after a step.

:param metrics: Current metrics dict

Source code in mighty/mighty_meta/mighty_component.py
def post_step(self, metrics):
    """Execute methods after a step.

    :param metrics: Current metrics dict
    :return:
    """
    for m in self.post_step_methods:
        m(metrics)

post_update #

post_update(metrics)

Execute methods after the update.

:param metrics: Current metrics dict

Source code in mighty/mighty_meta/mighty_component.py
def post_update(self, metrics):
    """Execute methods after the update.

    :param metrics: Current metrics dict
    :return:
    """
    for m in self.post_update_methods:
        m(metrics)

pre_episode #

pre_episode(metrics)

Execute methods before an episode.

:param metrics: Current metrics dict

Source code in mighty/mighty_meta/mighty_component.py
def pre_episode(self, metrics):
    """Execute methods before an episode.

    :param metrics: Current metrics dict
    :return:
    """
    for m in self.pre_episode_methods:
        m(metrics)

pre_step #

pre_step(metrics)

Execute methods before a step.

:param metrics: Current metrics dict

Source code in mighty/mighty_meta/mighty_component.py
def pre_step(self, metrics):
    """Execute methods before a step.

    :param metrics: Current metrics dict
    :return:
    """
    for m in self.pre_step_methods:
        m(metrics)

pre_update #

pre_update(metrics)

Execute methods before the update.

:param metrics: Current metrics dict

Source code in mighty/mighty_meta/mighty_component.py
def pre_update(self, metrics):
    """Execute methods before the update.

    :param metrics: Current metrics dict
    :return:
    """
    for m in self.pre_update_methods:
        m(metrics)

sample_weights #

sample_weights()

Get weights for sampling.

:return: Sampling weights over all instances

Source code in mighty/mighty_meta/plr.py
def sample_weights(self):
    """Get weights for sampling.

    :return:
    """
    weights = self._score_transform(
        self.score_transform, self.temperature, self.instance_scores
    )
    ww = []
    for i, w in zip(self.all_instances, weights, strict=False):
        if i not in self.instance_scores:
            ww.append(0)
        else:
            ww.append(w)
    weights = np.array(ww)

    z = np.sum(weights)
    if z > 0:
        weights /= z

    staleness_weights = 0
    if self.staleness_coef > 0:
        staleness_weights = self._score_transform(
            self.staleness_transform, self.staleness_temperature, self.staleness
        )
        ws = []
        for i, w in zip(self.all_instances, staleness_weights, strict=False):
            if i not in self.instance_scores:
                ws.append(0)
            else:
                ws.append(w)
        staleness_weights = np.array(ws)
        z = np.sum(staleness_weights)
        if z > 0:
            staleness_weights /= z

        weights = (
            1 - self.staleness_coef
        ) * weights + self.staleness_coef * staleness_weights

    return weights
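
The final distribution mixes the normalized score weights with the normalized staleness weights via staleness_coef. A small numeric sketch of that blending step (values are made up):

import numpy as np

# Illustrative only: two seen instances, staleness_coef = 0.1.
score_weights = np.array([0.75, 0.25])      # normalized score-based weights
staleness_weights = np.array([0.4, 0.6])    # normalized staleness weights
staleness_coef = 0.1
weights = (1 - staleness_coef) * score_weights + staleness_coef * staleness_weights
# -> array([0.715, 0.285]); still sums to 1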

score_function #

score_function(reward, values, logits)

Get score.

:param reward: Rollout rewards
:param values: Rollout values
:param logits: Rollout logits
:return: score

Source code in mighty/mighty_meta/plr.py
def score_function(self, reward, values, logits):
    """Get score.

    :param reward: Rollout rewards
    :param values: Rollout values
    :param logits: Rollout logits
    :return: score
    """
    if self.sample_strategy == "random":
        score = 1
    elif self.sample_strategy == "policy_entropy":
        if logits is None:
            raise ValueError("Logits are required for policy entropy.")
        score = self._average_entropy(logits)
    elif self.sample_strategy == "least_confidence":
        if logits is None:
            raise ValueError("Logits are required for least confidence.")
        score = self._average_least_confidence(logits)
    elif self.sample_strategy == "min_margin":
        if logits is None:
            raise ValueError("Logits are required for min margin.")
        score = self._average_min_margin(logits)
    elif self.sample_strategy == "gae":
        score = self._average_gae(reward, values)
    elif self.sample_strategy == "value_l1":
        score = self._average_value_l1(reward, values)
    elif self.sample_strategy == "one_step_td_error":
        score = self._one_step_td_error(reward, values)
    else:
        raise NotImplementedError
    return score
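
Whether logits are required depends on sample_strategy: policy_entropy, least_confidence, and min_margin need them, while the value-based strategies only use rewards and values. A hedged call sketch (episode_reward and rollout_values are placeholders for the per-instance rollout data):

# value_l1, gae and one_step_td_error ignore logits, so None is acceptable.
score = plr.score_function(episode_reward, rollout_values, logits=None)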