
Implementation:Isaac sim IsaacGymEnvs VecTask Simulation Loop

From Leeroopedia
Knowledge Sources
Type: API Doc
Domains: Simulation, Architecture
Last Updated: 2026-02-15 00:00 GMT

Overview

API reference for the simulation loop methods defined in VecTask and implemented by task subclasses, with Cartpole as the reference implementation.

Description

This document provides the concrete method signatures from the VecTask base class (vec_task.py:L326-455) and their implementations in the Cartpole task (cartpole.py:L56-196). Each method's role, parameters, and interaction with the GPU tensor buffers are documented.

Usage

Reference this API when implementing the core methods of a new IsaacGymEnvs task, or when debugging the simulation loop behavior of an existing task.

Code Reference

Source Location

  • Repository: NVIDIA-Omniverse/IsaacGymEnvs
  • Base class: isaacgymenvs/tasks/base/vec_task.py (L326-455)
  • Reference task: isaacgymenvs/tasks/cartpole.py (L56-196)

Import

from isaacgymenvs.tasks.base.vec_task import VecTask

VecTask Base Class Signatures

class VecTask(Env):
    """Base class for GPU-accelerated vectorized RL environments."""

    def create_sim(self, compute_device, graphics_device, physics_engine, sim_params):
        """Create the Isaac Gym simulation instance.

        Called during __init__. Subclasses override this to set up the physics world.

        Args:
            compute_device (int): GPU device ID for physics computation
            graphics_device (int): GPU device ID for rendering
            physics_engine (gymapi.SimType): SIM_PHYSX or SIM_FLEX
            sim_params (gymapi.SimParams): Simulation parameters (dt, gravity, solver settings)

        Returns:
            sim: Isaac Gym simulation handle
        """
        # vec_task.py:L326-342

    def step(self, actions: torch.Tensor):
        """Main simulation loop: apply actions, simulate physics, compute outputs.

        This is the primary entry point called by the training algorithm each timestep.

        Args:
            actions (torch.Tensor): Action tensor [num_envs, num_actions] from RL policy

        Returns:
            obs_buf (torch.Tensor): Observations [num_envs, num_obs]
            rew_buf (torch.Tensor): Rewards [num_envs]
            reset_buf (torch.Tensor): Reset flags [num_envs]
            extras (dict): Additional info (episode stats, etc.)
        """
        # vec_task.py:L360-408

    def pre_physics_step(self, actions: torch.Tensor):
        """Abstract: Apply RL actions to the simulation before physics stepping.

        Must be overridden by subclasses.

        Args:
            actions (torch.Tensor): Raw action tensor [num_envs, num_actions]
        """
        # vec_task.py:L349-354 (abstract)
        raise NotImplementedError

    def post_physics_step(self):
        """Abstract: Compute observations, rewards, and resets after physics stepping.

        Must be overridden by subclasses. Typically calls compute_observations(),
        compute_reward(), and handles resets via reset_idx().
        """
        # vec_task.py:L357-358 (abstract)
        raise NotImplementedError

    def reset_idx(self, env_ids):
        """Reset specific environments to initial conditions.

        Called when environments in env_ids have their reset_buf flag set.

        Args:
            env_ids (torch.Tensor): Integer tensor of environment indices to reset
        """
        # vec_task.py:L420-424

    def allocate_buffers(self):
        """Allocate GPU tensor buffers for observations, rewards, resets, and progress.

        Called during __init__ after create_sim(). Creates:
          - self.obs_buf:      torch.zeros(num_envs, num_obs)
          - self.rew_buf:      torch.zeros(num_envs)
          - self.reset_buf:    torch.zeros(num_envs, dtype=torch.long)
          - self.progress_buf: torch.zeros(num_envs, dtype=torch.long)
        """
        # vec_task.py:L290-310

Cartpole Reference Implementations

create_sim (L56-63)

def create_sim(self):
    """Set up physics world with gravity, ground plane, and cartpole environments."""
    self.sim_params.up_axis = gymapi.UP_AXIS_Z
    self.sim_params.gravity.x = 0
    self.sim_params.gravity.y = 0
    self.sim_params.gravity.z = -9.81
    self.sim = super().create_sim(
        self.device_id, self.graphics_device_id,
        self.physics_engine, self.sim_params)
    self._create_ground_plane()
    self._create_envs(
        self.num_envs, self.cfg["env"]["envSpacing"],
        int(np.sqrt(self.num_envs)))

pre_physics_step (L159-163)

def pre_physics_step(self, actions):
    """Apply horizontal force to the cart based on RL actions."""
    self.actions = actions.clone().to(self.device)
    # Create force tensor: only the cart DOF (index 0) receives force
    forces = torch.zeros(self.num_envs, self.num_dof,
                         dtype=torch.float, device=self.device)
    forces[:, 0] = self.actions[:, 0] * self.max_push_effort
    force_tensor = gymtorch.unwrap_tensor(forces)
    self.gym.set_dof_actuation_force_tensor(self.sim, force_tensor)
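
The force-tensor construction above can be sketched standalone with plain PyTorch, without Isaac Gym; num_envs, num_dof, max_push_effort, and the action values below are illustrative, not taken from the task config:

```python
import torch

# Standalone sketch of the force-tensor pattern from pre_physics_step.
num_envs, num_dof, max_push_effort = 4, 2, 400.0
actions = torch.tensor([[0.5], [-1.0], [0.0], [1.0]])  # [num_envs, num_actions]

# Only the cart DOF (column 0) is actuated; the pole joint stays passive.
forces = torch.zeros(num_envs, num_dof, dtype=torch.float)
forces[:, 0] = actions[:, 0] * max_push_effort

print(forces[:, 0].tolist())           # [200.0, -400.0, 0.0, 400.0]
print(forces[:, 1].abs().sum().item())  # 0.0 -- pole column untouched
```

In the real task, the resulting tensor is handed to the simulator via gymtorch.unwrap_tensor() and gym.set_dof_actuation_force_tensor(), as shown above.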

post_physics_step (L165-196)

def post_physics_step(self):
    """Refresh state, compute observations/rewards, handle resets."""
    self.progress_buf += 1

    # Refresh GPU tensors with new physics state
    self.gym.refresh_dof_state_tensor(self.sim)

    # Compute observations and rewards
    self.compute_observations()
    self.compute_reward()

    # Reset environments that need it
    env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
    if len(env_ids) > 0:
        self.reset_idx(env_ids)
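
The index-extraction step above can be checked in isolation; the reset_buf contents here are illustrative:

```python
import torch

# How post_physics_step turns reset flags into a tensor of env indices.
reset_buf = torch.tensor([0, 1, 0, 1, 1], dtype=torch.long)

# nonzero(as_tuple=False) yields shape [num_set, 1]; squeeze(-1) flattens it.
env_ids = reset_buf.nonzero(as_tuple=False).squeeze(-1)
print(env_ids.tolist())  # [1, 3, 4]
```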

compute_observations (L131-142)

def compute_observations(self):
    """Fill observation buffer with cart/pole position and velocity."""
    self.gym.refresh_dof_state_tensor(self.sim)
    self.obs_buf[:, 0] = self.dof_pos[:, 0]  # cart position
    self.obs_buf[:, 1] = self.dof_vel[:, 0]  # cart velocity
    self.obs_buf[:, 2] = self.dof_pos[:, 1]  # pole angle
    self.obs_buf[:, 3] = self.dof_vel[:, 1]  # pole angular velocity
    return self.obs_buf

compute_reward (L119-129)

@torch.jit.script
def compute_cartpole_reward(obs_buf, reset_dist, reset_buf, progress_buf,
                            max_episode_length):
    """JIT-compiled reward function for Cartpole.

    Reward components:
      - 1.0 alive bonus per timestep
      - Penalty for cart displacement from center
      - Penalty for pole angle from vertical

    Reset conditions:
      - Cart position exceeds reset_dist
      - Episode length exceeds max_episode_length
    """
    cart_pos = obs_buf[:, 0]
    cart_vel = obs_buf[:, 1]
    pole_angle = obs_buf[:, 2]
    pole_vel = obs_buf[:, 3]

    reward = 1.0 - pole_angle * pole_angle - 0.01 * torch.abs(cart_pos)
    reward = torch.where(torch.abs(cart_pos) > reset_dist,
                         torch.ones_like(reward) * -2.0, reward)
    reward = torch.where(torch.abs(pole_angle) > np.pi / 2,
                         torch.ones_like(reward) * -2.0, reward)

    reset = torch.where(torch.abs(cart_pos) > reset_dist,
                        torch.ones_like(reset_buf), reset_buf)
    reset = torch.where(progress_buf >= max_episode_length - 1,
                        torch.ones_like(reset_buf), reset)

    return reward, reset
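
The torch.where override/reset pattern above can be exercised outside TorchScript on hand-picked states; reset_dist, max_episode_length, and the state values are illustrative, and math.pi stands in for np.pi to avoid the NumPy dependency:

```python
import math
import torch

reset_dist, max_episode_length = 3.0, 500

cart_pos = torch.tensor([0.0, 4.0, 0.0])   # env 1: cart out of bounds
pole_angle = torch.tensor([0.0, 0.0, 2.0])  # env 2: pole past pi/2
reward = torch.tensor([1.0, 1.0, 1.0])      # stand-in per-step rewards
reset_buf = torch.zeros(3, dtype=torch.long)
progress_buf = torch.zeros(3, dtype=torch.long)

# Override reward with -2.0 on failure states.
reward = torch.where(torch.abs(cart_pos) > reset_dist,
                     torch.ones_like(reward) * -2.0, reward)
reward = torch.where(torch.abs(pole_angle) > math.pi / 2,
                     torch.ones_like(reward) * -2.0, reward)

# Flag resets: out-of-bounds cart, or episode timeout.
reset = torch.where(torch.abs(cart_pos) > reset_dist,
                    torch.ones_like(reset_buf), reset_buf)
reset = torch.where(progress_buf >= max_episode_length - 1,
                    torch.ones_like(reset_buf), reset)

print(reward.tolist())  # [1.0, -2.0, -2.0]
print(reset.tolist())   # [0, 1, 0] -- a bad pole angle penalizes but does not reset
```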

reset_idx (L144-157)

def reset_idx(self, env_ids):
    """Randomize cart position/velocity and pole angle/velocity for reset envs."""
    positions = 0.2 * (torch.rand((len(env_ids), self.num_dof),
                       device=self.device) - 0.5)
    velocities = 0.5 * (torch.rand((len(env_ids), self.num_dof),
                        device=self.device) - 0.5)

    self.dof_pos[env_ids] = positions[:]
    self.dof_vel[env_ids] = velocities[:]

    env_ids_int32 = env_ids.to(dtype=torch.int32)
    self.gym.set_dof_state_tensor_indexed(
        self.sim,
        gymtorch.unwrap_tensor(self.dof_state),
        gymtorch.unwrap_tensor(env_ids_int32),
        len(env_ids_int32))

    self.reset_buf[env_ids] = 0
    self.progress_buf[env_ids] = 0
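
The randomization ranges implied above (positions in [-0.1, 0.1), velocities in [-0.25, 0.25)) can be verified with plain PyTorch; the sizes and seed are illustrative:

```python
import torch

torch.manual_seed(0)
num_reset, num_dof = 8, 2

# Same scaling as reset_idx: rand() is in [0, 1), so subtracting 0.5 and
# scaling by 0.2 / 0.5 bounds the draws as noted above.
positions = 0.2 * (torch.rand((num_reset, num_dof)) - 0.5)
velocities = 0.5 * (torch.rand((num_reset, num_dof)) - 0.5)

print(bool((positions.abs() <= 0.1).all()))    # True
print(bool((velocities.abs() <= 0.25).all()))  # True
```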

Key Buffers Reference

  • self.obs_buf [num_envs, num_obs] (float32): allocated by allocate_buffers(); written by compute_observations(); read by the training algorithm
  • self.rew_buf [num_envs] (float32): allocated by allocate_buffers(); written by compute_reward(); read by the training algorithm
  • self.reset_buf [num_envs] (int64): allocated by allocate_buffers(); written by compute_reward() and reset_idx(); read by post_physics_step()
  • self.progress_buf [num_envs] (int64): allocated by allocate_buffers(); written by post_physics_step() and reset_idx(); read by compute_reward()
  • self.actions [num_envs, num_actions] (float32): created in step(); written by pre_physics_step(); optionally read by compute_reward()
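
A minimal sketch of these allocations, on CPU for portability; num_envs and num_obs are illustrative values:

```python
import torch

num_envs, num_obs = 16, 4

# Mirrors allocate_buffers(): float buffers for obs/rewards, long buffers
# for the reset flags and per-env step counters.
obs_buf = torch.zeros(num_envs, num_obs)
rew_buf = torch.zeros(num_envs)
reset_buf = torch.zeros(num_envs, dtype=torch.long)
progress_buf = torch.zeros(num_envs, dtype=torch.long)

print(obs_buf.shape, obs_buf.dtype)         # torch.Size([16, 4]) torch.float32
print(reset_buf.dtype, progress_buf.dtype)  # torch.int64 torch.int64
```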

Execution Order Diagram

Training Algorithm
       |
       v
   step(actions)
       |
       +---> pre_physics_step(actions)
       |         |
       |         +---> gym.set_dof_actuation_force_tensor()
       |                    or gym.set_dof_position_target_tensor()
       |
       +---> gym.simulate(sim)          [GPU physics]
       +---> gym.fetch_results(sim)     [sync]
       |
       +---> post_physics_step()
       |         |
       |         +---> progress_buf += 1
       |         +---> gym.refresh_dof_state_tensor()
       |         +---> gym.refresh_actor_root_state_tensor()
       |         +---> compute_observations()  --> obs_buf
       |         +---> compute_reward()        --> rew_buf, reset_buf
       |         +---> reset_idx(env_ids)      --> dof_state, reset_buf=0
       |
       +---> return obs_buf, rew_buf, reset_buf, extras
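
The call order in the diagram can be mocked end to end with a pure-PyTorch stub. MockTask and its trivial dynamics are invented for illustration; gym.simulate()/gym.fetch_results() are replaced by a no-op:

```python
import torch

class MockTask:
    """Toy stand-in for a VecTask subclass, following the diagram's order."""

    def __init__(self, num_envs=4, max_episode_length=3):
        self.num_envs = num_envs
        self.max_episode_length = max_episode_length
        self.obs_buf = torch.zeros(num_envs, 1)
        self.rew_buf = torch.zeros(num_envs)
        self.reset_buf = torch.zeros(num_envs, dtype=torch.long)
        self.progress_buf = torch.zeros(num_envs, dtype=torch.long)

    def pre_physics_step(self, actions):
        self.actions = actions.clone()

    def post_physics_step(self):
        self.progress_buf += 1
        self.obs_buf[:, 0] = self.actions[:, 0]  # fake "observation"
        self.rew_buf[:] = 1.0                    # flat alive bonus
        self.reset_buf = (self.progress_buf >= self.max_episode_length).long()
        env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
        if len(env_ids) > 0:                     # reset_idx equivalent
            self.progress_buf[env_ids] = 0
            self.reset_buf[env_ids] = 0

    def step(self, actions):
        self.pre_physics_step(actions)
        # gym.simulate(sim) / gym.fetch_results(sim) would run here
        self.post_physics_step()
        return self.obs_buf, self.rew_buf, self.reset_buf, {}

task = MockTask()
for _ in range(3):
    obs, rew, reset, extras = task.step(torch.ones(task.num_envs, 1))
# All envs hit max_episode_length on the third step and were reset.
print(task.progress_buf.tolist())  # [0, 0, 0, 0]
```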

Related Pages

Principle:Isaac_sim_IsaacGymEnvs_Core_Simulation_Methods
