Implementation: Haosulab ManiSkill Sim2RealEnv
| Field | Value |
|---|---|
| Implementation Name | Sim2RealEnv |
| Type | API Doc |
| Domain | Sim2Real |
| Source File | mani_skill/envs/sim2real_env.py (L15-383) |
| Date | 2026-02-15 |
| Repository | Haosulab/ManiSkill |
Overview
The Sim2RealEnv class provides a Gymnasium-compatible environment that bridges a ManiSkill simulation environment with a physical robot. It maintains identical observation and action spaces between simulation and reality, enabling policies trained in simulation to be deployed on real hardware without code changes. The class handles control frequency enforcement, sensor data preprocessing, wrapper compatibility, and observation alignment validation.
Description
Sim2RealEnv is instantiated with a simulation environment and a BaseRealAgent instance. It replicates the Gymnasium reset/step interface by:
- Delegating action computation to the simulation's controller, then sending the resulting joint targets to the real robot.
- Capturing sensor data from real cameras and preprocessing it to match simulation image dimensions.
- Reading proprioceptive state (qpos, qvel) from the real robot and formatting it identically to the simulation.
- Maintaining wrapper compatibility by temporarily swapping the environment reference that wrappers call into.
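The delegation flow above can be sketched with toy stand-in classes. `SimController`, `RealRobot`, and `ToySim2Real` below are illustrative names, not the actual ManiSkill API; the point is only the ordering: sim controller computes targets, the real robot receives them, and proprioception is read back for the next observation.

```python
class SimController:
    """Toy stand-in: converts a policy action into joint position targets."""
    def compute_targets(self, action):
        return [a * 0.5 for a in action]  # pretend controller scaling

class RealRobot:
    """Toy stand-in: records the targets it is told to reach."""
    def __init__(self):
        self.last_targets = None
        self.qpos = [0.0, 0.0]
    def set_target_qpos(self, targets):
        self.last_targets = targets

class ToySim2Real:
    def __init__(self, controller, robot):
        self.controller = controller
        self.robot = robot
    def step(self, action):
        # 1) sim controller turns the policy action into joint targets
        targets = self.controller.compute_targets(action)
        # 2) the same targets are forwarded to the real hardware
        self.robot.set_target_qpos(targets)
        # 3) proprioception is read back from the robot for the next obs
        return {"qpos": self.robot.qpos}

env = ToySim2Real(SimController(), RealRobot())
obs = env.step([1.0, -1.0])
```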
Usage
```python
from mani_skill.envs.sim2real_env import Sim2RealEnv
import gymnasium as gym

sim_env = gym.make("GraspCubeSO100Digital-v1", obs_mode="rgb",
                   control_mode="pd_joint_pos", sim_backend="cpu")

real_agent = MyRealAgent(...)  # inherits from BaseRealAgent
real_agent.start()

env = Sim2RealEnv(
    sim_env=sim_env,
    agent=real_agent,
    control_freq=30,
    skip_data_checks=False,
)

obs, info = env.reset()
action = policy(obs)
obs, reward, terminated, truncated, info = env.step(action)
env.close()
```
Code Reference
Class Signature
```python
class Sim2RealEnv(gym.Env):
    metadata = {"render_modes": ["rgb_array", "sensors", "all"]}

    def __init__(
        self,
        sim_env: gym.Env,
        agent: BaseRealAgent,
        real_reset_function: Optional[
            Callable[["Sim2RealEnv", Optional[int], Optional[dict]], None]
        ] = None,
        sensor_data_preprocessing_function: Optional[Callable[[dict], dict]] = None,
        render_mode: Optional[str] = "sensors",
        skip_data_checks: bool = False,
        control_freq: Optional[int] = None,
    ):
```
Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| sim_env | gym.Env | (required) | The simulation environment to align with. Must use the physx_cpu backend. |
| agent | BaseRealAgent | (required) | The real robot agent for hardware control and sensing. |
| real_reset_function | Optional[Callable] | None | Custom reset function. The default resets the sim, moves the robot to the sim reset qpos, and prompts the user. |
| sensor_data_preprocessing_function | Optional[Callable] | None | Custom sensor data preprocessing. The default center-crops and resizes images. |
| render_mode | Optional[str] | "sensors" | Render mode: "rgb_array", "sensors", or "all". |
| skip_data_checks | bool | False | Skip observation alignment validation on init. |
| control_freq | Optional[int] | None | Control frequency in Hz. Uses the sim env's control_freq if None. |
Key Methods
```python
def reset(self, seed=None, options=None):
    """Reset the real environment. Calls real_reset_function, then syncs sim robot qpos."""

def step(self, action):
    """Step the real environment:
    1. Pass action to sim controller to compute joint targets.
    2. Enforce control timing (sleep if faster than control_freq).
    3. Send joint targets to real robot.
    4. Apply any wrappers to the returned observation.
    """

def _step_action(self, action):
    """Compute joint targets from action using sim controller,
    enforce timing, and send to real robot."""

def get_obs(self, info=None, unflattened=False):
    """Get observation using sim env's get_obs logic with real sensor data."""

def _get_obs_sensor_data(self, apply_texture_transforms=True):
    """Capture and preprocess real sensor data."""

def preprocess_sensor_data(self, sensor_data, sensor_names=None):
    """Default preprocessing: center-crop to sim aspect ratio, then resize."""

def _check_observations(self, sample_sim_obs, sample_real_obs):
    """Recursively verify matching shapes and dtypes between sim and real obs."""

def close(self):
    """Stop the real agent."""
```
Control Frequency Enforcement (L161-187)
```python
def _step_action(self, action):
    self.base_sim_env.agent.set_action(action)
    sim_articulation = self.agent.controller.articulation
    if self.last_control_time is None:
        self.last_control_time = time.perf_counter()
    else:
        dt = time.perf_counter() - self.last_control_time
        if dt < self.control_dt:
            time.sleep(self.control_dt - dt)
        else:
            logger.warning(
                f"Control dt {self.control_dt} was not reached, actual dt was {dt}"
            )
        self.last_control_time = time.perf_counter()
    if self.agent.controller.sets_target_qpos:
        self.agent.set_target_qpos(sim_articulation.drive_targets)
    if self.agent.controller.sets_target_qvel:
        self.agent.set_target_qvel(sim_articulation.drive_velocities)
```
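The timing logic above can be isolated into a small rate limiter for clarity. `RateLimiter` is a hypothetical name used only for this sketch; the behavior mirrors the excerpt: the first call records a timestamp, and later calls sleep just long enough that consecutive calls are at least `control_dt` apart.

```python
import time

class RateLimiter:
    def __init__(self, control_freq: int):
        self.control_dt = 1.0 / control_freq
        self.last_control_time = None

    def wait(self):
        """Sleep so consecutive calls are at least control_dt apart."""
        if self.last_control_time is not None:
            dt = time.perf_counter() - self.last_control_time
            if dt < self.control_dt:
                time.sleep(self.control_dt - dt)
            # else: the loop overran; the real code logs a warning here
        self.last_control_time = time.perf_counter()

limiter = RateLimiter(control_freq=50)  # 20 ms control period
start = time.perf_counter()
for _ in range(5):
    limiter.wait()
elapsed = time.perf_counter() - start
# The first call does not sleep, so 5 calls take at least 4 * 20 ms.
```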
Default Sensor Preprocessing (L349-380)
```python
def preprocess_sensor_data(self, sensor_data, sensor_names=None):
    """Center-crop real images to simulation aspect ratio, then resize."""
    for sensor_name in sensor_names:
        sim_sensor_cfg = self.base_sim_env._sensor_configs[sensor_name]
        target_h, target_w = sim_sensor_cfg.height, sim_sensor_cfg.width
        real_sensor_data = sensor_data[sensor_name]
        for key in ["rgb", "depth"]:
            if key in real_sensor_data:
                img = real_sensor_data[key][0].numpy()
                # Center crop to square, then resize
                crop_res = np.min(img.shape[:2])
                cutoff = (np.max(img.shape[:2]) - crop_res) // 2
                # ... crop and resize using cv2.resize ...
    return sensor_data
```
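A self-contained sketch of this center-crop-then-resize step, assuming the common case of a wide camera frame and a square simulation sensor. It uses nearest-neighbor index maps in plain NumPy so the example has no OpenCV dependency; the real code uses cv2.resize, and `center_crop_resize` is a name invented for this sketch.

```python
import numpy as np

def center_crop_resize(img: np.ndarray, target_h: int, target_w: int) -> np.ndarray:
    """Center-crop img to the target aspect ratio, then resize
    (nearest-neighbor) to (target_h, target_w)."""
    h, w = img.shape[:2]
    # crop the larger dimension so the aspect ratio matches the target
    crop_h = min(h, int(round(w * target_h / target_w)))
    crop_w = min(w, int(round(h * target_w / target_h)))
    top = (h - crop_h) // 2
    left = (w - crop_w) // 2
    cropped = img[top:top + crop_h, left:left + crop_w]
    # nearest-neighbor resize via integer index maps
    rows = np.arange(target_h) * crop_h // target_h
    cols = np.arange(target_w) * crop_w // target_w
    return cropped[rows][:, cols]

real_frame = np.zeros((480, 640, 3), dtype=np.uint8)  # e.g. a webcam frame
sim_sized = center_crop_resize(real_frame, target_h=128, target_w=128)
```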
I/O Contract
| Direction | Data | Format |
|---|---|---|
| Input (constructor) | Simulation environment | gym.Env wrapping a ManiSkill BaseEnv (physx_cpu backend) |
| Input (constructor) | Real robot agent | BaseRealAgent instance |
| Input (step) | Action | Same format/space as sim_env.action_space |
| Output (step) | Observation | Same structure/shape/dtype as sim_env.observation_space |
| Output (step) | Reward | float (may be 0 if not implemented for real) |
| Output (step) | Terminated/Truncated | bool |
| Output (step) | Info | dict with at least elapsed_steps |
Wrapper Compatibility
The Sim2RealEnv preserves any Gymnasium wrappers applied to the simulation environment. The wrapper chain is traversed during initialization and the last wrapper's environment reference is temporarily swapped during step and reset to route calls through the real environment logic.
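A toy illustration of this reference-swapping trick, using stand-in classes rather than the actual ManiSkill internals: a wrapper forwards `step()` to whatever its `env` attribute points at, so pointing that attribute at the real environment reuses the wrapper's logic unchanged.

```python
class FrameStackWrapper:
    """Stand-in wrapper that forwards step() to whatever `env` points at."""
    def __init__(self, env):
        self.env = env
    def step(self, action):
        obs = self.env.step(action)
        return {"stacked": [obs]}  # pretend wrapper post-processing

class SimEnv:
    def step(self, action):
        return "sim_obs"

class RealEnv:
    def step(self, action):
        return "real_obs"

sim_env = SimEnv()
wrapped = FrameStackWrapper(sim_env)

# temporarily route the wrapper chain through the real environment
real_env = RealEnv()
wrapped.env = real_env
out = wrapped.step(action=None)   # wrapper logic applied to the real obs
wrapped.env = sim_env             # restore the original reference
```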
Usage Examples
```python
import gymnasium as gym
from mani_skill.envs.sim2real_env import Sim2RealEnv
from mani_skill.agents.robots.lerobot.manipulator import LeRobotRealAgent
from lerobot.common.robots.robot import Robot

# Setup
robot = Robot(name="so100_follower", config=my_config)
agent = LeRobotRealAgent(robot=robot, use_cached_qpos=True)
agent.start()

sim_env = gym.make("GraspCubeSO100Digital-v1", obs_mode="rgb",
                   control_mode="pd_joint_pos", sim_backend="cpu")

# Custom reset function
def my_reset(env, seed=None, options=None):
    env.sim_env.reset(seed=seed, options=options)
    env.agent.reset(qpos=env.base_sim_env.agent.robot.qpos.cpu().flatten())
    print("Environment reset. Ready to go!")

env = Sim2RealEnv(
    sim_env=sim_env,
    agent=agent,
    real_reset_function=my_reset,
    control_freq=30,
)

# Deploy policy
obs, info = env.reset()
for step_idx in range(300):
    action = policy.predict(obs)
    obs, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        break
env.close()
```