Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Isaac sim IsaacGymEnvs VecTask Subclass Pattern

From Leeroopedia
Knowledge Sources
Type Pattern Doc
Domains Architecture, Simulation
Last Updated 2026-02-15 00:00 GMT

Overview

Concrete code pattern for creating a new GPU-accelerated RL environment by subclassing VecTask and implementing all required abstract methods.

Description

This pattern shows the complete structure of a custom task class that inherits from VecTask. The pattern is derived from the Cartpole reference implementation (cartpole.py:L36-196) and the VecTask base class (vec_task.py:L207-840). It covers the class skeleton, constructor pattern, abstract method implementations, and interaction with inherited GPU tensor buffers.

Usage

Follow this pattern when creating any new IsaacGymEnvs task. Start with the skeleton, fill in each method according to your task design specification, and test incrementally.

Code Reference

Source Location

  • Repository: NVIDIA-Omniverse/IsaacGymEnvs
  • Base class: isaacgymenvs/tasks/base/vec_task.py (L207-840)
  • Reference: isaacgymenvs/tasks/cartpole.py (L36-196)

Import

import os

import numpy as np
import torch

from isaacgym import gymtorch, gymapi, gymutil
from isaacgym.torch_utils import torch_rand_float

from isaacgymenvs.tasks.base.vec_task import VecTask

Task Class Skeleton

class MyTask(VecTask):
    """Skeleton for a custom GPU-accelerated IsaacGymEnvs task.

    Subclasses VecTask and implements the required hooks: create_sim()
    builds the simulation, ground plane, asset, and per-env actors;
    pre_physics_step() converts policy actions into physics commands;
    post_physics_step() refreshes state tensors and computes observations,
    rewards, and resets. Replace the N/M placeholders and the asset path
    with task-specific values before use.
    """

    def __init__(self, cfg, rl_device, sim_device, graphics_device_id,
                 headless, virtual_screen_capture, force_render):
        # Step 0: Store the config dict. The steps below (and the base
        # class call) read self.cfg, so it must be assigned first — the
        # original skeleton referenced self.cfg without ever assigning it.
        self.cfg = cfg

        # Step 1: Read task-specific config
        self.max_episode_length = self.cfg["env"]["episodeLength"]
        # ... read additional task-specific parameters from cfg here,
        # e.g. self.action_scale, which pre_physics_step() consumes ...

        # Step 2: Set observation and action dimensions in config
        # (replace the N/M placeholders with the task's real sizes).
        self.cfg["env"]["numObservations"] = N  # observation vector size
        self.cfg["env"]["numActions"] = M       # action vector size

        # Step 3: Call super().__init__() — this triggers:
        #   - create_sim()
        #   - allocate_buffers() [creates obs_buf, rew_buf, reset_buf, progress_buf]
        #   - prepare_sim()
        super().__init__(
            config=self.cfg,
            rl_device=rl_device,
            sim_device=sim_device,
            graphics_device_id=graphics_device_id,
            headless=headless,
            virtual_screen_capture=virtual_screen_capture,
            force_render=force_render,
        )

        # Step 4: Acquire GPU tensor handles (valid only after the sim
        # exists, i.e. after super().__init__() has run create_sim()).
        actor_root_state = self.gym.acquire_actor_root_state_tensor(self.sim)
        dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim)

        # Populate the simulator-side buffers once so the wrapped views
        # below start out valid.
        self.gym.refresh_actor_root_state_tensor(self.sim)
        self.gym.refresh_dof_state_tensor(self.sim)

        self.root_states = gymtorch.wrap_tensor(actor_root_state)
        self.dof_state = gymtorch.wrap_tensor(dof_state_tensor)
        # DOF state interleaves (position, velocity) per DOF; these are
        # zero-copy views, so refresh_* calls update them in place.
        self.dof_pos = self.dof_state.view(self.num_envs, -1, 2)[..., 0]
        self.dof_vel = self.dof_state.view(self.num_envs, -1, 2)[..., 1]

    def create_sim(self):
        """Create the simulation, ground plane, asset, and per-env actors."""
        # Step 1: Create simulation
        self.sim = super().create_sim(
            self.device_id, self.graphics_device_id,
            self.physics_engine, self.sim_params
        )

        # Step 2: Create ground plane
        plane_params = gymapi.PlaneParams()
        self.gym.add_ground(self.sim, plane_params)

        # Step 3: Load URDF asset (requires `import os` at module level)
        asset_root = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "../assets"
        )
        asset_file = "my_robot.urdf"
        asset_options = gymapi.AssetOptions()
        asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options)
        # Record the DOF count: reset_idx() sizes its random tensors with
        # self.num_dof, but the original skeleton never set it anywhere.
        self.num_dof = self.gym.get_asset_dof_count(asset)

        # Step 4: Create environments and actors
        spacing = 2.0
        lower = gymapi.Vec3(-spacing, 0.0, -spacing)
        upper = gymapi.Vec3(spacing, spacing, spacing)

        self.envs = []
        self.actor_handles = []
        num_per_row = int(self.num_envs ** 0.5)  # lay envs out in a square grid
        for i in range(self.num_envs):
            env = self.gym.create_env(self.sim, lower, upper, num_per_row)
            pose = gymapi.Transform()
            pose.p = gymapi.Vec3(0.0, 1.0, 0.0)
            # collision group i keeps each env's actor from colliding with others
            actor = self.gym.create_actor(env, asset, pose, "actor", i, 1)
            self.envs.append(env)
            self.actor_handles.append(actor)

    def pre_physics_step(self, actions):
        """Convert RL actions into DOF actuation forces for the next tick."""
        self.actions = actions.clone().to(self.device)
        # self.action_scale must be read from cfg in __init__ (Step 1).
        forces = self.actions * self.action_scale
        force_tensor = gymtorch.unwrap_tensor(forces)
        self.gym.set_dof_actuation_force_tensor(self.sim, force_tensor)

    def post_physics_step(self):
        """After each physics tick: refresh state, compute obs/rewards, reset."""
        # Increment per-env episode step counter
        self.progress_buf += 1

        # Refresh GPU tensors with latest physics state
        self.gym.refresh_dof_state_tensor(self.sim)
        self.gym.refresh_actor_root_state_tensor(self.sim)

        # Compute observations, rewards, and resets
        self.compute_observations()
        self.compute_reward()

        # Reset any environments whose reset flag was raised by compute_reward()
        env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
        if len(env_ids) > 0:
            self.reset_idx(env_ids)

    def compute_observations(self):
        """Fill self.obs_buf from the wrapped state tensors."""
        # root_states columns: [0:3] position, [7:10] linear velocity
        # (columns 3:7 are presumably the rotation quaternion — confirm
        # against the Isaac Gym root-state layout for your version).
        self.obs_buf[:, 0:3] = self.root_states[:, 0:3]    # position
        self.obs_buf[:, 3:6] = self.root_states[:, 7:10]   # linear velocity
        # ... fill remaining observation components ...
        return self.obs_buf

    def compute_reward(self):
        """Compute per-environment rewards and reset flags in place."""
        # compute_my_task_reward is a task-specific module-level function
        # you must provide (typically decorated with @torch.jit.script).
        self.rew_buf[:], self.reset_buf[:] = compute_my_task_reward(
            self.obs_buf,
            self.actions,
            self.progress_buf,
            self.max_episode_length,
        )

    def reset_idx(self, env_ids):
        """Re-randomize the DOF state of the given environments."""
        # torch_rand_float comes from isaacgym.torch_utils.
        positions = torch_rand_float(-0.2, 0.2,
            (len(env_ids), self.num_dof), device=self.device)
        velocities = torch_rand_float(-0.1, 0.1,
            (len(env_ids), self.num_dof), device=self.device)

        # Writing through the views updates self.dof_state directly.
        self.dof_pos[env_ids] = positions
        self.dof_vel[env_ids] = velocities

        # Push only the selected envs' DOF state back to the simulator.
        env_ids_int32 = env_ids.to(dtype=torch.int32)
        self.gym.set_dof_state_tensor_indexed(
            self.sim,
            gymtorch.unwrap_tensor(self.dof_state),
            gymtorch.unwrap_tensor(env_ids_int32),
            len(env_ids_int32),
        )

        # Clear bookkeeping so the reset environments start fresh episodes.
        self.reset_buf[env_ids] = 0
        self.progress_buf[env_ids] = 0

Cartpole: Complete Reference Implementation

# From isaacgymenvs/tasks/cartpole.py (L36-196)
class Cartpole(VecTask):
    """Reference VecTask subclass, reproduced from isaacgymenvs/tasks/cartpole.py."""

    def __init__(self, cfg, rl_device, sim_device, graphics_device_id,
                 headless, virtual_screen_capture, force_render):
        # Store the config and read task-specific parameters before the
        # base constructor runs.
        self.cfg = cfg
        self.reset_dist = self.cfg["env"]["resetDist"]
        self.max_push_effort = self.cfg["env"]["maxEffort"]
        self.max_episode_length = 500

        # 4 observations (cart pos/vel, pole angle/angular vel — see
        # compute_observations) and 1 action (force on the cart).
        self.cfg["env"]["numObservations"] = 4
        self.cfg["env"]["numActions"] = 1

        # Triggers create_sim(), allocate_buffers(), prepare_sim().
        super().__init__(config=self.cfg, rl_device=rl_device,
                         sim_device=sim_device,
                         graphics_device_id=graphics_device_id,
                         headless=headless,
                         virtual_screen_capture=virtual_screen_capture,
                         force_render=force_render)

        # Wrap the simulator's DOF buffer; dof_pos/dof_vel are zero-copy
        # views into the interleaved (pos, vel) layout, so refresh calls
        # update them in place.
        dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim)
        self.dof_state = gymtorch.wrap_tensor(dof_state_tensor)
        self.dof_pos = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 0]
        self.dof_vel = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 1]

    def create_sim(self):
        # Configure up axis and gravity before creating the simulation.
        self.sim_params.up_axis = gymapi.UP_AXIS_Z
        self.sim_params.gravity.x = 0
        self.sim_params.gravity.y = 0
        self.sim_params.gravity.z = -9.81
        self.sim = super().create_sim(
            self.device_id, self.graphics_device_id,
            self.physics_engine, self.sim_params)
        # Private helpers defined elsewhere in cartpole.py (not shown here).
        self._create_ground_plane()
        self._create_envs(self.num_envs, self.cfg["env"]["envSpacing"], int(np.sqrt(self.num_envs)))

    def pre_physics_step(self, actions):
        # Scale the single policy action into a force on the cart DOF
        # (index 0); the other DOF gets zero force.
        self.actions = actions.clone().to(self.device)
        forces = torch.zeros(self.num_envs, self.num_dof,
                             dtype=torch.float, device=self.device)
        forces[:, 0] = self.actions[:, 0] * self.max_push_effort
        force_tensor = gymtorch.unwrap_tensor(forces)
        self.gym.set_dof_actuation_force_tensor(self.sim, force_tensor)

    def post_physics_step(self):
        # Step counter, state refresh, obs/reward, then selective resets.
        self.progress_buf += 1
        self.gym.refresh_dof_state_tensor(self.sim)
        self.compute_observations()
        self.compute_reward()
        env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
        if len(env_ids) > 0:
            self.reset_idx(env_ids)

    def compute_observations(self):
        # Refresh then copy the four cartpole state components into obs_buf.
        self.gym.refresh_dof_state_tensor(self.sim)
        self.obs_buf[:, 0] = self.dof_pos[:, 0]  # cart position
        self.obs_buf[:, 1] = self.dof_vel[:, 0]  # cart velocity
        self.obs_buf[:, 2] = self.dof_pos[:, 1]  # pole angle
        self.obs_buf[:, 3] = self.dof_vel[:, 1]  # pole angular velocity
        return self.obs_buf

    def compute_reward(self):
        # compute_cartpole_reward is a module-level function in cartpole.py
        # (not shown here); it returns per-env rewards and reset flags.
        self.rew_buf[:], self.reset_buf[:] = compute_cartpole_reward(
            self.obs_buf, self.reset_dist, self.reset_buf,
            self.progress_buf, self.max_episode_length)

    def reset_idx(self, env_ids):
        # Randomize positions in [-0.1, 0.1) and velocities in [-0.25, 0.25).
        positions = 0.2 * (torch.rand((len(env_ids), self.num_dof),
                           device=self.device) - 0.5)
        velocities = 0.5 * (torch.rand((len(env_ids), self.num_dof),
                            device=self.device) - 0.5)
        # Writing through the views updates self.dof_state in place.
        self.dof_pos[env_ids] = positions[:]
        self.dof_vel[env_ids] = velocities[:]
        # Push only the selected envs' DOF state back to the simulator.
        env_ids_int32 = env_ids.to(dtype=torch.int32)
        self.gym.set_dof_state_tensor_indexed(
            self.sim, gymtorch.unwrap_tensor(self.dof_state),
            gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))
        # Clear flags so these environments begin new episodes.
        self.reset_buf[env_ids] = 0
        self.progress_buf[env_ids] = 0

Key Inherited Buffers

Buffer Shape Type Description
self.obs_buf [num_envs, num_obs] torch.float32 Observation buffer filled by compute_observations()
self.rew_buf [num_envs] torch.float32 Reward buffer filled by compute_reward()
self.reset_buf [num_envs] torch.long Reset flag buffer (1 = reset needed, 0 = continue)
self.progress_buf [num_envs] torch.long Episode step counter, incremented each step
self.actions [num_envs, num_actions] torch.float32 Latest actions from the RL policy
self.extras dict dict Extra info returned to the training algorithm (e.g., episode statistics)

Related Pages

Principle:Isaac_sim_IsaacGymEnvs_VecTask_Subclass_Creation

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment