
Implementation:Haosulab ManiSkill BaseRealAgent LeRobotAgent

From Leeroopedia
Field Value
Implementation Name BaseRealAgent and LeRobotRealAgent
Type API Doc
Domain Robotics
Source Files mani_skill/agents/base_real_agent.py (L12-202), mani_skill/agents/robots/lerobot/manipulator.py (L24-124)
Date 2026-02-15
Repository Haosulab/ManiSkill

Overview

The BaseRealAgent abstract class defines the hardware abstraction interface for controlling real robots, and LeRobotRealAgent is a concrete implementation that connects to robots supported by the Hugging Face LeRobot library. Together, they provide the agent layer of the sim2real deployment stack, handling motor commands, sensor capture, and proprioceptive state reading.

Description

BaseRealAgent defines the contract that all real robot drivers must fulfill, including lifecycle management (start/stop), motor control (set target qpos/qvel), safe reset, sensor data capture, and proprioception. It also provides a RealRobot inner class that mimics the simulation robot API for compatibility with the sim2real bridge.
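To make the contract concrete, here is a minimal, dependency-free stand-in for a `BaseRealAgent` subclass. It does not import ManiSkill; the class name and in-memory "robot" are hypothetical, and plain lists stand in for the `(1, N)` torch tensors the real interface returns.

```python
class FakeRealAgent:
    """Illustrative driver fulfilling the lifecycle / motor-control /
    proprioception contract against in-memory state instead of hardware."""

    def __init__(self, num_joints=6):
        self.connected = False
        self._qpos = [0.0] * num_joints

    def start(self):             # would open the motor bus / camera streams
        self.connected = True

    def stop(self):              # would release the hardware
        self.connected = False

    def set_target_qpos(self, qpos):   # radians, shape (N,)
        self._qpos = list(qpos)

    def get_qpos(self):                # radians; copy so callers can't mutate state
        return list(self._qpos)

agent = FakeRealAgent()
agent.start()
agent.set_target_qpos([0.1, 0.2, 0.0, 0.0, 0.0, 0.0])
print(agent.get_qpos()[:2])  # [0.1, 0.2]
agent.stop()
```

A real driver would replace the list state with motor-bus reads and writes, but the method surface stays the same.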

LeRobotRealAgent implements this contract for LeRobot-compatible hardware (e.g., the SO-100 arm). It converts between the radians used by ManiSkill and the degrees used by LeRobot motors, performs smooth resets via bounded-velocity position interpolation, supports qpos caching for higher control frequencies, and integrates with LeRobot's camera system.

Usage

from lerobot.common.robots.robot import Robot
from mani_skill.agents.robots.lerobot.manipulator import LeRobotRealAgent

robot = Robot(name="so100_follower", config=my_config)
agent = LeRobotRealAgent(robot=robot, use_cached_qpos=True)
agent.start()

# Read joint positions
qpos = agent.get_qpos()  # shape (1, N) in radians

# Command joint positions
agent.set_target_qpos(target_qpos)  # in radians

# Capture and retrieve camera data
agent.capture_sensor_data(["base_camera"])
data = agent.get_sensor_data(["base_camera"])
# data["base_camera"]["rgb"] has shape (1, H, W, 3) as torch.uint8

agent.stop()

Code Reference

BaseRealAgent Abstract Interface

class BaseRealAgent:
    def __init__(self, sensor_configs: dict[str, BaseSensorConfig] = dict()):
        self.sensor_configs = sensor_configs
        self._sim_agent: BaseAgent = None
        # Creates self.robot (RealRobot inner class)

    # Lifecycle
    def start(self): raise NotImplementedError
    def stop(self): raise NotImplementedError

    # Motor control
    def set_target_qpos(self, qpos: Array): raise NotImplementedError
    def set_target_qvel(self, qvel: Array): raise NotImplementedError
    def reset(self, qpos: Array): raise NotImplementedError

    # Sensor data
    def capture_sensor_data(self, sensor_names=None): raise NotImplementedError
    def get_sensor_data(self, sensor_names=None): raise NotImplementedError
    def get_sensor_params(self, sensor_names=None): return dict()

    # Proprioception
    def get_qpos(self): raise NotImplementedError
    def get_qvel(self): raise NotImplementedError
    def get_proprioception(self):
        """Returns dict(qpos=..., qvel=..., controller=...)"""

BaseRealAgent.RealRobot Inner Class

@dataclass
class RealRobot:
    agent: BaseRealAgent

    @property
    def qpos(self):
        return self.get_qpos()

    def get_qpos(self):
        return self.agent.get_qpos()

    @property
    def qvel(self):
        return self.get_qvel()

    def get_qvel(self):
        return self.agent.get_qvel()
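The point of this inner class is delegation: code written against the simulation API's `robot.qpos` keeps working on real hardware because `RealRobot` forwards reads to its owning agent. A minimal re-creation of the pattern (the stub agent and its values are illustrative):

```python
from dataclasses import dataclass

class StubAgent:
    def get_qpos(self):
        return [0.5, -0.5]   # would be a motor-bus read on real hardware

@dataclass
class RealRobotView:
    agent: StubAgent

    @property
    def qpos(self):
        # Forward the read to the agent, mirroring RealRobot above
        return self.agent.get_qpos()

robot_view = RealRobotView(agent=StubAgent())
print(robot_view.qpos)  # [0.5, -0.5]
```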

LeRobotRealAgent Constructor

class LeRobotRealAgent(BaseRealAgent):
    def __init__(self, robot: Robot, use_cached_qpos: bool = True, **kwargs):
        super().__init__(**kwargs)
        self._captured_sensor_data = None
        self.real_robot = robot
        self.use_cached_qpos = use_cached_qpos
        self._cached_qpos = None
        self._motor_keys: list[str] = None
Parameter Type Default Description
robot Robot (required) LeRobot Robot instance for the physical robot.
use_cached_qpos bool True Cache qpos readings for higher control frequency.

LeRobotRealAgent Key Methods

def start(self):
    self.real_robot.connect()

def stop(self):
    self.real_robot.disconnect()

def set_target_qpos(self, qpos: Array):
    """Convert radians to degrees and send to motors."""
    self._cached_qpos = None
    qpos = common.to_cpu_tensor(qpos).flatten()
    qpos = torch.rad2deg(qpos)
    qpos = {f"{self._motor_keys[i]}.pos": qpos[i] for i in range(len(qpos))}
    self.real_robot.send_action(qpos)
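The dict handed to `send_action` keys each degree value by `"<motor>.pos"`. A small standalone sketch of that conversion (the motor names here are hypothetical examples, not the SO-100's actual key set):

```python
import math

motor_keys = ["shoulder_pan", "shoulder_lift"]
qpos_rad = [0.0, math.pi / 2]

# Radians in, degrees keyed by '<motor>.pos' out
action = {
    f"{motor_keys[i]}.pos": math.degrees(qpos_rad[i])
    for i in range(len(qpos_rad))
}
print(action)  # shoulder_lift.pos is 90 degrees
```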

def reset(self, qpos: Array):
    """Gradually move to the target qpos at bounded velocity (0.025 rad/step at 30 Hz)."""
    freq = 30
    max_rad_per_step = 0.025
    target_pos = self.get_qpos().flatten()
    for _ in range(int(20 * freq)):  # give up after 20 seconds
        step_start = time.perf_counter()
        delta_step = (qpos - target_pos).clip(
            min=-max_rad_per_step, max=max_rad_per_step
        )
        if np.linalg.norm(delta_step) <= 1e-4:
            break
        target_pos += delta_step
        self.set_target_qpos(target_pos)
        dt_s = time.perf_counter() - step_start  # time spent this iteration
        busy_wait(1 / freq - dt_s)
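Because each joint moves at most 0.025 rad per step at 30 Hz (0.75 rad/s), the reset duration is governed by the largest joint displacement. A hypothetical helper making that arithmetic explicit:

```python
import math

def interpolation_steps(current, target, max_rad_per_step=0.025):
    """Number of bounded-velocity steps reset() needs to reach target."""
    largest_move = max(abs(t - c) for c, t in zip(current, target))
    return math.ceil(largest_move / max_rad_per_step)

steps = interpolation_steps(current=[0.0, 0.0], target=[0.5, -0.3])
print(steps)       # 20 steps: 0.5 rad / 0.025 rad per step
print(steps / 30)  # duration in seconds at 30 Hz (~0.67 s)
```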

def capture_sensor_data(self, sensor_names=None):
    """Async read from cameras, store as dict of {name: {rgb: tensor}}."""

def get_sensor_data(self, sensor_names=None):
    """Return previously captured sensor data."""

def get_qpos(self):
    """Read joint positions from motors (5-6ms typical), convert to radians.
    Uses cache if use_cached_qpos=True and cache is valid."""
    if self.use_cached_qpos and self._cached_qpos is not None:
        return self._cached_qpos.clone()
    qpos_deg = self.real_robot.bus.sync_read("Present_Position")
    # ... convert to radians, cache, and return shape (1, N) tensor ...
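The caching behaviour can be isolated into a small sketch: readings are reused until a new motor command invalidates them, skipping the roughly 5-6 ms bus read on repeated calls within one control step. Everything below is a stand-in (the fake bus read, the class name) illustrating only the invalidation logic described above.

```python
class CachedQposReader:
    def __init__(self):
        self.bus_reads = 0
        self._cached_qpos = None

    def _sync_read(self):            # stands in for the 5-6 ms motor-bus read
        self.bus_reads += 1
        return [0.0, 0.0, 0.0]

    def get_qpos(self):
        if self._cached_qpos is not None:      # cache hit: no bus traffic
            return list(self._cached_qpos)
        self._cached_qpos = self._sync_read()
        return list(self._cached_qpos)

    def set_target_qpos(self, qpos):
        self._cached_qpos = None               # motors will move; cache is stale

reader = CachedQposReader()
reader.get_qpos(); reader.get_qpos()           # second call hits the cache
print(reader.bus_reads)                        # 1
reader.set_target_qpos([0.1, 0.1, 0.1])
reader.get_qpos()
print(reader.bus_reads)                        # 2
```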

I/O Contract

BaseRealAgent Interface

Method Input Output
start() none none (side effect: hardware initialized)
stop() none none (side effect: hardware shut down)
set_target_qpos(qpos) Array shape (N,) or (1,N) in radians none (side effect: motors commanded)
set_target_qvel(qvel) Array shape (N,) or (1,N) in rad/s none (side effect: motors commanded)
reset(qpos) Array shape (N,) in radians none (side effect: robot moved safely)
capture_sensor_data(names) Optional[list[str]] none (side effect: data captured async)
get_sensor_data(names) Optional[list[str]] dict[str, dict] e.g., {"cam": {"rgb": Tensor(1,H,W,3)}}
get_qpos() none torch.Tensor shape (1, N) in radians
get_qvel() none torch.Tensor shape (1, N) in rad/s
get_proprioception() none dict(qpos=..., qvel=..., controller=...)
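A lightweight conformance check against the table above can catch shape mistakes in a new driver early. This is a hypothetical helper, with nested lists standing in for the `(1, N)` torch tensors the real interface uses:

```python
def check_proprioception(prop, num_joints):
    """Assert the proprioception dict follows the (1, N) shape convention."""
    assert set(prop) >= {"qpos", "qvel"}, "missing required keys"
    for key in ("qpos", "qvel"):
        batch = prop[key]
        assert len(batch) == 1, f"{key}: batch dimension must be 1"
        assert len(batch[0]) == num_joints, f"{key}: expected {num_joints} joints"

prop = {"qpos": [[0.0] * 5], "qvel": [[0.0] * 5], "controller": {}}
check_proprioception(prop, num_joints=5)
print("contract ok")
```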

External Dependencies

Package Purpose
lerobot.common.robots.robot Robot class for real robot connection and communication
lerobot.common.cameras.camera Camera class for real camera capture
lerobot.common.motors.motors_bus MotorNormMode for motor calibration
lerobot.common.utils.robot_utils busy_wait for precise timing
torch Tensor operations, unit conversions
numpy Array operations for norm calculations
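`busy_wait` exists because `time.sleep` is too coarse to hold a precise 30 Hz control loop. A minimal sketch of the idea (LeRobot's own implementation may differ in detail):

```python
import time

def busy_wait_sketch(seconds):
    """Spin until the deadline instead of sleeping, trading CPU for precision."""
    if seconds <= 0:
        return  # the control step already overran its budget
    deadline = time.perf_counter() + seconds
    while time.perf_counter() < deadline:
        pass

t0 = time.perf_counter()
busy_wait_sketch(0.01)
elapsed = time.perf_counter() - t0
print(f"waited {elapsed * 1000:.2f} ms")  # at least 10 ms
```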

Usage Examples

# Full deployment example with LeRobot SO-100
from lerobot.common.robots.robot import Robot
from mani_skill.agents.robots.lerobot.manipulator import LeRobotRealAgent
from mani_skill.envs.sim2real_env import Sim2RealEnv
import gymnasium as gym

# 1. Create real robot agent
robot = Robot(name="so100_follower", config=so100_config)
agent = LeRobotRealAgent(robot=robot, use_cached_qpos=True)
agent.start()

# 2. Create simulation environment
sim_env = gym.make(
    "GraspCubeSO100Digital-v1",
    obs_mode="rgb",
    control_mode="pd_joint_pos",
    sim_backend="cpu",
)

# 3. Create bridge environment
env = Sim2RealEnv(sim_env=sim_env, agent=agent, control_freq=30)

# 4. Run policy
obs, info = env.reset()
done = False
while not done:
    action = policy(obs)
    obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated

# 5. Cleanup
env.close()
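On real hardware it is worth making steps 4-5 robust to a crashing policy, so motors and cameras are released even if `policy(obs)` raises mid-episode. A sketch of that pattern, exercised here with tiny stand-in objects in place of the real `env`/`agent`:

```python
def run_episode(env, agent, policy, max_steps=300):
    try:
        obs, info = env.reset()
        for _ in range(max_steps):
            obs, reward, terminated, truncated, info = env.step(policy(obs))
            if terminated or truncated:
                break
    finally:
        env.close()    # always release the bridge environment
        agent.stop()   # always disconnect motors and cameras

# Minimal stubs so the control flow can run without hardware:
class _Env:
    closed = False
    def reset(self): return 0, {}
    def step(self, a): return 0, 0.0, True, False, {}
    def close(self): self.closed = True

class _Agent:
    stopped = False
    def stop(self): self.stopped = True

env_stub, agent_stub = _Env(), _Agent()
run_episode(env_stub, agent_stub, policy=lambda obs: 0)
print(env_stub.closed, agent_stub.stopped)  # True True
```

Whether `env.close()` already stops the agent depends on the Sim2RealEnv implementation; calling `agent.stop()` explicitly is a harmless belt-and-braces choice in this sketch.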
