| Field |
Value
|
| Implementation Name |
BaseRealAgent and LeRobotAgent
|
| Type |
API Doc
|
| Domain |
Robotics
|
| Source Files |
mani_skill/agents/base_real_agent.py (L12-202), mani_skill/agents/robots/lerobot/manipulator.py (L24-124)
|
| Date |
2026-02-15
|
| Repository |
Haosulab/ManiSkill
|
Overview
The BaseRealAgent abstract class defines the hardware abstraction interface for controlling real robots, and LeRobotRealAgent is a concrete implementation that connects to robots supported by the Hugging Face LeRobot library. Together, they provide the agent layer of the sim2real deployment stack, handling motor commands, sensor capture, and proprioceptive state reading.
Description
BaseRealAgent defines the contract that all real robot drivers must fulfill, including lifecycle management (start/stop), motor control (set target qpos/qvel), safe reset, sensor data capture, and proprioception. It also provides a RealRobot inner class that mimics the simulation robot API for compatibility with the sim2real bridge.
LeRobotRealAgent implements this contract for LeRobot-compatible hardware (e.g., the SO-100 arm). It handles unit conversion between radians and degrees, provides smooth reset motion via position interpolation, supports QPos caching for higher control frequencies, and integrates with LeRobot's camera system.
Usage
from lerobot.common.robots.robot import Robot
from mani_skill.agents.robots.lerobot.manipulator import LeRobotRealAgent
robot = Robot(name="so100_follower", config=my_config)
agent = LeRobotRealAgent(robot=robot, use_cached_qpos=True)
agent.start()
# Read joint positions
qpos = agent.get_qpos() # shape (1, N) in radians
# Command joint positions
agent.set_target_qpos(target_qpos) # in radians
# Capture and retrieve camera data
agent.capture_sensor_data(["base_camera"])
data = agent.get_sensor_data(["base_camera"])
# data["base_camera"]["rgb"] has shape (1, H, W, 3) as torch.uint8
agent.stop()
Code Reference
BaseRealAgent Abstract Interface
class BaseRealAgent:
def __init__(self, sensor_configs: dict[str, BaseSensorConfig] = dict()):
self.sensor_configs = sensor_configs
self._sim_agent: BaseAgent = None
# Creates self.robot (RealRobot inner class)
# Lifecycle
def start(self): raise NotImplementedError
def stop(self): raise NotImplementedError
# Motor control
def set_target_qpos(self, qpos: Array): raise NotImplementedError
def set_target_qvel(self, qvel: Array): raise NotImplementedError
def reset(self, qpos: Array): raise NotImplementedError
# Sensor data
def capture_sensor_data(self, sensor_names=None): raise NotImplementedError
def get_sensor_data(self, sensor_names=None): raise NotImplementedError
def get_sensor_params(self, sensor_names=None): return dict()
# Proprioception
def get_qpos(self): raise NotImplementedError
def get_qvel(self): raise NotImplementedError
def get_proprioception(self):
"""Returns dict(qpos=..., qvel=..., controller=...)"""
BaseRealAgent.RealRobot Inner Class
@dataclass
class RealRobot:
agent: BaseRealAgent
@property
def qpos(self):
return self.get_qpos()
def get_qpos(self):
return self.agent.get_qpos()
@property
def qvel(self):
return self.get_qvel()
def get_qvel(self):
return self.agent.get_qvel()
LeRobotRealAgent Constructor
class LeRobotRealAgent(BaseRealAgent):
def __init__(self, robot: Robot, use_cached_qpos: bool = True, **kwargs):
super().__init__(**kwargs)
self._captured_sensor_data = None
self.real_robot = robot
self.use_cached_qpos = use_cached_qpos
self._cached_qpos = None
self._motor_keys: list[str] = None
| Parameter |
Type |
Default |
Description
|
robot |
Robot |
(required) |
LeRobot Robot instance for the physical robot.
|
use_cached_qpos |
bool |
True |
Cache qpos readings for higher control frequency.
|
LeRobotRealAgent Key Methods
def start(self):
self.real_robot.connect()
def stop(self):
self.real_robot.disconnect()
def set_target_qpos(self, qpos: Array):
"""Convert radians to degrees and send to motors."""
self._cached_qpos = None
qpos = common.to_cpu_tensor(qpos).flatten()
qpos = torch.rad2deg(qpos)
qpos = {f"{self._motor_keys[i]}.pos": qpos[i] for i in range(len(qpos))}
self.real_robot.send_action(qpos)
def reset(self, qpos: Array):
"""Gradually move to target qpos at bounded velocity (0.025 rad/step at 30Hz)."""
freq = 30
max_rad_per_step = 0.025
target_pos = self.qpos
for _ in range(int(20 * freq)):
delta_step = (qpos - target_pos).clip(
min=-max_rad_per_step, max=max_rad_per_step
)
if np.linalg.norm(delta_step) <= 1e-4:
break
target_pos += delta_step
self.set_target_qpos(target_pos)
busy_wait(1 / freq - dt_s)
def capture_sensor_data(self, sensor_names=None):
"""Async read from cameras, store as dict of {name: {rgb: tensor}}."""
def get_sensor_data(self, sensor_names=None):
"""Return previously captured sensor data."""
def get_qpos(self):
"""Read joint positions from motors (5-6ms typical), convert to radians.
Uses cache if use_cached_qpos=True and cache is valid."""
if self.use_cached_qpos and self._cached_qpos is not None:
return self._cached_qpos.clone()
qpos_deg = self.real_robot.bus.sync_read("Present_Position")
# ... convert to radians, cache, and return shape (1, N) tensor ...
I/O Contract
BaseRealAgent Interface
| Method |
Input |
Output
|
start() |
none |
none (side effect: hardware initialized)
|
stop() |
none |
none (side effect: hardware shut down)
|
set_target_qpos(qpos) |
Array shape (N,) or (1,N) in radians |
none (side effect: motors commanded)
|
set_target_qvel(qvel) |
Array shape (N,) or (1,N) in rad/s |
none (side effect: motors commanded)
|
reset(qpos) |
Array shape (N,) in radians |
none (side effect: robot moved safely)
|
capture_sensor_data(names) |
Optional[list[str]] |
none (side effect: data captured async)
|
get_sensor_data(names) |
Optional[list[str]] |
dict[str, dict] e.g., {"cam": {"rgb": Tensor(1,H,W,3)}}
|
get_qpos() |
none |
torch.Tensor shape (1, N) in radians
|
get_qvel() |
none |
torch.Tensor shape (1, N) in rad/s
|
get_proprioception() |
none |
dict(qpos=..., qvel=..., controller=...)
|
External Dependencies
| Package |
Purpose
|
| lerobot.common.robots.robot |
Robot class for real robot connection and communication
|
| lerobot.common.cameras.camera |
Camera class for real camera capture
|
| lerobot.common.motors.motors_bus |
MotorNormMode for motor calibration
|
| lerobot.common.utils.robot_utils |
busy_wait for precise timing
|
| torch |
Tensor operations, unit conversions
|
| numpy |
Array operations for norm calculations
|
Usage Examples
# Full deployment example with LeRobot SO-100
from lerobot.common.robots.robot import Robot
from mani_skill.agents.robots.lerobot.manipulator import LeRobotRealAgent
from mani_skill.envs.sim2real_env import Sim2RealEnv
import gymnasium as gym
# 1. Create real robot agent
robot = Robot(name="so100_follower", config=so100_config)
agent = LeRobotRealAgent(robot=robot, use_cached_qpos=True)
agent.start()
# 2. Create simulation environment
sim_env = gym.make(
"GraspCubeSO100Digital-v1",
obs_mode="rgb",
control_mode="pd_joint_pos",
sim_backend="cpu",
)
# 3. Create bridge environment
env = Sim2RealEnv(sim_env=sim_env, agent=agent, control_freq=30)
# 4. Run policy
obs, info = env.reset()
done = False
while not done:
action = policy(obs)
obs, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
# 5. Cleanup
env.close()
Related Pages