Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:ARISE Initiative Robosuite Observables

From Leeroopedia
Knowledge Sources
Domains Robotics, Sensor Modeling
Last Updated 2026-02-15 07:00 GMT

Overview

The observables module defines the Observable class and associated decorator/factory functions for modeling realistic sensor pipelines with configurable corruption, filtering, delay, and sampling rate in robosuite simulations.

Description

This module implements a realistic sensor observation pipeline for robotic simulation. Each Observable wraps a sensor function (decorated with @sensor(modality)) and applies a configurable chain of transformations: corruption (noise injection), filtering (smoothing or recording), and delay (simulated measurement latency). Each observable also has its own sampling rate, independent of the simulation timestep.

The sensor decorator assigns a modality attribute (e.g., "proprio", "object", "image") to a sensor function. Sensor functions must accept an obs_cache dictionary and return scalar or array values, handling the empty-dict case for initialization. The module provides factory functions for creating corrupters (create_deterministic_corrupter, create_uniform_noise_corrupter, create_gaussian_noise_corrupter) and delayers (create_deterministic_delayer, create_uniform_sampled_delayer, create_gaussian_sampled_delayer). Default no-ops are provided as NO_CORRUPTION, NO_FILTER, and NO_DELAY.

The Observable class manages timing: on each update(timestep, obs_cache) call, it increments an internal timer and samples the sensor at the configured rate, optionally applying the configured delay before reading. The obs property returns the most recent observation if the observable is active, or None otherwise. Observables can be enabled/disabled (controlling computation) and activated/deactivated (controlling visibility) independently.

Usage

Use this module to create realistic sensor models in robosuite environments. Wrap MuJoCo state queries with the @sensor decorator, create Observable instances with desired noise/delay/filter parameters, and call update each simulation timestep. Access current values via the obs property.

Code Reference

Source Location

Signature

def sensor(modality) -> function: ...

def create_deterministic_corrupter(corruption, low=-np.inf, high=np.inf) -> function: ...

def create_uniform_noise_corrupter(min_noise, max_noise, low=-np.inf, high=np.inf) -> function: ...

def create_gaussian_noise_corrupter(mean, std, low=-np.inf, high=np.inf) -> function: ...

def create_deterministic_delayer(delay) -> function: ...

def create_uniform_sampled_delayer(min_delay, max_delay) -> function: ...

def create_gaussian_sampled_delayer(mean, std) -> function: ...

class Observable:
    def __init__(self, name, sensor, corrupter=None, filter=None,
                 delayer=None, sampling_rate=20, enabled=True, active=True): ...
    def update(self, timestep, obs_cache, force=False): ...
    def reset(self): ...
    def is_enabled(self) -> bool: ...
    def is_active(self) -> bool: ...
    def set_enabled(self, enabled): ...
    def set_active(self, active): ...
    def set_sensor(self, sensor): ...
    def set_corrupter(self, corrupter): ...
    def set_filter(self, filter): ...
    def set_delayer(self, delayer): ...
    def set_sampling_rate(self, rate): ...
    @property
    def obs(self): ...
    @property
    def modality(self) -> str: ...

Import

from robosuite.utils.observables import (
    sensor,
    Observable,
    create_gaussian_noise_corrupter,
    create_deterministic_delayer,
    NO_CORRUPTION,
    NO_FILTER,
    NO_DELAY,
)

I/O Contract

Inputs

Name Type Required Description
name str Yes Unique name identifying this observable
sensor function (with @sensor decorator) Yes Sensor function that takes obs_cache dict and returns scalar/array
corrupter function or None No Function taking sensor output and returning corrupted version
filter function or None No Function taking corrupted data and returning filtered data
delayer function or None No Function returning delay in seconds (float)
sampling_rate float No Sampling frequency in Hz (default: 20)
enabled bool No Whether observable is computed each step (default: True)
active bool No Whether observable returns values via obs property (default: True)
timestep (update) float Yes Simulation time since last update call in seconds
obs_cache (update) dict Yes Dictionary of pre-computed observable values, updated in-place

Outputs

Name Type Description
obs (property) None or float or np.array Current observation value if active, else None
modality (property) str Sensor modality string (e.g., "proprio", "object")

Usage Examples

import numpy as np
from robosuite.utils.observables import (
    sensor,
    Observable,
    create_gaussian_noise_corrupter,
    create_deterministic_delayer,
)

# Define a custom sensor with the @sensor decorator
@sensor(modality="proprio")
def joint_pos_sensor(obs_cache):
    if not obs_cache:
        return np.zeros(7)
    return obs_cache.get("raw_joint_pos", np.zeros(7))

# Create an observable with noise and delay
obs = Observable(
    name="robot0_joint_pos",
    sensor=joint_pos_sensor,
    corrupter=create_gaussian_noise_corrupter(mean=0.0, std=0.01),
    delayer=create_deterministic_delayer(delay=0.01),
    sampling_rate=100,
    enabled=True,
    active=True,
)

# Simulate update loop
obs_cache = {"raw_joint_pos": np.random.randn(7)}
for _ in range(10):
    obs.update(timestep=0.002, obs_cache=obs_cache)
    current_value = obs.obs
    print(f"Observable value: {current_value}")

# Reset the observable
obs.reset()

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment