Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Isaac sim IsaacGymEnvs ADRVecTask Adr Update

From Leeroopedia
Metadata
Knowledge Sources
Domains
Last Updated 2026-02-15 00:00 GMT

Overview

The ADRVecTask class implements Automatic Domain Randomization by extending VecTaskDextreme with boundary worker management, ADR range updates, per-environment tensor sampling, and environment recycling. The adr_update() method is the core algorithm that adjusts randomization ranges based on boundary worker performance.

Description

ADRVecTask manages the full ADR lifecycle:

  • Initialization: Sets up worker type tensors, ADR parameter dictionaries with initial ranges, boundary evaluation queues (deques with max length), and ADR mode assignments.
  • Range updates (adr_update()): Implements Algorithm 1 from the OpenAI ADR paper -- evaluates boundary worker performance and expands/contracts ranges.
  • Parameter modification (modify_adr_param()): Applies delta adjustments (additive or multiplicative) to range endpoints with limit clamping.
  • Tensor sampling (sample_adr_tensor()): Generates per-environment ADR parameter values, assigning boundary values to boundary workers and sampling from the current range for rollout workers.
  • Environment recycling (recycle_envs()): Reassigns completed environments to new worker types and ADR mode assignments.

Usage

ADRVecTask is used as a base class for ADR-enabled task implementations:

from isaacgymenvs.tasks.dextreme.adr_vec_task import ADRVecTask

class AllegroHandDextreme(ADRVecTask):
    def __init__(self, cfg, rl_device, sim_device, graphics_device_id,
                 headless, virtual_screen_capture, force_render):
        # ... setup ...
        super().__init__(config=self.cfg, rl_device=rl_device,
                        sim_device=sim_device, ...)

Code Reference

Source Location

  • File: isaacgymenvs/tasks/dextreme/adr_vec_task.py (lines 489--918)

Signatures

class RolloutWorkerModes:
    ADR_ROLLOUT = 0   # Standard rollout with current ADR params
    ADR_BOUNDARY = 1  # Evaluate performance at ADR range boundaries
    TEST_ENV = 2      # Evaluate with default DR params (unused currently)


class ADRVecTask(VecTaskDextreme):

    def __init__(self, config, rl_device, sim_device,
                 graphics_device_id, headless, use_dict_obs=False):
        """Initialize ADR state: worker types, parameter dicts, queues."""

    def adr_update(self, rand_envs, adr_objective):
        """Perform ADR update step (Algorithm 1 from OpenAI ADR paper).

        Args:
            rand_envs: list of env IDs being reset/randomized.
            adr_objective: per-env performance metric (consecutive successes).
        """

    def modify_adr_param(self, param, direction, adr_param_dict,
                          param_limit=None):
        """Modify an ADR parameter endpoint.

        Args:
            param: current value of the endpoint.
            direction: 'up' to increase, 'down' to decrease.
            adr_param_dict: ADR param config with 'delta' and 'delta_style'.
            param_limit: hard limit for the parameter.
        Returns:
            (new_value, changed): tuple of new value and whether it changed.
        """

    def sample_adr_tensor(self, param_name, env_ids=None):
        """Sample values for an ADR tensor parameter.

        Boundary workers get boundary values; rollout workers sample
        from current range. Results stored in self.adr_tensor_values.

        Args:
            param_name: name of the ADR parameter.
            env_ids: specific env IDs to sample (default: all).
        Returns:
            Tensor of sampled values for the given env_ids.
        """

    def get_adr_tensor(self, param_name, env_ids=None):
        """Retrieve current ADR tensor values for a parameter."""

    def recycle_envs(self, recycle_envs):
        """Reassign worker types and ADR modes for recycled environments.

        Args:
            recycle_envs: env IDs to recycle.
        """

    def get_dr_params_by_env_id(self, env_id, default_dr_params,
                                 current_adr_params):
        """Get per-environment DR dictionary.

        Returns patched DR params: boundary workers get collapsed ranges,
        rollout workers get current ADR ranges, test workers get defaults.
        """

    def get_current_adr_params(self, dr_params):
        """Splice current ADR ranges into the DR params dictionary."""

