
Implementation:Haosulab ManiSkill BaseEnv Step Reset

From Leeroopedia
Field Value
implementation_name Haosulab_ManiSkill_BaseEnv_Step_Reset
overview Concrete tool for batched environment stepping and resetting in ManiSkill GPU-parallelized simulation
type Library API
domains Simulation, Reinforcement_Learning, Robotics
last_updated 2026-02-15
related_pages Principle:Haosulab_ManiSkill_GPU_Parallelized_Rollout

Overview

Description

The BaseEnv.step() and BaseEnv.reset() methods are the core simulation interface of ManiSkill. When running on the GPU with multiple parallel environments, these methods operate on batched tensors: a single step() call advances all environments simultaneously, and reset() can selectively reset specific environments via the env_idx option (a partial reset).

The step() method (lines 1042-1071 of sapien_env.py) processes the action through the robot controller, advances the physics simulation, and computes observations, rewards, and termination conditions.

The reset() method (lines 886-978) handles environment initialization, including optional reconfiguration for domain randomization, seeding for reproducibility, and selective resetting of individual environments.

Usage

These methods are called during both rollout collection and evaluation. During training rollouts, step() is called for each timestep, and reset() is called automatically by the ManiSkillVectorEnv wrapper when episodes end. During evaluation, the same step/reset cycle is used but with deterministic actions.
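The auto-reset cycle the wrapper performs can be sketched with a toy batched environment (the ToyVecEnv class and its counter dynamics are illustrative stand-ins, not ManiSkill's API; only the step/partial-reset pattern mirrors the real interface):

```python
import torch

class ToyVecEnv:
    """Toy batched env: each sub-env terminates once its counter reaches 3."""
    def __init__(self, num_envs):
        self.num_envs = num_envs
        self.counters = torch.zeros(num_envs, dtype=torch.long)

    def reset(self, options=None):
        # Partial reset: only the listed indices are re-initialized.
        env_idx = options["env_idx"] if options else torch.arange(self.num_envs)
        self.counters[env_idx] = 0
        return self.counters.clone(), {}

    def step(self, action):
        self.counters += 1
        terminated = self.counters >= 3
        truncated = torch.zeros(self.num_envs, dtype=torch.bool)
        return self.counters.clone(), torch.ones(self.num_envs), terminated, truncated, {}

env = ToyVecEnv(num_envs=4)
obs, _ = env.reset()
for _ in range(5):
    obs, reward, terminated, truncated, info = env.step(None)
    done_idx = torch.logical_or(terminated, truncated).nonzero().squeeze(-1)
    if done_idx.numel() > 0:
        # What ManiSkillVectorEnv does internally: partial reset of finished envs.
        obs, _ = env.reset(options={"env_idx": done_idx})
```

All four toy environments terminate in lockstep here; with real tasks, done_idx is typically a strict subset of the batch, which is exactly why partial reset exists.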

Code Reference

Field Value
Repository https://github.com/haosulab/ManiSkill
File mani_skill/envs/sapien_env.py
step() location Lines 1042-1071
reset() location Lines 886-978

step() method:

def step(self, action: Union[None, np.ndarray, torch.Tensor, dict]):
    """
    Take a step through the environment with an action.
    Actions are automatically clipped to the action space.

    If action is None, the environment will proceed forward in time
    without sending any actions/control signals to the agent.
    """
    action = self._step_action(action)
    self._elapsed_steps += 1
    info = self.get_info()
    obs = self.get_obs(info, unflattened=True)
    reward = self.get_reward(obs=obs, action=action, info=info)
    obs = self._flatten_raw_obs(obs)
    if "success" in info:
        if "fail" in info:
            terminated = torch.logical_or(info["success"], info["fail"])
        else:
            terminated = info["success"].clone()
    else:
        if "fail" in info:
            terminated = info["fail"].clone()
        else:
            terminated = torch.zeros(self.num_envs, dtype=bool, device=self.device)
    self._last_obs = obs
    return (
        obs,
        reward,
        terminated,
        torch.zeros(self.num_envs, dtype=bool, device=self.device),
        info,
    )
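The terminated flag in step() is derived purely from the info dict. That branching can be exercised in isolation (compute_terminated is a hypothetical helper written here to mirror the branches in the listing above; it is not part of the ManiSkill API):

```python
import torch

def compute_terminated(info, num_envs, device="cpu"):
    # Mirrors the branching in BaseEnv.step(): success OR fail ends the episode.
    if "success" in info and "fail" in info:
        return torch.logical_or(info["success"], info["fail"])
    if "success" in info:
        return info["success"].clone()
    if "fail" in info:
        return info["fail"].clone()
    # Neither key present: the episode never self-terminates.
    return torch.zeros(num_envs, dtype=torch.bool, device=device)

success = torch.tensor([True, False, False, False])
fail = torch.tensor([False, True, False, False])
print(compute_terminated({"success": success, "fail": fail}, 4).tolist())
# [True, True, False, False]
```

Note that truncation is never set here; BaseEnv always returns an all-False truncated tensor and defers time limits to the TimeLimit wrapper.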

reset() method (simplified):

def reset(self, seed=None, options=None):
    if options is None:
        options = dict()
    reconfigure = options.get("reconfigure", False)

    if "env_idx" in options:
        env_idx = options["env_idx"]
    else:
        env_idx = torch.arange(0, self.num_envs, device=self.device)

    self._set_main_rng(seed)
    if reconfigure:
        self._set_episode_rng(seed if seed is not None else self._batched_main_rng.randint(2**31), env_idx)
        with torch.random.fork_rng():
            torch.manual_seed(seed=self._episode_seed[0])
            self._reconfigure(options)
            self._after_reconfigure(options)
    else:
        self._set_episode_rng(seed, env_idx)

    self.scene._reset_mask = torch.zeros(self.num_envs, dtype=torch.bool, device=self.device)
    self.scene._reset_mask[env_idx] = True
    self._elapsed_steps[env_idx] = 0

    self._clear_sim_state()

    if self.agent is not None:
        self.agent.reset()

    self._initialize_episode(env_idx, options)

    if self.gpu_sim_enabled:
        self.scene._gpu_apply_all()
        self.scene.px.gpu_update_articulation_kinematics()
        self.scene._gpu_fetch_all()

    # Reset controllers
    if self.agent is not None:
        if isinstance(self.agent.controller, dict):
            for controller in self.agent.controller.values():
                controller.reset()
        else:
            self.agent.controller.reset()

    info = self.get_info()
    obs = self.get_obs(info)
    return obs, info

I/O Contract

step(action):

Direction Name Type Shape Description
Input action Union[None, np.ndarray, torch.Tensor, dict] (num_envs, act_dim) Batched action tensor. None advances simulation without control.
Output obs torch.Tensor (num_envs, obs_dim) Batched observations for all environments
Output reward torch.Tensor (num_envs,) Scalar reward for each environment
Output terminated torch.Tensor[bool] (num_envs,) True if task succeeded or failed (from info["success"] or info["fail"])
Output truncated torch.Tensor[bool] (num_envs,) Always False from BaseEnv; truncation is handled by the TimeLimit wrapper
Output info dict varies Contains "success", "fail", and task-specific info

reset(seed, options):

Direction Name Type Description
Input seed Optional[int] RNG seed for reproducibility. If None, uses previously set seed.
Input options Optional[dict] Options dict. Key entries: "env_idx" for partial reset, "reconfigure" for asset randomization.
Output obs torch.Tensor Initial observations after reset, shape (num_envs, obs_dim)
Output info dict Initial info dict, includes "reconfigure" flag

Partial reset mechanics:

When options={"env_idx": tensor_of_indices} is passed to reset():

  • Only the environments at the specified indices are reset
  • Other environments are untouched and retain their current state
  • The returned obs and info contain data for all environments (updated for reset ones, unchanged for others)
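The bookkeeping behind a partial reset can be sketched with plain tensors standing in for scene state (the reset_mask and elapsed_steps names follow the reset() listing above; the surrounding scene machinery is omitted):

```python
import torch

num_envs = 8
elapsed_steps = torch.tensor([5, 5, 5, 5, 5, 5, 5, 5])
env_idx = torch.tensor([3, 7])  # only these environments are reset

# Same mask construction as in reset(): mark which sub-scenes to touch.
reset_mask = torch.zeros(num_envs, dtype=torch.bool)
reset_mask[env_idx] = True

# Step counters are cleared only for the reset environments;
# all other environments keep their in-progress episode state.
elapsed_steps[env_idx] = 0
```

Downstream, the scene uses this mask so that _initialize_episode and state writes only affect the selected sub-scenes.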

Internal step pipeline:

  1. _step_action(action): Clips actions to bounds, applies to robot controller, runs physics simulation
  2. self._elapsed_steps += 1: Increments the step counter for all environments
  3. get_info(): Computes task-specific info (success/fail evaluation)
  4. get_obs(info): Computes observations based on obs_mode
  5. get_reward(obs, action, info): Computes reward signal
  6. Termination: derived from info["success"] and/or info["fail"]

Usage Examples

Example 1: Basic step-reset loop

import gymnasium as gym
import mani_skill.envs
import torch

env = gym.make("PickCube-v1", num_envs=512, obs_mode="state", sim_backend="physx_cuda")
obs, info = env.reset(seed=42)

for step in range(100):
    action = torch.randn(512, env.single_action_space.shape[0], device=env.device)
    obs, reward, terminated, truncated, info = env.step(action)
    # obs: (512, obs_dim), reward: (512,), terminated: (512,), truncated: (512,)

Example 2: Partial reset (called by ManiSkillVectorEnv internally)

# When environments 3, 7, 15 finish their episodes:
done_indices = torch.tensor([3, 7, 15], device=env.device)
obs, info = env.reset(options={"env_idx": done_indices})
# Only environments 3, 7, 15 are reset; others retain their state

Example 3: Rollout collection loop (from PPO baseline)

# Pre-allocate rollout buffers on GPU
obs_buf = torch.zeros((num_steps, num_envs) + envs.single_observation_space.shape).to(device)
actions_buf = torch.zeros((num_steps, num_envs) + envs.single_action_space.shape).to(device)
rewards_buf = torch.zeros((num_steps, num_envs)).to(device)
dones_buf = torch.zeros((num_steps, num_envs)).to(device)
values_buf = torch.zeros((num_steps, num_envs)).to(device)
logprobs_buf = torch.zeros((num_steps, num_envs)).to(device)

next_obs, _ = envs.reset(seed=1)
next_done = torch.zeros(num_envs, device=device)

for step in range(num_steps):
    obs_buf[step] = next_obs
    dones_buf[step] = next_done

    with torch.no_grad():
        action, logprob, _, value = agent.get_action_and_value(next_obs)
        values_buf[step] = value.flatten()
    actions_buf[step] = action
    logprobs_buf[step] = logprob

    next_obs, reward, terminations, truncations, infos = envs.step(clip_action(action))
    next_done = torch.logical_or(terminations, truncations).to(torch.float32)
    rewards_buf[step] = reward.view(-1) * reward_scale

Related Pages

  • Principle:Haosulab_ManiSkill_GPU_Parallelized_Rollout