Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:ARISE Initiative Robosuite Collect Human Trajectory

From Leeroopedia

Metadata:

Overview

Concrete function for collecting a single human demonstration trajectory via teleoperation provided by the robosuite collection scripts.

Description

`collect_human_trajectory()` runs a teleoperation loop that resets the environment, starts device control, then loops: input2action() → create_action_vector() → env.step(). The DataCollectionWrapper transparently records all data. Supports frame rate limiting via max_fr. Checks task success via env._check_success().

The function handles the complete lifecycle of a single demonstration episode:

  1. Initialization: Resets the environment to a new initial state
  2. Device Setup: Starts the input device control loop
  3. Control Loop: Continuously reads device input, converts to robot actions, and executes in the environment
  4. Termination Detection: Monitors for task success, episode timeout, or user-triggered reset
  5. Frame Rate Management: Limits loop frequency to ensure smooth human control

All state observations, actions, and rewards are automatically captured by the DataCollectionWrapper without explicit recording calls in the control loop.

Usage

Called repeatedly in the collection script to gather multiple demonstrations per session. Typically invoked within a data collection session where multiple trajectories are gathered sequentially.

Code Reference

Source: robosuite

File: robosuite/scripts/collect_human_demonstrations.py

Lines: L23-117

Signature:

def collect_human_trajectory(env, device, arm, max_fr, goal_update_mode):
    """
    Use the device to collect a demonstration. Rollout saved to npz files.
    Args:
        env (MujocoEnv): environment to control
        device (Device): input device
        arm (str): which arm ('right' or 'left')
        max_fr (int): max frame rate
        goal_update_mode (str): 'target' or 'achieved'
    """
</ighlight>

'''Import:'''

<syntaxhighlight lang="python">
from robosuite.scripts.collect_human_demonstrations import collect_human_trajectory

I/O Contract

Inputs

Parameter Type Required Description
env MujocoEnv Yes Wrapped environment instance with DataCollectionWrapper for recording
device Device Yes Input device (e.g., SpaceMouse, keyboard) for teleoperation control
arm str Yes Which arm to control: 'right' or 'left' for bimanual robots
max_fr int Yes Maximum frame rate (Hz) for the control loop to ensure smooth human operation
goal_update_mode str Yes Goal update strategy: 'target' or 'achieved' for goal-conditioned tasks

Outputs

Returns: None

Side Effects: Episode data (states, actions, rewards, metadata) saved to disk by the DataCollectionWrapper in npz format.

Usage Examples

Example: Complete demonstration collection setup

import robosuite as suite
from robosuite import load_controller_config
from robosuite.wrappers import DataCollectionWrapper
from robosuite.scripts.collect_human_demonstrations import collect_human_trajectory
from robosuite.devices import SpaceMouse

# Create environment
controller_config = load_controller_config(default_controller="OSC_POSE")
env = suite.make(
    env_name="Lift",
    robots="Panda",
    has_renderer=True,
    has_offscreen_renderer=False,
    use_camera_obs=False,
    controller_configs=controller_config,
    horizon=500,
)

# Wrap with data collection wrapper
env = DataCollectionWrapper(
    env,
    directory="./demonstrations/",
    collect_freq=1,
)

# Initialize device
device = SpaceMouse()
device.start_control()

# Collection parameters
arm = "right"
max_fr = 20  # 20 Hz control loop
goal_update_mode = "target"

# Collect multiple demonstrations
num_demos = 10
for i in range(num_demos):
    print(f"Collecting demonstration {i+1}/{num_demos}")
    collect_human_trajectory(
        env=env,
        device=device,
        arm=arm,
        max_fr=max_fr,
        goal_update_mode=goal_update_mode,
    )
    print(f"Demonstration {i+1} saved.")

# Cleanup
device.stop_control()
env.close()
print(f"Collection complete. {num_demos} demonstrations saved.")

Example: Collection with keyboard device

from robosuite.devices import Keyboard

# Setup environment (same as above)
# ...

# Use keyboard instead of SpaceMouse
device = Keyboard(pos_sensitivity=1.0, rot_sensitivity=1.0)

# Collect with slower frame rate for keyboard control
collect_human_trajectory(
    env=env,
    device=device,
    arm="right",
    max_fr=10,  # Slower for keyboard
    goal_update_mode="target",
)

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment