Implementation:ARISE Initiative Robosuite Buffers
| Knowledge Sources | |
|---|---|
| Domains | Robotics, Signal Processing |
| Last Updated | 2026-02-15 07:00 GMT |
Overview
The buffers module provides three buffer data structures -- RingBuffer, DeltaBuffer, and DelayBuffer -- used for temporal data storage, averaging, delta computation, and simulated measurement delay in robosuite controllers and sensor pipelines.
Description
This module implements lightweight buffer abstractions used throughout robosuite for temporal signal processing.
Buffer is the abstract base class defining a minimal interface with push and clear methods.
RingBuffer implements a fixed-length circular buffer backed by a 2D numpy array of shape (length, dim). New values are pushed via a modular pointer, overwriting the oldest entry when full. It provides a current property returning the most recent pushed value and an average property that computes the mean of all valid entries (accounting for partially-filled buffers). This is commonly used for smoothing the derivative (D) component in PID controllers.
DeltaBuffer is a simple two-element buffer that tracks the current and last pushed values. Its delta property returns the difference current - last, and its average property returns the midpoint. This is used for computing velocity estimates from position readings and for tracking changes in end-effector state between controller timesteps.
DelayBuffer extends RingBuffer with a get_delayed_value method that retrieves a value from a specified number of steps in the past, enabling simulation of sensor measurement delays. The delay must be less than the buffer length.
Usage
Use RingBuffer for moving-average filtering of control signals (e.g., smoothing joint velocity estimates). Use DeltaBuffer for tracking change between consecutive timesteps (e.g., computing end-effector velocity from position). Use DelayBuffer for simulating sensor latency in the observable pipeline.
Code Reference
Source Location
- Repository: ARISE_Initiative_Robosuite
- File: robosuite/utils/buffers.py
Signature
class Buffer(object):
def push(self, value): ...
def clear(self): ...
class RingBuffer(Buffer):
def __init__(self, dim: int, length: int): ...
def push(self, value): ...
def clear(self): ...
@property
def current(self) -> np.array: ...
@property
def average(self) -> np.array: ...
class DeltaBuffer(Buffer):
def __init__(self, dim: int, init_value=None): ...
def push(self, value): ...
def clear(self): ...
@property
def delta(self) -> np.array: ...
@property
def average(self) -> np.array: ...
class DelayBuffer(RingBuffer):
def get_delayed_value(self, delay: int) -> np.array: ...
Import
from robosuite.utils.buffers import RingBuffer, DeltaBuffer, DelayBuffer
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| dim | int | Yes | Dimensionality of each buffer entry (e.g., size of a state vector) |
| length (RingBuffer, DelayBuffer) | int | Yes | Maximum number of entries the buffer can hold |
| init_value (DeltaBuffer) | None or Iterable | No | Initial value for the "last" entry (default: zeros) |
| value (push) | int, float, or array | Yes | Value to push into the buffer as a single entry |
| delay (get_delayed_value) | int | Yes | Number of steps behind most recent value to retrieve |
Outputs
| Name | Type | Description |
|---|---|---|
| current (RingBuffer) | np.array | Most recently pushed value |
| average (RingBuffer) | np.array | Mean of all valid entries in the buffer |
| delta (DeltaBuffer) | np.array | Difference: current - last value |
| average (DeltaBuffer) | np.array | Mean of current and last value |
| get_delayed_value (DelayBuffer) | np.array | Value from @delay steps ago |
Usage Examples
import numpy as np
from robosuite.utils.buffers import RingBuffer, DeltaBuffer, DelayBuffer
# RingBuffer: smooth joint velocity readings
vel_buffer = RingBuffer(dim=7, length=5)
for _ in range(10):
vel_reading = np.random.randn(7) * 0.1
vel_buffer.push(vel_reading)
smoothed_vel = vel_buffer.average
print(f"Smoothed velocity: {smoothed_vel}")
print(f"Latest velocity: {vel_buffer.current}")
# DeltaBuffer: compute end-effector position change
pos_buffer = DeltaBuffer(dim=3)
pos_buffer.push(np.array([0.5, 0.0, 0.8]))
pos_buffer.push(np.array([0.52, 0.01, 0.79]))
velocity_estimate = pos_buffer.delta # [0.02, 0.01, -0.01]
midpoint = pos_buffer.average # [0.51, 0.005, 0.795]
# DelayBuffer: simulate a 3-step sensor delay
delay_buf = DelayBuffer(dim=3, length=10)
for i in range(10):
delay_buf.push(np.array([i, i, i]))
delayed_value = delay_buf.get_delayed_value(delay=3)
print(f"Value from 3 steps ago: {delayed_value}")
# Clear all buffers
vel_buffer.clear()
pos_buffer.clear()
delay_buf.clear()