Implementation:Isaac sim IsaacGymEnvs FactoryTask Training Loop
| Knowledge Sources | |
|---|---|
| Domains | Manipulation, Reinforcement_Learning |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
Task classes and reward utility functions that implement the Factory/IndustReal assembly training loop with keypoint, SDF, SAPU, and curriculum-based reward shaping.
Description
The training loop is implemented across task classes (e.g., FactoryTaskNutBoltPick) and shared utility functions in industreal_algo_utils. Task classes implement the standard IsaacGymEnvs interface: pre_physics_step() processes actions through the controller, compute_observations() builds the observation buffer, compute_reward() calculates shaped rewards, and reset_idx() handles episode resets with curriculum sampling. The IndustReal utility functions provide SAPU reward scaling, SDF reward computation, and curriculum reward scaling as reusable components.
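The interplay of these four hooks can be illustrated with a simulator-free toy. This is a minimal sketch, not the IsaacGymEnvs implementation: `ToyTask`, its `step()` method, the list-based buffers, and the placeholder reward are all hypothetical, and the call order shown (resets handled at the start of `pre_physics_step()`, observations and rewards computed after the physics step) is an assumption about the usual pattern.

```python
class ToyTask:
    """Hypothetical stand-in for a task class; no simulator is involved."""

    def __init__(self, num_envs, max_episode_length):
        self.num_envs = num_envs
        self.max_episode_length = max_episode_length
        self.progress_buf = [0] * num_envs   # steps taken in the current episode
        self.reset_buf = [0] * num_envs      # 1 = reset needed
        self.rew_buf = [0.0] * num_envs
        self.obs_buf = [[0.0] for _ in range(num_envs)]
        self.calls = []                      # records the order in which hooks run

    def reset_idx(self, env_ids):
        self.calls.append("reset_idx")
        for i in env_ids:
            self.progress_buf[i] = 0
            self.reset_buf[i] = 0

    def pre_physics_step(self, actions):
        self.calls.append("pre_physics_step")
        # Assumed pattern: reset flagged environments before applying actions.
        env_ids = [i for i, flag in enumerate(self.reset_buf) if flag]
        if env_ids:
            self.reset_idx(env_ids)
        self.actions = actions

    def compute_observations(self):
        self.calls.append("compute_observations")
        self.obs_buf = [[float(p)] for p in self.progress_buf]
        return self.obs_buf

    def compute_reward(self):
        self.calls.append("compute_reward")
        for i in range(self.num_envs):
            self.rew_buf[i] = -1.0  # placeholder per-step reward
            if self.progress_buf[i] >= self.max_episode_length:
                self.reset_buf[i] = 1

    def step(self, actions):
        self.pre_physics_step(actions)
        # The physics simulation would advance here.
        self.progress_buf = [p + 1 for p in self.progress_buf]
        self.compute_observations()
        self.compute_reward()
        return self.obs_buf, self.rew_buf, self.reset_buf


task = ToyTask(num_envs=2, max_episode_length=2)
for _ in range(3):
    task.step(actions=[[0.0] * 6 for _ in range(2)])
```

On the third step both toy environments have timed out, so `reset_idx()` runs inside `pre_physics_step()` before the new observations and rewards are computed.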
Usage
Use these classes and functions when training assembly sub-policies. The task class is selected via the task= argument to train.py, and reward parameters are configured in the task-level YAML config, where the task reads them from cfg_task.rl (for example, cfg_task.rl.keypoint_reward_scale).
Code Reference
Source Location
- Repository: IsaacGymEnvs
- File: isaacgymenvs/tasks/factory/factory_task_nut_bolt_pick.py, Lines 49-451
- File: isaacgymenvs/tasks/industreal/industreal_algo_utils.py, Lines 158-331
Signature
Factory Task Class
class FactoryTaskNutBoltPick(FactoryEnvNutBolt, FactoryABCTask):
    def __init__(self, cfg, rl_device, sim_device, graphics_device_id,
                 headless, virtual_screen_capture, force_render):
        """Initialize nut-bolt pick task with reward configuration."""

    def pre_physics_step(self, actions):  # L125
        """Process RL actions: scale, compute controller targets, apply to sim.

        Args:
            actions: Raw RL policy output [num_envs, 6]
                (3D pos delta + 3D rot delta)
        """

    def compute_observations(self):  # L158
        """Build observation buffer from fingertip pose, object poses, goals.

        Returns:
            Observation tensor [num_envs, obs_dim]
        """

    def compute_reward(self):  # L173
        """Compute shaped reward from keypoint distances and bonus terms.

        Updates self.rew_buf and self.reset_buf.
        """

    def reset_idx(self, env_ids):
        """Reset environments: randomize object poses within curriculum bounds.

        Args:
            env_ids: Tensor of environment indices to reset.
        """
IndustReal Reward Utilities
def get_sapu_reward_scale(
    asset_indices, plug_pos, plug_quat, socket_pos, socket_quat,
    wp_plug_meshes_sampled_points, wp_socket_meshes,
    interpen_thresh, wp_device, device
):
    """Compute SAPU reward scale based on interpenetration check.

    Returns 1.0 if physically plausible, 0.0 if interpenetration
    exceeds threshold.

    Returns:
        reward_scale: [num_envs] tensor of 0.0 or 1.0
    """

def get_sdf_reward(
    wp_plug_meshes_sampled_points, asset_indices,
    plug_pos, plug_quat, plug_goal_sdfs, wp_device, device
):
    """Compute SDF-based reward for insertion progress.

    Queries SDF of socket mesh at transformed plug surface points.

    Returns:
        sdf_reward: [num_envs] tensor of SDF reward values
    """

def get_curriculum_reward_scale(cfg_task, curr_max_disp):
    """Compute curriculum-based reward scale from current max displacement.

    Returns:
        curriculum_scale: scalar reward multiplier
    """
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| actions | torch.Tensor | Yes | Raw RL policy output [num_envs, 6] (3D pos + 3D rot deltas) |
| pos_action_scale | float | Yes | Scale factor for position deltas (from cfg_task) |
| rot_action_scale | float | Yes | Scale factor for rotation deltas (from cfg_task) |
| keypoint_reward_scale | float | Yes | Weight for keypoint-based reward |
| sdf_reward_scale | float | Yes (IndustReal) | Weight for SDF-based reward |
| interpen_thresh | float | Yes (IndustReal) | Interpenetration threshold for SAPU |
| engagement_bonus | float | Yes | Bonus reward on successful engagement detection |
| wp_plug_meshes_sampled_points | list[torch.Tensor] | Yes (IndustReal) | Sampled surface points for SDF queries |
| wp_socket_meshes | list[wp.Mesh] | Yes (IndustReal) | Warp meshes for SDF queries |
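How the two action scales apply to a single 6D action can be sketched as follows. This is an illustrative assumption rather than the controller code: `scale_action` is a hypothetical helper, it handles one environment at a time with plain lists, and both scales are treated as scalars as listed in the table above.

```python
def scale_action(action, pos_action_scale, rot_action_scale):
    """Split a 6D action into scaled position and rotation deltas."""
    if len(action) != 6:
        raise ValueError("expected 6 action components (3 pos + 3 rot)")
    pos_delta = [a * pos_action_scale for a in action[:3]]
    rot_delta = [a * rot_action_scale for a in action[3:]]
    return pos_delta, rot_delta


# Policy outputs are typically in [-1, 1]; the scales convert them to
# small per-step displacements for the controller targets.
pos_delta, rot_delta = scale_action(
    [1.0, -1.0, 0.5, 0.0, 1.0, -1.0],
    pos_action_scale=0.1,
    rot_action_scale=0.01,
)
```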
Outputs
| Name | Type | Description |
|---|---|---|
| self.rew_buf | torch.Tensor | Per-environment reward signal [num_envs] |
| self.reset_buf | torch.Tensor | Per-environment reset flag [num_envs] (1 = reset needed) |
| self.obs_buf | torch.Tensor | Observation buffer [num_envs, obs_dim] |
| reward_scale (SAPU) | torch.Tensor | Binary scale factor [num_envs] (0.0 or 1.0) |
| sdf_reward | torch.Tensor | SDF insertion reward [num_envs] |
| curriculum_scale | float | Curriculum difficulty multiplier |
Usage Examples
Training a Nut-Bolt Pick Policy
# Train the pick sub-policy for nut-bolt assembly
python train.py task=FactoryTaskNutBoltPick \
    num_envs=4096 \
    headless=True \
    max_iterations=500
Reward Composition in IndustReal
from isaacgymenvs.tasks.industreal.industreal_algo_utils import (
    get_sapu_reward_scale,
    get_sdf_reward,
    get_curriculum_reward_scale,
)

# In compute_reward():
# keypoint_reward and engagement_bonus are assumed to have been computed
# earlier in this method; they are not produced by the utilities above.

# 1. Compute SAPU scale (physical plausibility check)
sapu_scale = get_sapu_reward_scale(
    asset_indices=self.asset_indices,
    plug_pos=self.plug_pos, plug_quat=self.plug_quat,
    socket_pos=self.socket_pos, socket_quat=self.socket_quat,
    wp_plug_meshes_sampled_points=self.wp_plug_meshes_sampled_points,
    wp_socket_meshes=self.wp_socket_meshes,
    interpen_thresh=self.cfg_task.rl.interpen_thresh,
    wp_device=self.wp_device, device=self.device,
)

# 2. Compute SDF reward (insertion progress)
sdf_reward = get_sdf_reward(
    wp_plug_meshes_sampled_points=self.wp_plug_meshes_sampled_points,
    asset_indices=self.asset_indices,
    plug_pos=self.plug_pos, plug_quat=self.plug_quat,
    plug_goal_sdfs=self.plug_goal_sdfs,
    wp_device=self.wp_device, device=self.device,
)

# 3. Compute curriculum scale
curriculum_scale = get_curriculum_reward_scale(
    cfg_task=self.cfg_task, curr_max_disp=self.curr_max_disp,
)

# 4. Compose total reward: the SAPU scale gates the weighted sum,
#    and the curriculum scale multiplies the result.
self.rew_buf = sapu_scale * (
    self.cfg_task.rl.keypoint_reward_scale * keypoint_reward
    + self.cfg_task.rl.sdf_reward_scale * sdf_reward
    + engagement_bonus
) * curriculum_scale
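The composition in step 4 can be checked with plain numbers. This is a minimal sketch for one environment: `compose_reward` is a hypothetical helper that mirrors the expression above, and all numeric values are made up for illustration.

```python
def compose_reward(
    keypoint_reward, sdf_reward, engagement_bonus,
    sapu_scale, curriculum_scale,
    keypoint_reward_scale, sdf_reward_scale,
):
    """Gate the weighted reward sum by SAPU, then apply the curriculum scale."""
    shaped = (
        keypoint_reward_scale * keypoint_reward
        + sdf_reward_scale * sdf_reward
        + engagement_bonus
    )
    return sapu_scale * shaped * curriculum_scale


# Illustrative values only: 0.5 * (-0.2) + 10.0 * (-0.05) + 1.0 = 0.4
common = dict(
    keypoint_reward=-0.2, sdf_reward=-0.05, engagement_bonus=1.0,
    curriculum_scale=1.5, keypoint_reward_scale=0.5, sdf_reward_scale=10.0,
)
plausible = compose_reward(sapu_scale=1.0, **common)    # 0.4 * 1.5
penetrating = compose_reward(sapu_scale=0.0, **common)  # gated to zero
```

With a binary SAPU scale, an environment whose interpenetration exceeds the threshold receives zero reward regardless of the other terms, including the engagement bonus.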