Implementation:Google deepmind Dm control Obs Buffer
| Knowledge Sources | |
|---|---|
| Domains | Composer, Observables |
| Last Updated | 2026-02-15 04:00 GMT |
Overview
The obs_buffer module implements a time-aware observation buffer that manages the buffering and delaying of observations, supporting configurable buffer sizes and observation delivery delays.
Description
InFlightObservation is a lightweight data class using __slots__ that tracks a single observation's timestamp, delay, arrival time (timestamp + delay), and value. It supports comparison via __lt__ for sorting by arrival time.
Buffer maintains two deques: an _arrived_deque (bounded to buffer_size) holding observations that are due for delivery, and an unbounded _pending_deque for delayed observations awaiting their arrival time. The insert method handles both zero-delay (immediate delivery) and positive-delay observations. For delayed observations, it maintains arrival-time ordering in the pending deque, handling out-of-order arrivals by sorting new entries into the correct position. The read method returns a NumPy array of the currently arrived observations, moving any pending items whose arrival time has passed.
The drop_unobserved_upcoming_items method performs schedule optimization by simulating future buffer states to determine which proposed observations will never actually be delivered (because they become stale before being read). This reduces unnecessary computation during control steps. The buffer supports optional padding with the initial observation value and stripping of singleton buffer dimensions.
Usage
Used internally by the Composer observation system to manage multi-rate observations with configurable delays and buffering. This supports realistic sensor modeling for reinforcement learning environments.
Code Reference
Source Location
- Repository: Google_deepmind_Dm_control
- File: dm_control/composer/observation/obs_buffer.py
- Lines: 1-251
Signature
class InFlightObservation:
__slots__ = ('arrival', 'timestamp', 'delay', 'value')
def __init__(self, timestamp, delay, value):
def __lt__(self, other):
class Buffer:
def __init__(self, buffer_size, shape, dtype,
pad_with_initial_value=False,
strip_singleton_buffer_dim=False):
@property
def shape(self):
@property
def dtype(self):
def insert(self, timestamp, delay, value):
def read(self, current_time):
def drop_unobserved_upcoming_items(self, observation_schedule, read_interval):
Import
from dm_control.composer.observation.obs_buffer import Buffer
from dm_control.composer.observation.obs_buffer import InFlightObservation
I/O Contract
Inputs (Buffer.__init__)
| Name | Type | Required | Description |
|---|---|---|---|
| buffer_size | int | Yes | Maximum number of observations returned by read
|
| shape | int or tuple | Yes | Shape of a single observation entry |
| dtype | numpy.dtype | Yes | NumPy dtype of observation entries |
| pad_with_initial_value | bool | No | If True, pad buffer with first observation value instead of zeros (default: False) |
| strip_singleton_buffer_dim | bool | No | If True and buffer_size == 1, omit the leading buffer dimension (default: False)
|
Inputs (Buffer.insert)
| Name | Type | Required | Description |
|---|---|---|---|
| timestamp | float | Yes | Time at which this observation was made |
| delay | float | Yes | Delay before the observation is delivered (must be non-negative) |
| value | array-like | Yes | The observation value |
Outputs
| Name | Type | Description |
|---|---|---|
| read(current_time) | numpy.ndarray | Array of shape (buffer_size, *shape) containing the most recent arrived observations
|
| shape | tuple | The buffered shape of the output array |
| dtype | numpy.dtype | The data type of the output array |
Usage Examples
import numpy as np
from dm_control.composer.observation.obs_buffer import Buffer
# Create a buffer for 3-dimensional observations with buffer size 2
buf = Buffer(buffer_size=2, shape=(3,), dtype=np.float64)
# Insert observations with varying delays
buf.insert(timestamp=0.0, delay=0, value=[1.0, 2.0, 3.0])
buf.insert(timestamp=0.1, delay=0.05, value=[4.0, 5.0, 6.0])
buf.insert(timestamp=0.2, delay=0, value=[7.0, 8.0, 9.0])
# Read the buffer at the current time
observations = buf.read(current_time=0.2)
# Returns a (2, 3) array with the two most recent arrived observations