Implementation: Isaac Sim IsaacGymEnvs ADRVecTask adr_update
| Knowledge Sources | |
|---|---|
| Domains | |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
The ADRVecTask class implements Automatic Domain Randomization by extending VecTaskDextreme with boundary worker management, ADR range updates, per-environment tensor sampling, and environment recycling. The adr_update() method is the core algorithm that adjusts randomization ranges based on boundary worker performance.
Description
ADRVecTask manages the full ADR lifecycle:
- Initialization: sets up worker-type tensors, ADR parameter dictionaries with initial ranges, boundary evaluation queues (deques with a maximum length), and ADR mode assignments.
- Range updates (adr_update()): implements Algorithm 1 from the OpenAI ADR paper, evaluating boundary worker performance and expanding or contracting ranges.
- Parameter modification (modify_adr_param()): applies delta adjustments (additive or multiplicative) to range endpoints, clamping at hard limits.
- Tensor sampling (sample_adr_tensor()): generates per-environment ADR parameter values, assigning boundary values to boundary workers and sampling from the current range for rollout workers.
- Environment recycling (recycle_envs()): reassigns completed environments to new worker types and ADR modes.
Usage
ADRVecTask is used as a base class for ADR-enabled task implementations:
```python
from isaacgymenvs.tasks.dextreme.adr_vec_task import ADRVecTask

class AllegroHandDextreme(ADRVecTask):
    def __init__(self, cfg, rl_device, sim_device, graphics_device_id,
                 headless, virtual_screen_capture, force_render):
        # ... setup ...
        super().__init__(config=self.cfg, rl_device=rl_device,
                         sim_device=sim_device, ...)
```
Code Reference
Source Location
- File: isaacgymenvs/tasks/dextreme/adr_vec_task.py (lines 489–918)
Signatures
```python
class RolloutWorkerModes:
    ADR_ROLLOUT = 0   # Standard rollout with current ADR params
    ADR_BOUNDARY = 1  # Evaluate performance at ADR range boundaries
    TEST_ENV = 2      # Evaluate with default DR params (currently unused)


class ADRVecTask(VecTaskDextreme):
    def __init__(self, config, rl_device, sim_device,
                 graphics_device_id, headless, use_dict_obs=False):
        """Initialize ADR state: worker types, parameter dicts, queues."""

    def adr_update(self, rand_envs, adr_objective):
        """Perform the ADR update step (Algorithm 1 from the OpenAI ADR paper).

        Args:
            rand_envs: list of env IDs being reset/randomized.
            adr_objective: per-env performance metric (consecutive successes).
        """

    def modify_adr_param(self, param, direction, adr_param_dict,
                         param_limit=None):
        """Modify an ADR parameter endpoint.

        Args:
            param: current value of the endpoint.
            direction: 'up' to increase, 'down' to decrease.
            adr_param_dict: ADR param config with 'delta' and 'delta_style'.
            param_limit: hard limit for the parameter.

        Returns:
            (new_value, changed): tuple of new value and whether it changed.
        """

    def sample_adr_tensor(self, param_name, env_ids=None):
        """Sample values for an ADR tensor parameter.

        Boundary workers get boundary values; rollout workers sample
        from the current range. Results are stored in self.adr_tensor_values.

        Args:
            param_name: name of the ADR parameter.
            env_ids: specific env IDs to sample (default: all).

        Returns:
            Tensor of sampled values for the given env_ids.
        """

    def get_adr_tensor(self, param_name, env_ids=None):
        """Retrieve current ADR tensor values for a parameter."""

    def recycle_envs(self, recycle_envs):
        """Reassign worker types and ADR modes for recycled environments.

        Args:
            recycle_envs: env IDs to recycle.
        """

    def get_dr_params_by_env_id(self, env_id, default_dr_params,
                                current_adr_params):
        """Get the per-environment DR dictionary.

        Returns patched DR params: boundary workers get collapsed ranges,
        rollout workers get current ADR ranges, test workers get defaults.
        """

    def get_current_adr_params(self, dr_params):
        """Splice current ADR ranges into the DR params dictionary."""
```
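The endpoint-update semantics of modify_adr_param can be illustrated with a minimal stand-alone sketch. This is a re-implementation from the docstring above, not the IsaacGymEnvs source: the additive/multiplicative interpretation of 'delta_style' and the clamping direction are assumptions.

```python
def modify_adr_param(param, direction, adr_param_dict, param_limit=None):
    """Sketch of the endpoint update described above (assumed semantics):
    'up' moves the endpoint outward by delta, 'down' moves it inward,
    with an optional hard limit clamp."""
    delta = adr_param_dict["delta"]
    style = adr_param_dict.get("delta_style", "additive")
    if style == "additive":
        new_value = param + delta if direction == "up" else param - delta
    else:  # multiplicative: delta assumed > 1
        new_value = param * delta if direction == "up" else param / delta
    if param_limit is not None:
        # Clamp toward the limit in the direction of travel
        new_value = min(new_value, param_limit) if direction == "up" else max(new_value, param_limit)
    return new_value, new_value != param
```

For example, an additive step of 0.1 moves an upper endpoint from 1.0 to 1.1, unless a param_limit of 1.05 clamps it first.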
Import
```python
from isaacgymenvs.tasks.dextreme.adr_vec_task import ADRVecTask
```
I/O Contract
Inputs
adr_update():

| Name | Type | Description |
|---|---|---|
| rand_envs | list[int] | Environment IDs being reset/randomized this step. |
| adr_objective | torch.Tensor | Per-environment performance metric (consecutive successes). Shape: (num_envs,). |

sample_adr_tensor():

| Name | Type | Description |
|---|---|---|
| param_name | str | Name of the ADR parameter to sample. |
| env_ids | torch.Tensor or None | Specific environment IDs to sample. Defaults to all environments. |
Outputs
| Name | Type | Description |
|---|---|---|
| self.adr_params[name]["range"] | [float, float] | Updated ADR range endpoints after boundary evaluation. |
| self.adr_params[name]["next_limits"] | [float, float] | Pre-computed next expansion values for extended boundary sampling. |
| self.adr_tensor_values[name] | torch.Tensor | Per-environment sampled parameter values. Shape: (num_envs,). |
| self.worker_types | torch.Tensor (long) | Per-environment worker type assignments (0=rollout, 1=boundary, 2=test). |
| self.adr_modes | torch.Tensor (long) | Per-environment ADR mode (which parameter and direction to evaluate). Mode 2*n = lower bound of param n, 2*n+1 = upper bound. |
| self.extras['adr/npd'] | float | Nats-per-dimension metric measuring total randomization volume. |
| self.extras['adr/params/*/lower'] | float | Current lower bound for each ADR parameter (logged to TensorBoard). |
| self.extras['adr/params/*/upper'] | float | Current upper bound for each ADR parameter (logged to TensorBoard). |
Key Behavior
adr_update Algorithm
Simplified pseudocode (names such as shuffled, resetting, low_queue, and modify_param abbreviate the actual tensor logic in adr_vec_task.py):

```python
def adr_update(self, rand_envs, adr_objective):
    total_nats = 0.0
    for n, adr_param_name in shuffled(enumerate(self.adr_params)):
        low_idx = 2 * n       # queue index for the lower bound
        high_idx = 2 * n + 1  # queue index for the upper bound

        # Find boundary workers for this parameter that just finished
        adr_done_low = resetting & (worker_type == BOUNDARY) & (mode == low_idx)
        adr_done_high = resetting & (worker_type == BOUNDARY) & (mode == high_idx)

        # Record their performance in the evaluation queues
        self.adr_objective_queues[low_idx].extend(objective[adr_done_low])
        self.adr_objective_queues[high_idx].extend(objective[adr_done_high])

        # Evaluate the lower-bound queue
        if len(low_queue) >= threshold_length:
            if mean(low_queue) < threshold_low:
                range_lower = modify_param(range_lower, 'up')    # contract
            elif mean(low_queue) > threshold_high:
                range_lower = modify_param(range_lower, 'down')  # expand

        # Evaluate the upper-bound queue (symmetric logic)
        if len(high_queue) >= threshold_length:
            if mean(high_queue) < threshold_low:
                range_upper = modify_param(range_upper, 'down')  # contract
            elif mean(high_queue) > threshold_high:
                range_upper = modify_param(range_upper, 'up')    # expand

        # Update the range and accumulate nats for the NPD metric
        self.adr_params[adr_param_name]["range"] = [range_lower, range_upper]
        total_nats += log(range_upper - range_lower)

    self.extras['adr/npd'] = total_nats / len(self.adr_params)
    self.recycle_envs(rand_envs)
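The 'adr/npd' metric accumulated at the end of the loop above is simply the mean log-width of all ADR ranges. A self-contained sketch (assuming every range has positive width; the function name is illustrative):

```python
import math

def nats_per_dimension(adr_params):
    """Mean log-width over all ADR ranges, a proxy for total
    randomization volume (the 'adr/npd' value logged above)."""
    total_nats = sum(math.log(hi - lo)
                     for lo, hi in (p["range"] for p in adr_params.values()))
    return total_nats / len(adr_params)
```

A collapsed set of ranges of width 1.0 gives an NPD of 0.0; widening any range increases it, so a rising NPD curve indicates the ADR curriculum is expanding.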
Worker Recycling
```python
# Simplified pseudocode of recycle_envs()
def recycle_envs(self, recycle_envs):
    worker_types_rand = torch.rand(len(recycle_envs), device=self.device)

    # Fraction below the threshold become rollout workers, the rest boundary workers
    new_worker_types = torch.zeros(len(recycle_envs), dtype=torch.long, ...)
    new_worker_types[worker_types_rand < self.worker_adr_boundary_fraction] = ADR_ROLLOUT
    new_worker_types[worker_types_rand >= self.worker_adr_boundary_fraction] = ADR_BOUNDARY
    self.worker_types[recycle_envs] = new_worker_types

    # Randomly assign boundary workers to evaluate specific parameter boundaries
    self.adr_modes[recycle_envs] = torch.randint(0, self.num_adr_params * 2, ...)
```
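The same split can be sketched with the standard library instead of torch tensors; this is a simplified stand-in for illustration (the real code operates on GPU tensors), with an illustrative function name.

```python
import random

ADR_ROLLOUT, ADR_BOUNDARY = 0, 1

def recycle_envs_sketch(n_envs, num_adr_params, boundary_fraction, rng=random):
    """Per-env reassignment mirroring the pseudocode above: a draw below
    boundary_fraction makes a rollout worker, otherwise a boundary worker.
    A mode in [0, 2 * num_adr_params) is drawn for every recycled env,
    though only boundary workers actually use it."""
    worker_types, adr_modes = [], []
    for _ in range(n_envs):
        if rng.random() < boundary_fraction:
            worker_types.append(ADR_ROLLOUT)
        else:
            worker_types.append(ADR_BOUNDARY)
        adr_modes.append(rng.randrange(num_adr_params * 2))
    return worker_types, adr_modes
```

Note the naming quirk carried over from the source pseudocode: draws below worker_adr_boundary_fraction become rollout workers, so the fraction actually controls the rollout share of recycled environments.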