Implementation: Isaac Sim IsaacGymEnvs VecTask Subclass Pattern
| Knowledge Sources | |
|---|---|
| Type | Pattern Doc |
| Domains | Architecture, Simulation |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
Concrete code pattern for creating a new GPU-accelerated RL environment by subclassing VecTask and implementing all required abstract methods.
Description
This pattern shows the complete structure of a custom task class that inherits from VecTask. The pattern is derived from the Cartpole reference implementation (cartpole.py:L36-196) and the VecTask base class (vec_task.py:L207-840). It covers the class skeleton, constructor pattern, abstract method implementations, and interaction with inherited GPU tensor buffers.
Usage
Follow this pattern when creating any new IsaacGymEnvs task. Start with the skeleton, fill in each method according to your task design specification, and test incrementally.
Code Reference
Source Location
- Repository: NVIDIA-Omniverse/IsaacGymEnvs
- Base class: isaacgymenvs/tasks/base/vec_task.py (L207-840)
- Reference: isaacgymenvs/tasks/cartpole.py (L36-196)
Import
import os

import numpy as np

# NOTE: isaacgym modules must be imported before torch (isaacgym requirement)
from isaacgym import gymtorch, gymapi, gymutil
from isaacgym.torch_utils import torch_rand_float
import torch

from isaacgymenvs.tasks.base.vec_task import VecTask
Task Class Skeleton
class MyTask(VecTask):
    """Skeleton for a custom GPU-accelerated RL task built on VecTask.

    Construction order matters: task-specific config values and the
    numObservations/numActions entries must be set BEFORE calling
    super().__init__(), because the base constructor invokes create_sim(),
    allocate_buffers() (obs_buf, rew_buf, reset_buf, progress_buf), and
    prepare_sim() using those config values.
    """

    def __init__(self, cfg, rl_device, sim_device, graphics_device_id,
                 headless, virtual_screen_capture, force_render):
        # Step 1: Store and read task-specific config.
        # BUGFIX: self.cfg must be assigned before it is read below and
        # passed to super().__init__() (see the Cartpole reference, which
        # does `self.cfg = cfg` first).
        self.cfg = cfg
        self.max_episode_length = self.cfg["env"]["episodeLength"]
        # ... read additional task-specific parameters from cfg ...

        # Step 2: Set observation and action dimensions in config.
        # N and M are placeholders — replace with your task's actual sizes.
        self.cfg["env"]["numObservations"] = N  # observation vector size
        self.cfg["env"]["numActions"] = M       # action vector size

        # Step 3: Call super().__init__() — this triggers, in order:
        #   - create_sim()
        #   - allocate_buffers()  [creates obs_buf, rew_buf, reset_buf, progress_buf]
        #   - prepare_sim()
        super().__init__(
            config=self.cfg,
            rl_device=rl_device,
            sim_device=sim_device,
            graphics_device_id=graphics_device_id,
            headless=headless,
            virtual_screen_capture=virtual_screen_capture,
            force_render=force_render,
        )

        # Step 4: Acquire GPU tensor handles (only valid after sim creation).
        actor_root_state = self.gym.acquire_actor_root_state_tensor(self.sim)
        dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim)
        self.gym.refresh_actor_root_state_tensor(self.sim)
        self.gym.refresh_dof_state_tensor(self.sim)
        self.root_states = gymtorch.wrap_tensor(actor_root_state)
        self.dof_state = gymtorch.wrap_tensor(dof_state_tensor)
        # DOF state is interleaved (position, velocity) pairs; these are
        # aliased views into dof_state, so writes propagate back to it.
        self.dof_pos = self.dof_state.view(self.num_envs, -1, 2)[..., 0]
        self.dof_vel = self.dof_state.view(self.num_envs, -1, 2)[..., 1]

    def create_sim(self):
        """Create the simulation, ground plane, assets, envs, and actors."""
        # Step 1: Create simulation via the base class.
        self.sim = super().create_sim(
            self.device_id, self.graphics_device_id,
            self.physics_engine, self.sim_params,
        )

        # Step 2: Create ground plane.
        plane_params = gymapi.PlaneParams()
        self.gym.add_ground(self.sim, plane_params)

        # Step 3: Load URDF asset (path is relative to this task file).
        asset_root = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "../assets"
        )
        asset_file = "my_robot.urdf"
        asset_options = gymapi.AssetOptions()
        asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options)

        # Step 4: Create environments and actors on a square grid.
        spacing = 2.0
        lower = gymapi.Vec3(-spacing, 0.0, -spacing)
        upper = gymapi.Vec3(spacing, spacing, spacing)
        self.envs = []
        self.actor_handles = []
        for i in range(self.num_envs):
            env = self.gym.create_env(self.sim, lower, upper,
                                      int(self.num_envs ** 0.5))
            pose = gymapi.Transform()
            pose.p = gymapi.Vec3(0.0, 1.0, 0.0)
            # Collision group = i so envs don't collide with each other.
            actor = self.gym.create_actor(env, asset, pose, "actor", i, 1)
            self.envs.append(env)
            self.actor_handles.append(actor)

    def pre_physics_step(self, actions):
        """Convert RL policy actions to physics commands before stepping."""
        self.actions = actions.clone().to(self.device)
        forces = self.actions * self.action_scale
        force_tensor = gymtorch.unwrap_tensor(forces)
        self.gym.set_dof_actuation_force_tensor(self.sim, force_tensor)

    def post_physics_step(self):
        """Refresh state tensors, compute obs/rewards, and reset done envs."""
        # Increment per-env episode step counter.
        self.progress_buf += 1

        # Refresh GPU tensors with the latest physics state.
        self.gym.refresh_dof_state_tensor(self.sim)
        self.gym.refresh_actor_root_state_tensor(self.sim)

        # Compute observations, rewards, and reset flags.
        self.compute_observations()
        self.compute_reward()

        # Reset any environments flagged in reset_buf.
        env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
        if len(env_ids) > 0:
            self.reset_idx(env_ids)

    def compute_observations(self):
        """Fill self.obs_buf with the current state and return it."""
        # root_states layout per actor: [0:3] position, [3:7] rotation
        # quaternion, [7:10] linear velocity, [10:13] angular velocity.
        self.obs_buf[:, 0:3] = self.root_states[:, 0:3]   # position
        self.obs_buf[:, 3:6] = self.root_states[:, 7:10]  # linear velocity
        # ... fill remaining observation components ...
        return self.obs_buf

    def compute_reward(self):
        """Compute per-environment rewards and reset flags.

        compute_my_task_reward is a user-supplied (typically
        @torch.jit.script) function returning (rewards, resets).
        """
        self.rew_buf[:], self.reset_buf[:] = compute_my_task_reward(
            self.obs_buf,
            self.actions,
            self.progress_buf,
            self.max_episode_length,
        )

    def reset_idx(self, env_ids):
        """Randomize DOF state for the given env indices and clear buffers."""
        positions = torch_rand_float(
            -0.2, 0.2, (len(env_ids), self.num_dof), device=self.device)
        velocities = torch_rand_float(
            -0.1, 0.1, (len(env_ids), self.num_dof), device=self.device)
        # Writes go through the aliased views into self.dof_state.
        self.dof_pos[env_ids] = positions
        self.dof_vel[env_ids] = velocities

        # Push the modified DOF states back to the sim for these envs only.
        env_ids_int32 = env_ids.to(dtype=torch.int32)
        self.gym.set_dof_state_tensor_indexed(
            self.sim,
            gymtorch.unwrap_tensor(self.dof_state),
            gymtorch.unwrap_tensor(env_ids_int32),
            len(env_ids_int32),
        )

        # Reset bookkeeping buffers for these environments.
        self.reset_buf[env_ids] = 0
        self.progress_buf[env_ids] = 0
Cartpole: Complete Reference Implementation
# From isaacgymenvs/tasks/cartpole.py (L36-196)
class Cartpole(VecTask):
    """Reference VecTask implementation (isaacgymenvs/tasks/cartpole.py L36-196).

    Indentation restored — the extracted excerpt had its Python structure
    flattened; tokens otherwise match the upstream source.
    """

    def __init__(self, cfg, rl_device, sim_device, graphics_device_id,
                 headless, virtual_screen_capture, force_render):
        # Task config must be read before super().__init__() runs create_sim().
        self.cfg = cfg
        self.reset_dist = self.cfg["env"]["resetDist"]
        self.max_push_effort = self.cfg["env"]["maxEffort"]
        self.max_episode_length = 500

        self.cfg["env"]["numObservations"] = 4
        self.cfg["env"]["numActions"] = 1

        super().__init__(config=self.cfg, rl_device=rl_device,
                         sim_device=sim_device,
                         graphics_device_id=graphics_device_id,
                         headless=headless,
                         virtual_screen_capture=virtual_screen_capture,
                         force_render=force_render)

        # Aliased views into the interleaved (pos, vel) DOF state tensor.
        dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim)
        self.dof_state = gymtorch.wrap_tensor(dof_state_tensor)
        self.dof_pos = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 0]
        self.dof_vel = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 1]

    def create_sim(self):
        # Z-up axis with standard gravity, then ground plane and envs.
        self.sim_params.up_axis = gymapi.UP_AXIS_Z
        self.sim_params.gravity.x = 0
        self.sim_params.gravity.y = 0
        self.sim_params.gravity.z = -9.81
        self.sim = super().create_sim(
            self.device_id, self.graphics_device_id,
            self.physics_engine, self.sim_params)
        self._create_ground_plane()
        self._create_envs(self.num_envs, self.cfg["env"]["envSpacing"],
                          int(np.sqrt(self.num_envs)))

    def pre_physics_step(self, actions):
        # Only DOF 0 (the cart) is actuated; scale action to push effort.
        self.actions = actions.clone().to(self.device)
        forces = torch.zeros(self.num_envs, self.num_dof,
                             dtype=torch.float, device=self.device)
        forces[:, 0] = self.actions[:, 0] * self.max_push_effort
        force_tensor = gymtorch.unwrap_tensor(forces)
        self.gym.set_dof_actuation_force_tensor(self.sim, force_tensor)

    def post_physics_step(self):
        self.progress_buf += 1
        self.gym.refresh_dof_state_tensor(self.sim)
        self.compute_observations()
        self.compute_reward()
        env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
        if len(env_ids) > 0:
            self.reset_idx(env_ids)

    def compute_observations(self):
        self.gym.refresh_dof_state_tensor(self.sim)
        self.obs_buf[:, 0] = self.dof_pos[:, 0]  # cart position
        self.obs_buf[:, 1] = self.dof_vel[:, 0]  # cart velocity
        self.obs_buf[:, 2] = self.dof_pos[:, 1]  # pole angle
        self.obs_buf[:, 3] = self.dof_vel[:, 1]  # pole angular velocity
        return self.obs_buf

    def compute_reward(self):
        self.rew_buf[:], self.reset_buf[:] = compute_cartpole_reward(
            self.obs_buf, self.reset_dist, self.reset_buf,
            self.progress_buf, self.max_episode_length)

    def reset_idx(self, env_ids):
        # Uniform random DOF positions in [-0.1, 0.1], velocities in [-0.25, 0.25].
        positions = 0.2 * (torch.rand((len(env_ids), self.num_dof),
                                      device=self.device) - 0.5)
        velocities = 0.5 * (torch.rand((len(env_ids), self.num_dof),
                                       device=self.device) - 0.5)
        self.dof_pos[env_ids] = positions[:]
        self.dof_vel[env_ids] = velocities[:]
        env_ids_int32 = env_ids.to(dtype=torch.int32)
        self.gym.set_dof_state_tensor_indexed(
            self.sim, gymtorch.unwrap_tensor(self.dof_state),
            gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))
        self.reset_buf[env_ids] = 0
        self.progress_buf[env_ids] = 0
Key Inherited Buffers
| Buffer | Shape | Type | Description |
|---|---|---|---|
| `self.obs_buf` | `[num_envs, num_obs]` | torch.float32 | Observation buffer filled by `compute_observations()` |
| `self.rew_buf` | `[num_envs]` | torch.float32 | Reward buffer filled by `compute_reward()` |
| `self.reset_buf` | `[num_envs]` | torch.int32 | Reset flag buffer (1 = reset needed, 0 = continue) |
| `self.progress_buf` | `[num_envs]` | torch.int32 | Episode step counter, incremented each step |
| `self.actions` | `[num_envs, num_actions]` | torch.float32 | Latest actions from the RL policy |
| `self.extras` | dict | dict | Extra info returned to the training algorithm (e.g., episode statistics) |
Related Pages
- Isaac_sim_IsaacGymEnvs_VecTask_Subclass_Creation - implements - Principle defining the OOP pattern for task creation.
- Isaac_sim_IsaacGymEnvs_VecTask_Simulation_Loop - details - Detailed API reference for each simulation loop method.
- Isaac_sim_IsaacGymEnvs_Task_Design_Specification - prerequisite - Design specification that this implementation realizes.
- Isaac_sim_IsaacGymEnvs_Isaacgym_Task_Map_Registration - next step - Register the new task class for CLI discovery.