Import

from isaacgymenvs.tasks.dextreme.adr_vec_task import ADRVecTask

I/O Contract

Inputs

Input Contract -- adr_update
Name Type Description
rand_envs list[int] Environment IDs being reset/randomized this step.
adr_objective torch.Tensor Per-environment performance metric (consecutive successes). Shape: (num_envs,).
Input Contract -- sample_adr_tensor
Name Type Description
param_name str Name of the ADR parameter to sample.
env_ids torch.Tensor or None Specific environment IDs to sample. Defaults to all environments.

Outputs

Output Contract
Name Type Description
self.adr_params[name]["range"] [float, float] Updated ADR range endpoints after boundary evaluation.
self.adr_params[name]["next_limits"] [float, float] Pre-computed next expansion values for extended boundary sampling.
self.adr_tensor_values[name] torch.Tensor Per-environment sampled parameter values. Shape: (num_envs,).
self.worker_types torch.Tensor (long) Per-environment worker type assignments (0=rollout, 1=boundary, 2=test).
self.adr_modes torch.Tensor (long) Per-environment ADR mode (which parameter and direction to evaluate). Mode 2*n = lower bound of param n, 2*n+1 = upper bound.
self.extras['adr/npd'] float Nats per dimension metric measuring total randomization volume.
self.extras['adr/params/*/lower'] float Current lower bound for each ADR parameter (logged to TensorBoard).
self.extras['adr/params/*/upper'] float Current upper bound for each ADR parameter (logged to TensorBoard).

Key Behavior

adr_update Algorithm

def adr_update(self, rand_envs, adr_objective):
    total_nats = 0.0

    for n, adr_param_name in shuffled(enumerate(self.adr_params)):
        low_idx = 2 * n       # queue index for lower bound
        high_idx = 2 * n + 1  # queue index for upper bound

        # Find boundary workers for this parameter that just finished
        adr_done_low = resetting & (worker_type == BOUNDARY) & (mode == low_idx)
        adr_done_high = resetting & (worker_type == BOUNDARY) & (mode == high_idx)

        # Record performance in queues
        self.adr_objective_queues[low_idx].extend(objective[adr_done_low])
        self.adr_objective_queues[high_idx].extend(objective[adr_done_high])

        # Evaluate lower bound queue
        if len(low_queue) >= threshold_length:
            if mean(low_queue) < threshold_low:
                range_lower = modify_param(range_lower, 'up')     # contract
            elif mean(low_queue) > threshold_high:
                range_lower = modify_param(range_lower, 'down')   # expand

        # Evaluate upper bound queue (symmetric logic)
        if len(high_queue) >= threshold_length:
            if mean(high_queue) < threshold_low:
                range_upper = modify_param(range_upper, 'down')   # contract
            elif mean(high_queue) > threshold_high:
                range_upper = modify_param(range_upper, 'up')     # expand

        # Update range and compute NPD
        self.adr_params[name]["range"] = [range_lower, range_upper]
        total_nats += log(range_upper - range_lower)

    self.extras['adr/npd'] = total_nats / len(self.adr_params)
    self.recycle_envs(rand_envs)

Worker Recycling

def recycle_envs(self, recycle_envs):
    worker_types_rand = torch.rand(len(recycle_envs), device=self.device)

    # Fraction below threshold -> rollout workers, above -> boundary workers
    new_worker_types = torch.zeros(len(recycle_envs), dtype=torch.long, ...)
    new_worker_types[worker_types_rand < self.worker_adr_boundary_fraction] = ADR_ROLLOUT
    new_worker_types[worker_types_rand >= self.worker_adr_boundary_fraction] = ADR_BOUNDARY

    self.worker_types[recycle_envs] = new_worker_types
    # Randomly assign boundary workers to evaluate specific parameter boundaries
    self.adr_modes[recycle_envs] = torch.randint(0, self.num_adr_params * 2, ...)

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment