
Implementation:Google deepmind Dm control Obs Buffer

From Leeroopedia
Knowledge Sources
Domains Composer, Observables
Last Updated 2026-02-15 04:00 GMT

Overview

The obs_buffer module implements a time-aware observation buffer. It stores incoming observations and releases them only after their delivery delay has elapsed, with a configurable buffer size and a per-observation delay.

Description

InFlightObservation is a lightweight data class using __slots__ that tracks a single observation's timestamp, delay, arrival time (timestamp + delay), and value. It supports comparison via __lt__ for sorting by arrival time.
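The data class described above can be sketched as follows. This is a standalone illustration rather than the module's source, and the name `InFlightObservationSketch` is hypothetical. It shows the arrival time being derived once at construction and `__lt__` ordering instances by that value alone.

```python
class InFlightObservationSketch:
    """Illustrative stand-in for the data class described above."""

    # __slots__ avoids a per-instance __dict__, keeping each record small.
    __slots__ = ('arrival', 'timestamp', 'delay', 'value')

    def __init__(self, timestamp, delay, value):
        self.arrival = timestamp + delay  # time at which the value is due
        self.timestamp = timestamp
        self.delay = delay
        self.value = value

    def __lt__(self, other):
        # Only the arrival time takes part in ordering.
        return self.arrival < other.arrival


observations = [
    InFlightObservationSketch(timestamp=2, delay=1, value='c'),  # arrives at 3
    InFlightObservationSketch(timestamp=0, delay=4, value='a'),  # arrives at 4
    InFlightObservationSketch(timestamp=1, delay=0, value='b'),  # arrives at 1
]
# sorted() relies on __lt__, so the result is ordered by arrival time.
arrival_order = [obs.value for obs in sorted(observations)]  # ['b', 'c', 'a']
```

Note that the observation made first (`'a'`) is delivered last because it carries the largest delay.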

Buffer maintains two deques: an _arrived_deque, bounded to buffer_size, that holds observations already due for delivery, and an unbounded _pending_deque that holds delayed observations still waiting for their arrival time. The insert method handles both zero-delay observations (delivered immediately) and positive-delay observations. For delayed observations it keeps the pending deque ordered by arrival time, so an observation that will arrive earlier than ones already queued is placed ahead of them. The read method first moves every pending item whose arrival time is at or before the current time into the arrived deque, then returns the arrived observations as a NumPy array.
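A minimal sketch of the two-deque scheme is shown below. It is a simplified model, not the library's code: the class name `TwoDequeBufferSketch` is hypothetical, every observation (including zero-delay ones) is routed through the pending deque for brevity, and the arrived deque is assumed to start out filled with zeros.

```python
import collections

import numpy as np


class TwoDequeBufferSketch:
    """Simplified model of the arrived/pending scheme (illustrative only)."""

    def __init__(self, buffer_size, shape, dtype):
        self._dtype = dtype
        zeros = np.zeros(tuple(shape), dtype=dtype)
        # Bounded: appending beyond maxlen evicts the oldest entry.
        self._arrived = collections.deque(
            [zeros] * buffer_size, maxlen=buffer_size)
        # Unbounded deque of (arrival_time, value), kept sorted by arrival.
        self._pending = collections.deque()

    def insert(self, timestamp, delay, value):
        if delay < 0:
            raise ValueError('delay must be non-negative')
        arrival = timestamp + delay
        value = np.asarray(value, dtype=self._dtype)
        # Walk back from the right to find the slot that keeps the order.
        index = len(self._pending)
        while index > 0 and self._pending[index - 1][0] > arrival:
            index -= 1
        self._pending.insert(index, (arrival, value))

    def read(self, current_time):
        # Promote everything whose arrival time has passed, earliest first.
        while self._pending and self._pending[0][0] <= current_time:
            self._arrived.append(self._pending.popleft()[1])
        return np.array(list(self._arrived), dtype=self._dtype)


buf = TwoDequeBufferSketch(buffer_size=2, shape=(1,), dtype=np.float64)
buf.insert(timestamp=0, delay=3, value=[10.0])  # arrives at t=3
buf.insert(timestamp=1, delay=1, value=[20.0])  # arrives at t=2, jumps ahead
at_t2 = buf.read(current_time=2)  # [[0.], [20.]]
at_t3 = buf.read(current_time=3)  # [[20.], [10.]]
```

The second insert overtakes the first in the pending deque, so the later-made observation is delivered first.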

The drop_unobserved_upcoming_items method optimizes the observation schedule. It simulates future buffer states to find proposed observations that would never be delivered, because newer observations displace them from the bounded buffer before the next read, and removes them from the schedule. This avoids computing observations that no read would ever return.

The buffer also supports optional padding with the initial observation value and stripping of a singleton buffer dimension.
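The idea behind this pruning can be sketched with a small standalone function. This is an illustration under stated assumptions, not the library's algorithm: `prune_schedule` is a hypothetical helper, it starts from an empty buffer, and it takes explicit read times instead of a read interval. The schedule is a list of `(timestamp, delay)` pairs.

```python
def prune_schedule(schedule, read_times, buffer_size):
    """Keeps only the (timestamp, delay) pairs that some read would return."""
    # Order indices by arrival time; sorted() is stable, so ties keep
    # their schedule order.
    by_arrival = sorted(range(len(schedule)), key=lambda i: sum(schedule[i]))
    observed = set()
    for read_time in read_times:
        arrived = [i for i in by_arrival if sum(schedule[i]) <= read_time]
        # A bounded buffer retains only the newest `buffer_size` arrivals.
        observed.update(arrived[-buffer_size:])
    return [item for i, item in enumerate(schedule) if i in observed]


schedule = [(0, 5), (1, 0), (2, 0), (3, 0), (4, 0)]
kept = prune_schedule(schedule, read_times=[2, 5], buffer_size=1)
# kept == [(0, 5), (2, 0)]
```

With a buffer of size 1, the read at time 2 sees only the observation made at time 2, and the read at time 5 sees only the delayed observation that arrives at time 5. The remaining three entries are never visible to any read, so they can be skipped.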

Usage

Used internally by the Composer observation system to manage multi-rate observations with configurable delays and buffering. This supports realistic sensor modeling for reinforcement learning environments.

Code Reference

Source Location

Signature

class InFlightObservation:
    __slots__ = ('arrival', 'timestamp', 'delay', 'value')
    def __init__(self, timestamp, delay, value): ...
    def __lt__(self, other): ...

class Buffer:
    def __init__(self, buffer_size, shape, dtype,
                 pad_with_initial_value=False,
                 strip_singleton_buffer_dim=False): ...
    @property
    def shape(self): ...
    @property
    def dtype(self): ...
    def insert(self, timestamp, delay, value): ...
    def read(self, current_time): ...
    def drop_unobserved_upcoming_items(self, observation_schedule, read_interval): ...

Import

from dm_control.composer.observation.obs_buffer import Buffer
from dm_control.composer.observation.obs_buffer import InFlightObservation

I/O Contract

Inputs (Buffer.__init__)

Name | Type | Required | Description
buffer_size | int | Yes | Maximum number of observations returned by read
shape | int or tuple | Yes | Shape of a single observation entry
dtype | numpy.dtype | Yes | NumPy dtype of observation entries
pad_with_initial_value | bool | No | If True, pad buffer with first observation value instead of zeros (default: False)
strip_singleton_buffer_dim | bool | No | If True and buffer_size == 1, omit the leading buffer dimension (default: False)
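The effect of the two optional flags can be illustrated with a pair of standalone helpers. Both function names are hypothetical and the code is a sketch of the behavior described in the table, not the library's implementation. It also assumes that the newest observation occupies the last row of the buffer.

```python
import numpy as np


def buffered_shape(buffer_size, shape, strip_singleton_buffer_dim=False):
    """Shape of the array a read would return, per the table above."""
    shape = (shape,) if isinstance(shape, int) else tuple(shape)
    if strip_singleton_buffer_dim and buffer_size == 1:
        return shape  # leading buffer dimension omitted
    return (buffer_size,) + shape


def contents_after_first_value(buffer_size, first_value,
                               pad_with_initial_value=False):
    """Buffer contents once exactly one observation has arrived."""
    first_value = np.asarray(first_value)
    if pad_with_initial_value:
        # Every slot holds a copy of the first observation.
        return np.stack([first_value] * buffer_size)
    # Otherwise older slots stay zero; the newest slot is assumed to be last.
    out = np.zeros((buffer_size,) + first_value.shape, dtype=first_value.dtype)
    out[-1] = first_value
    return out


plain = buffered_shape(2, (3,))                                      # (2, 3)
stripped = buffered_shape(1, (3,), strip_singleton_buffer_dim=True)  # (3,)
padded = contents_after_first_value(3, [1.0, 2.0], pad_with_initial_value=True)
zero_filled = contents_after_first_value(3, [1.0, 2.0])
```

Padding with the initial value avoids presenting a consumer with artificial zeros during the first few reads, before the buffer has filled with real observations.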

Inputs (Buffer.insert)

Name | Type | Required | Description
timestamp | float | Yes | Time at which this observation was made
delay | float | Yes | Delay before the observation is delivered (must be non-negative)
value | array-like | Yes | The observation value

Outputs

Name | Type | Description
read(current_time) | numpy.ndarray | Array of shape (buffer_size, *shape) containing the most recent arrived observations; the leading dimension is omitted when strip_singleton_buffer_dim is True and buffer_size == 1
shape | tuple | The buffered shape of the output array
dtype | numpy.dtype | The data type of the output array

Usage Examples

import numpy as np
from dm_control.composer.observation.obs_buffer import Buffer

# Create a buffer of size 2 for observations that are 3-element vectors
buf = Buffer(buffer_size=2, shape=(3,), dtype=np.float64)

# Insert observations with varying delays
buf.insert(timestamp=0.0, delay=0, value=[1.0, 2.0, 3.0])
buf.insert(timestamp=0.1, delay=0.05, value=[4.0, 5.0, 6.0])
buf.insert(timestamp=0.2, delay=0, value=[7.0, 8.0, 9.0])

# Read the buffer at the current time
observations = buf.read(current_time=0.2)
# Returns a (2, 3) array with the two most recent arrived observations
