Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:ARISE Initiative Robosuite Buffers

From Leeroopedia
Knowledge Sources
Domains Robotics, Signal Processing
Last Updated 2026-02-15 07:00 GMT

Overview

The buffers module provides three buffer data structures -- RingBuffer, DeltaBuffer, and DelayBuffer -- used for temporal data storage, averaging, delta computation, and simulated measurement delay in robosuite controllers and sensor pipelines.

Description

This module implements lightweight buffer abstractions used throughout robosuite for temporal signal processing.

Buffer is the abstract base class defining a minimal interface with push and clear methods.

RingBuffer implements a fixed-length circular buffer backed by a 2D numpy array of shape (length, dim). New values are pushed via a modular pointer, overwriting the oldest entry when full. It provides a current property returning the most recent pushed value and an average property that computes the mean of all valid entries (accounting for partially-filled buffers). This is commonly used for smoothing the derivative (D) component in PID controllers.

DeltaBuffer is a simple two-element buffer that tracks the current and last pushed values. Its delta property returns the difference current - last, and its average property returns the midpoint. This is used for computing velocity estimates from position readings and for tracking changes in end-effector state between controller timesteps.

DelayBuffer extends RingBuffer with a get_delayed_value method that retrieves a value from a specified number of steps in the past, enabling simulation of sensor measurement delays. The delay must be less than the buffer length.

Usage

Use RingBuffer for moving-average filtering of control signals (e.g., smoothing joint velocity estimates). Use DeltaBuffer for tracking change between consecutive timesteps (e.g., computing end-effector velocity from position). Use DelayBuffer for simulating sensor latency in the observable pipeline.

Code Reference

Source Location

Signature

class Buffer(object):
    def push(self, value): ...
    def clear(self): ...

class RingBuffer(Buffer):
    def __init__(self, dim: int, length: int): ...
    def push(self, value): ...
    def clear(self): ...
    @property
    def current(self) -> np.array: ...
    @property
    def average(self) -> np.array: ...

class DeltaBuffer(Buffer):
    def __init__(self, dim: int, init_value=None): ...
    def push(self, value): ...
    def clear(self): ...
    @property
    def delta(self) -> np.array: ...
    @property
    def average(self) -> np.array: ...

class DelayBuffer(RingBuffer):
    def get_delayed_value(self, delay: int) -> np.array: ...

Import

from robosuite.utils.buffers import RingBuffer, DeltaBuffer, DelayBuffer

I/O Contract

Inputs

Name Type Required Description
dim int Yes Dimensionality of each buffer entry (e.g., size of a state vector)
length (RingBuffer, DelayBuffer) int Yes Maximum number of entries the buffer can hold
init_value (DeltaBuffer) None or Iterable No Initial value for the "last" entry (default: zeros)
value (push) int, float, or array Yes Value to push into the buffer as a single entry
delay (get_delayed_value) int Yes Number of steps behind most recent value to retrieve

Outputs

Name Type Description
current (RingBuffer) np.array Most recently pushed value
average (RingBuffer) np.array Mean of all valid entries in the buffer
delta (DeltaBuffer) np.array Difference: current - last value
average (DeltaBuffer) np.array Mean of current and last value
get_delayed_value (DelayBuffer) np.array Value from @delay steps ago

Usage Examples

import numpy as np
from robosuite.utils.buffers import RingBuffer, DeltaBuffer, DelayBuffer

# RingBuffer: smooth joint velocity readings
vel_buffer = RingBuffer(dim=7, length=5)
for _ in range(10):
    vel_reading = np.random.randn(7) * 0.1
    vel_buffer.push(vel_reading)
smoothed_vel = vel_buffer.average
print(f"Smoothed velocity: {smoothed_vel}")
print(f"Latest velocity: {vel_buffer.current}")

# DeltaBuffer: compute end-effector position change
pos_buffer = DeltaBuffer(dim=3)
pos_buffer.push(np.array([0.5, 0.0, 0.8]))
pos_buffer.push(np.array([0.52, 0.01, 0.79]))
velocity_estimate = pos_buffer.delta  # [0.02, 0.01, -0.01]
midpoint = pos_buffer.average        # [0.51, 0.005, 0.795]

# DelayBuffer: simulate a 3-step sensor delay
delay_buf = DelayBuffer(dim=3, length=10)
for i in range(10):
    delay_buf.push(np.array([i, i, i]))
delayed_value = delay_buf.get_delayed_value(delay=3)
print(f"Value from 3 steps ago: {delayed_value}")

# Clear all buffers
vel_buffer.clear()
pos_buffer.clear()
delay_buf.clear()

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment