
Implementation:Haosulab ManiSkill Sim2RealEnv

From Leeroopedia
Implementation Name: Sim2RealEnv
Type: API Doc
Domain: Sim2Real
Source File: mani_skill/envs/sim2real_env.py (L15-383)
Date: 2026-02-15
Repository: Haosulab/ManiSkill

Overview

The Sim2RealEnv class provides a Gymnasium-compatible environment that bridges a ManiSkill simulation environment with a physical robot. It maintains identical observation and action spaces between simulation and reality, enabling policies trained in simulation to be deployed on real hardware without code changes. The class handles control frequency enforcement, sensor data preprocessing, wrapper compatibility, and observation alignment validation.

Description

Sim2RealEnv is instantiated with a simulation environment and a BaseRealAgent instance. It replicates the Gymnasium reset/step interface by:

  • Delegating action computation to the simulation's controller, then sending the resulting joint targets to the real robot.
  • Capturing sensor data from real cameras and preprocessing it to match simulation image dimensions.
  • Reading proprioceptive state (qpos, qvel) from the real robot and formatting it identically to the simulation.
  • Maintaining wrapper compatibility by temporarily swapping the environment reference that wrappers call into.
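
The delegation pattern in the first bullet can be sketched in isolation. This is a simplified illustration, not the library's code: `SimController` and `RealRobot` are hypothetical stand-ins for the ManiSkill controller and a `BaseRealAgent` implementation.

```python
class SimController:
    """Converts a normalized policy action into joint position targets,
    standing in for the simulation controller."""
    def __init__(self, lower, upper):
        self.lower, self.upper = lower, upper

    def compute_targets(self, action):
        # Rescale each action component in [-1, 1] to the joint limit range.
        return [lo + (a + 1.0) * 0.5 * (hi - lo)
                for a, lo, hi in zip(action, self.lower, self.upper)]


class RealRobot:
    """Records the joint targets that would be sent to hardware."""
    def __init__(self):
        self.last_targets = None

    def set_target_qpos(self, targets):
        self.last_targets = targets


def step_real(controller, robot, action):
    """One step of the sim-to-real loop: the sim controller computes
    joint targets, the real robot executes them."""
    targets = controller.compute_targets(action)
    robot.set_target_qpos(targets)
    return targets
```

The key property is that the policy only ever sees the simulation's action space; the hardware receives whatever joint targets the sim controller derives from it.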

Usage

from mani_skill.envs.sim2real_env import Sim2RealEnv
import gymnasium as gym

sim_env = gym.make("GraspCubeSO100Digital-v1", obs_mode="rgb",
                    control_mode="pd_joint_pos", sim_backend="cpu")
real_agent = MyRealAgent(...)  # inherits from BaseRealAgent
real_agent.start()

env = Sim2RealEnv(
    sim_env=sim_env,
    agent=real_agent,
    control_freq=30,
    skip_data_checks=False,
)
obs, info = env.reset()
action = policy(obs)
obs, reward, terminated, truncated, info = env.step(action)
env.close()

Code Reference

Class Signature

class Sim2RealEnv(gym.Env):
    metadata = {"render_modes": ["rgb_array", "sensors", "all"]}

    def __init__(
        self,
        sim_env: gym.Env,
        agent: BaseRealAgent,
        real_reset_function: Optional[
            Callable[["Sim2RealEnv", Optional[int], Optional[dict]], None]
        ] = None,
        sensor_data_preprocessing_function: Optional[Callable[[dict], dict]] = None,
        render_mode: Optional[str] = "sensors",
        skip_data_checks: bool = False,
        control_freq: Optional[int] = None,
    ):

Constructor Parameters

  • sim_env (gym.Env, required): The simulation environment to align with. Must use the physx_cpu backend.
  • agent (BaseRealAgent, required): The real robot agent for hardware control and sensing.
  • real_reset_function (Optional[Callable], default None): Custom reset function. The default resets the sim, moves the robot to the sim reset qpos, and prompts the user.
  • sensor_data_preprocessing_function (Optional[Callable], default None): Custom sensor data preprocessing. The default center-crops and resizes images.
  • render_mode (Optional[str], default "sensors"): Render mode: "rgb_array", "sensors", or "all".
  • skip_data_checks (bool, default False): Skip observation alignment validation on init.
  • control_freq (Optional[int], default None): Control frequency in Hz. Falls back to the sim env's control_freq if None.

Key Methods

def reset(self, seed=None, options=None):
    """Reset the real environment. Calls real_reset_function, then syncs sim robot qpos."""

def step(self, action):
    """Step the real environment:
    1. Pass action to sim controller to compute joint targets.
    2. Enforce control timing (sleep if faster than control_freq).
    3. Send joint targets to real robot.
    4. Apply any wrappers to the returned observation.
    """

def _step_action(self, action):
    """Compute joint targets from action using sim controller,
    enforce timing, and send to real robot."""

def get_obs(self, info=None, unflattened=False):
    """Get observation using sim env's get_obs logic with real sensor data."""

def _get_obs_sensor_data(self, apply_texture_transforms=True):
    """Capture and preprocess real sensor data."""

def preprocess_sensor_data(self, sensor_data, sensor_names=None):
    """Default preprocessing: center-crop to sim aspect ratio, then resize."""

def _check_observations(self, sample_sim_obs, sample_real_obs):
    """Recursively verify matching shapes and dtypes between sim and real obs."""

def close(self):
    """Stop the real agent."""
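
The alignment validation that `_check_observations` performs can be sketched as a recursive walk over the two observation trees. This is a minimal illustration of the idea, not the library's exact implementation:

```python
import numpy as np

def check_obs_aligned(sim_obs, real_obs, path=""):
    """Recursively verify that two observation trees share the same
    structure, array shapes, and dtypes. Raises AssertionError on the
    first mismatch, reporting the offending key path."""
    if isinstance(sim_obs, dict):
        assert isinstance(real_obs, dict), f"{path}: expected dict"
        assert sim_obs.keys() == real_obs.keys(), f"{path}: key mismatch"
        for k in sim_obs:
            check_obs_aligned(sim_obs[k], real_obs[k], f"{path}/{k}")
    else:
        sim_arr, real_arr = np.asarray(sim_obs), np.asarray(real_obs)
        assert sim_arr.shape == real_arr.shape, (
            f"{path}: shape {real_arr.shape} != {sim_arr.shape}")
        assert sim_arr.dtype == real_arr.dtype, (
            f"{path}: dtype {real_arr.dtype} != {sim_arr.dtype}")
```

Running such a check once at construction time (unless skip_data_checks is set) catches sim/real mismatches before a policy ever runs on hardware.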

Control Frequency Enforcement (L161-187)

def _step_action(self, action):
    self.base_sim_env.agent.set_action(action)
    sim_articulation = self.agent.controller.articulation

    if self.last_control_time is None:
        self.last_control_time = time.perf_counter()
    else:
        dt = time.perf_counter() - self.last_control_time
        if dt < self.control_dt:
            time.sleep(self.control_dt - dt)
        else:
            logger.warning(
                f"Control dt {self.control_dt} was not reached, actual dt was {dt}"
            )
    self.last_control_time = time.perf_counter()

    if self.agent.controller.sets_target_qpos:
        self.agent.set_target_qpos(sim_articulation.drive_targets)
    if self.agent.controller.sets_target_qvel:
        self.agent.set_target_qvel(sim_articulation.drive_velocities)
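
The timing logic above can be exercised on its own. This sketch (an illustration, not library code) paces an arbitrary callable at a fixed control rate using the same perf_counter/sleep pattern:

```python
import time

def run_at_rate(fn, control_freq, n_steps):
    """Call `fn` n_steps times, sleeping as needed so consecutive calls
    are at least 1/control_freq seconds apart, mirroring how
    _step_action paces commands sent to the real robot."""
    control_dt = 1.0 / control_freq
    last = None
    for _ in range(n_steps):
        now = time.perf_counter()
        if last is not None:
            dt = now - last
            if dt < control_dt:
                time.sleep(control_dt - dt)
            # else: the loop body overran the control period; like the
            # real class, we can only log it, not recover the lost time.
        last = time.perf_counter()
        fn()
```

Note that sleeping only enforces a lower bound on the period: if the policy or sensor capture takes longer than control_dt, the effective control frequency drops, which is why the real class logs a warning in that branch.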

Default Sensor Preprocessing (L349-380)

def preprocess_sensor_data(self, sensor_data, sensor_names=None):
    """Center-crop real images to simulation aspect ratio, then resize."""
    for sensor_name in sensor_names:
        sim_sensor_cfg = self.base_sim_env._sensor_configs[sensor_name]
        target_h, target_w = sim_sensor_cfg.height, sim_sensor_cfg.width
        real_sensor_data = sensor_data[sensor_name]
        for key in ["rgb", "depth"]:
            if key in real_sensor_data:
                img = real_sensor_data[key][0].numpy()
                # Center crop to square, then resize
                crop_res = np.min(img.shape[:2])
                cutoff = (np.max(img.shape[:2]) - crop_res) // 2
                # ... crop and resize using cv2.resize ...
    return sensor_data
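
The crop-and-resize step elided above can be sketched with NumPy alone. This is an illustrative stand-in: the actual code resizes with cv2.resize, while nearest-neighbor indexing is used here to keep the example dependency-free.

```python
import numpy as np

def center_crop_resize(img, target_h, target_w):
    """Center-crop an HxWxC image to the target aspect ratio, then
    resize with nearest-neighbor sampling."""
    h, w = img.shape[:2]
    target_ar = target_w / target_h
    if w / h > target_ar:
        # Source is too wide: crop columns symmetrically.
        new_w = int(round(h * target_ar))
        x0 = (w - new_w) // 2
        img = img[:, x0:x0 + new_w]
    else:
        # Source is too tall: crop rows symmetrically.
        new_h = int(round(w / target_ar))
        y0 = (h - new_h) // 2
        img = img[y0:y0 + new_h, :]
    h, w = img.shape[:2]
    # Nearest-neighbor resize via integer index maps.
    rows = np.arange(target_h) * h // target_h
    cols = np.arange(target_w) * w // target_w
    return img[rows][:, cols]
```

Cropping before resizing preserves the simulation camera's aspect ratio, so objects are not stretched relative to what the policy saw in training.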

I/O Contract

  • Input (constructor): simulation environment, a gym.Env wrapping a ManiSkill BaseEnv (physx_cpu backend).
  • Input (constructor): real robot agent, a BaseRealAgent instance.
  • Input (step): action, in the same format/space as sim_env.action_space.
  • Output (step): observation, with the same structure/shape/dtype as sim_env.observation_space.
  • Output (step): reward, a float (may be 0 if no real-world reward is implemented).
  • Output (step): terminated/truncated, bools.
  • Output (step): info, a dict containing at least elapsed_steps.

Wrapper Compatibility

Sim2RealEnv preserves any Gymnasium wrappers applied to the simulation environment: the wrapper chain is traversed during initialization, and the last wrapper's environment reference is temporarily swapped during step and reset so that wrapped calls are routed through the real environment's logic.
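
The swap itself can be illustrated with a context manager over stub classes. This is a hypothetical sketch of the technique, not the library's implementation:

```python
import contextlib

class DummyWrapper:
    """Minimal stand-in for a Gymnasium wrapper: forwards to .env."""
    def __init__(self, env):
        self.env = env
    def step(self, action):
        return self.env.step(action)

@contextlib.contextmanager
def swap_inner_env(wrapper, replacement):
    """Temporarily point a wrapper at a different inner environment,
    restoring the original on exit, even if the body raises."""
    original = wrapper.env
    wrapper.env = replacement
    try:
        yield wrapper
    finally:
        wrapper.env = original

class SimEnvStub:
    def step(self, action):
        return "sim", action

class RealEnvStub:
    def step(self, action):
        return "real", action
```

While the swap is active, any observation or action transforms defined by the wrapper chain apply unchanged, which is what keeps wrapped policies deployable without code changes.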

Usage Examples

import gymnasium as gym
from mani_skill.envs.sim2real_env import Sim2RealEnv
from mani_skill.agents.robots.lerobot.manipulator import LeRobotRealAgent
from lerobot.common.robots.robot import Robot

# Setup
robot = Robot(name="so100_follower", config=my_config)
agent = LeRobotRealAgent(robot=robot, use_cached_qpos=True)
agent.start()

sim_env = gym.make("GraspCubeSO100Digital-v1", obs_mode="rgb",
                    control_mode="pd_joint_pos", sim_backend="cpu")

# Custom reset function
def my_reset(env, seed=None, options=None):
    env.sim_env.reset(seed=seed, options=options)
    env.agent.reset(qpos=env.base_sim_env.agent.robot.qpos.cpu().flatten())
    print("Environment reset. Ready to go!")

env = Sim2RealEnv(
    sim_env=sim_env,
    agent=agent,
    real_reset_function=my_reset,
    control_freq=30,
)

# Deploy policy
obs, info = env.reset()
for step_idx in range(300):
    action = policy.predict(obs)
    obs, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        break
env.close()
