Implementation:ARISE Initiative Robosuite Observables
| Knowledge Sources | |
|---|---|
| Domains | Robotics, Sensor Modeling |
| Last Updated | 2026-02-15 07:00 GMT |
Overview
The observables module defines the Observable class and associated decorator/factory functions for modeling realistic sensor pipelines with configurable corruption, filtering, delay, and sampling rate in robosuite simulations.
Description
This module implements a realistic sensor observation pipeline for robotic simulation. Each Observable wraps a sensor function (decorated with @sensor(modality)) and applies a configurable chain of transformations: corruption (noise injection), filtering (smoothing or recording), and delay (simulated measurement latency). Each observable also has its own sampling rate, independent of the simulation timestep.
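The ordering of the value-transforming stages can be illustrated with a minimal, self-contained sketch. It does not use robosuite itself; `read_sensor`, `add_bias`, `Recorder`, and `run_pipeline` are hypothetical names chosen for illustration. Delay is left out because, as described above, it models measurement latency rather than changing the value.

```python
def read_sensor(obs_cache):
    # Hypothetical raw sensor: a cached joint angle, or 0.0 when the cache is empty
    return obs_cache.get("raw_angle", 0.0)


def add_bias(value):
    # Corruption stage: add a fixed offset to the raw reading
    return value + 0.5


class Recorder:
    """Filter stage that passes values through unchanged while recording them."""

    def __init__(self):
        self.history = []

    def __call__(self, value):
        self.history.append(value)
        return value


def run_pipeline(sensor_fn, corrupter, filter_fn, obs_cache):
    # Order of application: sensor output -> corrupter -> filter
    return filter_fn(corrupter(sensor_fn(obs_cache)))


recorder = Recorder()
value = run_pipeline(read_sensor, add_bias, recorder, {"raw_angle": 1.0})
print(value)             # 1.5
print(recorder.history)  # [1.5]
```

The filter sees the corrupted value, not the raw one, which is why a recording filter logs 1.5 rather than 1.0 here.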
The sensor decorator assigns a modality attribute (e.g., "proprio", "object", "image") to a sensor function. Sensor functions must accept an obs_cache dictionary and return scalar or array values, handling the empty-dict case for initialization. The module provides factory functions for creating corrupters (create_deterministic_corrupter, create_uniform_noise_corrupter, create_gaussian_noise_corrupter) and delayers (create_deterministic_delayer, create_uniform_sampled_delayer, create_gaussian_sampled_delayer). Default no-ops are provided as NO_CORRUPTION, NO_FILTER, and NO_DELAY.
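As a rough illustration of the decorator and factory pattern described above, the following standalone sketch tags a function with a modality and builds closures for noise and delay. It is not robosuite's implementation: the names `sensor_sketch`, `make_gaussian_corrupter`, and `make_uniform_delayer`, the attribute `modality_tag`, and the `rng` parameter are all assumptions made for this example, and it handles scalars only.

```python
import random


def sensor_sketch(modality):
    """Decorator factory that tags a sensor function with a modality label."""
    def decorator(func):
        func.modality_tag = modality  # hypothetical attribute name
        return func
    return decorator


def make_gaussian_corrupter(mean, std, low=float("-inf"), high=float("inf"), rng=None):
    """Return a closure that adds Gaussian noise and clips to [low, high]."""
    rng = rng or random.Random()

    def corrupter(value):
        noisy = value + rng.gauss(mean, std)
        return min(max(noisy, low), high)

    return corrupter


def make_uniform_delayer(min_delay, max_delay, rng=None):
    """Return a zero-argument closure that samples a delay in seconds."""
    rng = rng or random.Random()
    return lambda: rng.uniform(min_delay, max_delay)


@sensor_sketch(modality="proprio")
def gripper_width(obs_cache):
    # An empty cache (used for initialization) falls back to a default value
    return obs_cache.get("raw_width", 0.0)


corrupter = make_gaussian_corrupter(0.0, 0.01, low=0.0, high=0.08, rng=random.Random(0))
delayer = make_uniform_delayer(0.0, 0.005, rng=random.Random(0))
noisy_width = corrupter(gripper_width({"raw_width": 0.04}))
print(gripper_width.modality_tag, noisy_width, delayer())
```

Returning closures keeps the noise and delay parameters fixed at construction time, so the resulting callables need only the sensor value (corrupter) or no arguments at all (delayer).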
The Observable class manages timing: on each update(timestep, obs_cache) call, it increments an internal timer and samples the sensor at the configured rate, optionally applying the configured delay before reading. The obs property returns the most recent observation if the observable is active, or None otherwise. Observables can be enabled/disabled (controlling computation) and activated/deactivated (controlling visibility) independently.
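The timing behavior can be approximated with a simplified sketch. This is an illustration under stated assumptions, not robosuite's actual logic: the class `TimedObservableSketch` and its `sample_count` attribute are hypothetical, the delay is a fixed number shorter than the sampling period, and the sensor is read once per period as soon as the delay has elapsed.

```python
class TimedObservableSketch:
    """Samples a sensor once per sampling period, after a fixed delay."""

    def __init__(self, sensor_fn, sampling_rate=20, delay=0.0, enabled=True, active=True):
        self._sensor_fn = sensor_fn
        self._period = 1.0 / sampling_rate  # seconds between samples
        self._delay = delay                 # seconds into each period before reading
        self.enabled = enabled              # controls whether update() does any work
        self.active = active                # controls whether obs exposes the value
        self._elapsed = 0.0
        self._sampled = False
        self._value = sensor_fn({})         # initial value from the empty-cache case
        self.sample_count = 0

    def update(self, timestep, obs_cache):
        if not self.enabled:
            return
        self._elapsed += timestep
        # Read the sensor once per period, as soon as the delay has passed
        if not self._sampled and self._elapsed >= self._delay:
            self._value = self._sensor_fn(obs_cache)
            self._sampled = True
            self.sample_count += 1
        # Start a new sampling period once the current one is over
        if self._elapsed >= self._period:
            self._elapsed -= self._period
            self._sampled = False

    @property
    def obs(self):
        return self._value if self.active else None


def clock_sensor(obs_cache):
    return obs_cache.get("t", 0.0)


observable = TimedObservableSketch(clock_sensor, sampling_rate=4, delay=0.125)
cache = {}
for step in range(16):  # 16 steps of 1/16 s = 1 s of simulated time
    cache["t"] = (step + 1) * 0.0625
    observable.update(0.0625, cache)

print(observable.sample_count)  # 4
print(observable.obs)           # 0.875
```

Although `update` runs 16 times, the sensor is read only four times, once per 0.25 s period. Setting `active` to False would hide the value (`obs` returns None) while sampling continues, whereas setting `enabled` to False would stop sampling altogether.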
Usage
Use this module to create realistic sensor models in robosuite environments. Wrap MuJoCo state queries with the @sensor decorator, create Observable instances with the desired noise, delay, and filter settings, and call update once per simulation step with the elapsed simulation time and the shared obs_cache. Read the current value through the obs property.
Code Reference
Source Location
- Repository: ARISE_Initiative_Robosuite
- File: robosuite/utils/observables.py
Signature
def sensor(modality) -> function: ...
def create_deterministic_corrupter(corruption, low=-np.inf, high=np.inf) -> function: ...
def create_uniform_noise_corrupter(min_noise, max_noise, low=-np.inf, high=np.inf) -> function: ...
def create_gaussian_noise_corrupter(mean, std, low=-np.inf, high=np.inf) -> function: ...
def create_deterministic_delayer(delay) -> function: ...
def create_uniform_sampled_delayer(min_delay, max_delay) -> function: ...
def create_gaussian_sampled_delayer(mean, std) -> function: ...
class Observable:
    def __init__(self, name, sensor, corrupter=None, filter=None,
                 delayer=None, sampling_rate=20, enabled=True, active=True): ...
    def update(self, timestep, obs_cache, force=False): ...
    def reset(self): ...
    def is_enabled(self) -> bool: ...
    def is_active(self) -> bool: ...
    def set_enabled(self, enabled): ...
    def set_active(self, active): ...
    def set_sensor(self, sensor): ...
    def set_corrupter(self, corrupter): ...
    def set_filter(self, filter): ...
    def set_delayer(self, delayer): ...
    def set_sampling_rate(self, rate): ...
    @property
    def obs(self): ...
    @property
    def modality(self) -> str: ...
Import
from robosuite.utils.observables import (
    sensor,
    Observable,
    create_gaussian_noise_corrupter,
    create_deterministic_delayer,
    NO_CORRUPTION,
    NO_FILTER,
    NO_DELAY,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Unique name identifying this observable |
| sensor | function (with @sensor decorator) | Yes | Sensor function that takes obs_cache dict and returns scalar/array |
| corrupter | function or None | No | Function taking sensor output and returning corrupted version |
| filter | function or None | No | Function taking corrupted data and returning filtered data |
| delayer | function or None | No | Function returning delay in seconds (float) |
| sampling_rate | float | No | Sampling frequency in Hz (default: 20) |
| enabled | bool | No | Whether observable is computed each step (default: True) |
| active | bool | No | Whether observable returns values via obs property (default: True) |
| timestep (update) | float | Yes | Simulation time elapsed since the previous update call, in seconds |
| obs_cache (update) | dict | Yes | Dictionary of pre-computed observable values, updated in-place |
| force (update) | bool | No | If True, sample the sensor on this call regardless of the sampling timer (default: False) |
Outputs
| Name | Type | Description |
|---|---|---|
| obs (property) | float, np.ndarray, or None | Most recent observation value if the observable is active, else None |
| modality (property) | str | Sensor modality string (e.g., "proprio", "object") |
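The filter contract in the Inputs table (a callable that takes corrupted data and returns filtered data) allows stateful filters. As a hedged sketch, a moving-average smoother could be built as a closure; `make_moving_average_filter` is a hypothetical helper, not part of robosuite, and this version handles scalar readings only.

```python
from collections import deque


def make_moving_average_filter(window):
    """Return a filter that averages the last `window` scalar readings."""
    history = deque(maxlen=window)  # old readings are dropped automatically

    def moving_average(value):
        history.append(value)
        return sum(history) / len(history)

    return moving_average


smooth = make_moving_average_filter(window=3)
outputs = [smooth(v) for v in [3.0, 6.0, 9.0, 12.0]]
print(outputs)  # [3.0, 4.5, 6.0, 9.0]
```

Because the history lives inside the closure, each observable that needs smoothing should receive its own filter instance rather than sharing one.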
Usage Examples
import numpy as np
from robosuite.utils.observables import (
    sensor,
    Observable,
    create_gaussian_noise_corrupter,
    create_deterministic_delayer,
)
# Define a custom sensor with the @sensor decorator
@sensor(modality="proprio")
def joint_pos_sensor(obs_cache):
    # Handle the empty-dict case used for initialization
    if not obs_cache:
        return np.zeros(7)
    return obs_cache.get("raw_joint_pos", np.zeros(7))
# Create an observable with noise and delay
obs = Observable(
    name="robot0_joint_pos",
    sensor=joint_pos_sensor,
    corrupter=create_gaussian_noise_corrupter(mean=0.0, std=0.01),
    delayer=create_deterministic_delayer(delay=0.01),
    sampling_rate=100,
    enabled=True,
    active=True,
)
# Simulate update loop
obs_cache = {"raw_joint_pos": np.random.randn(7)}
for _ in range(10):
    obs.update(timestep=0.002, obs_cache=obs_cache)
    current_value = obs.obs
    print(f"Observable value: {current_value}")
# Reset the observable
obs.reset()