Implementation:Google deepmind Dm control Composer Observables
| Attribute | Value |
|---|---|
| Implementation | Composer Observables |
| Workflow | Composer_Environment_Building |
| Domain | Reinforcement_Learning, Observation |
| Source | dm_control |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
Concrete tool for defining, buffering, and delivering multi-rate, delayed, and aggregated observations in dm_control Composer environments through the Observable class hierarchy and the Updater.
Description
The dm_control observation system is split across two modules:
Observable classes (in dm_control.composer.observation.observable) define what to observe and how to configure it:
- Observable -- abstract base with properties: update_interval, buffer_size, delay, aggregator, corruptor, enabled. Subclasses implement _callable(physics), returning a zero-argument callable that produces the raw observation.
- Generic -- wraps an arbitrary raw_observation_callable(physics).
- MujocoFeature -- reads a named field (e.g. "qpos") from physics.named.data for a given feature name.
- MujocoCamera -- renders an image from a camera identified by name, supporting RGB, depth, and configurable resolution.
- MJCFFeature -- reads a field from physics.bind(mjcf_element), the preferred MJCF-aware variant. Supports slicing via __getitem__.
- MJCFCamera -- renders from a camera mjcf.Element, supporting RGB, depth, segmentation, custom scene options, and render flag overrides.
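The contract described above, where _callable(physics) returns a zero-argument callable and an optional corruptor post-processes the raw value, can be sketched in plain Python. The names SketchObservable, SketchGeneric, and FakePhysics are hypothetical stand-ins rather than dm_control classes, and the real base class carries more properties and validation than shown here.

```python
import random


class SketchObservable:
    """Hypothetical stand-in for an observable base class (not dm_control's)."""

    def __init__(self, corruptor=None):
        self.corruptor = corruptor
        self.enabled = False  # disabled until explicitly switched on

    def _callable(self, physics):
        # Subclasses return a zero-argument callable producing the raw value.
        raise NotImplementedError

    def observation_callable(self, physics, random_state=None):
        raw = self._callable(physics)
        if self.corruptor is None:
            return raw
        # Wrap the raw callable so every read passes through the corruptor.
        return lambda: self.corruptor(raw(), random_state=random_state)


class SketchGeneric(SketchObservable):
    """Wraps an arbitrary function of physics."""

    def __init__(self, raw_observation_callable, corruptor=None):
        super().__init__(corruptor=corruptor)
        self._raw = raw_observation_callable

    def _callable(self, physics):
        return lambda: self._raw(physics)


class FakePhysics:
    """Toy object standing in for a physics instance."""
    time = 0.5


obs = SketchGeneric(
    lambda physics: physics.time * 2,
    corruptor=lambda value, random_state: value + random_state.uniform(0, 0.1))
read = obs.observation_callable(FakePhysics(), random_state=random.Random(0))
sample = read()  # raw value 1.0 plus noise drawn from [0, 0.1]
```

Returning a callable instead of a value lets the caller decide when sampling happens, which is what makes per-observable update schedules possible.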
Updater (in dm_control.composer.observation.updater) manages the runtime lifecycle:
- Updater.__init__(observables, physics_steps_per_control_step, ...) accepts a dict (or list of dicts) of observables.
- reset(physics, random_state) creates buffers for all enabled observables, samples initial observations, and initializes schedules.
- prepare_for_next_control_step() pre-computes the update schedule for the upcoming control step, optimizing by dropping entries that will never be read.
- update() advances the step counter and inserts new observations into buffers when scheduled.
- get_observation() reads the current buffer contents, applies aggregators, and returns a dict of observation arrays.
- observation_spec() returns a dict of specs.Array or specs.BoundedArray describing the shape and dtype of each enabled observation.
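To make the interplay of update_interval, delay, and buffer_size concrete, here is a minimal sketch of a delayed buffer: each sampled value records the step at which it becomes visible, and a read returns only the newest visible values. ToyBuffer and its methods are illustrative assumptions, not the Updater's internals; the real implementation additionally precomputes schedules and prunes entries that will never be read.

```python
class ToyBuffer:
    """Illustrative delayed buffer; not dm_control's implementation."""

    def __init__(self, buffer_size, update_interval, delay, initial_value):
        self.buffer_size = buffer_size
        self.update_interval = update_interval
        self.delay = delay
        self.step = 0
        # Entries are (visible_at_step, value); the initial value is visible at once.
        self.pending = [(0, initial_value)]

    def update(self, sample):
        """Advance one physics step; sample() is called only on scheduled steps."""
        self.step += 1
        if self.step % self.update_interval == 0:
            self.pending.append((self.step + self.delay, sample()))

    def read(self):
        """Return the newest `buffer_size` values that are already visible."""
        visible = [value for visible_at, value in self.pending
                   if visible_at <= self.step]
        return visible[-self.buffer_size:]


buf = ToyBuffer(buffer_size=2, update_interval=2, delay=1, initial_value=0)
history = []
for _ in range(6):
    buf.update(lambda: buf.step * 10)  # the toy "observation" is 10 * current step
    history.append(buf.read())
```

With these settings a value sampled at step 2 first appears in a read at step 3, so history shows each new sample arriving one step late.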
Default constants: DEFAULT_BUFFER_SIZE = 1, DEFAULT_UPDATE_INTERVAL = 1, DEFAULT_DELAY = 0.
Usage
Declare observation channels by adding methods decorated with @define.observable to the Observables subclass of an entity; each method returns an instance of one of the observable classes. The Updater is normally created and managed internally by the Environment class and does not need to be used directly.
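Settings such as enabled are assigned on the object returned by an attribute access, so they only persist if every access returns the same observable. A minimal sketch of that caching behaviour follows, using the standard library's functools.cached_property as a stand-in for @define.observable. This is an assumption about the mechanism, not a copy of it, and ToyObservable and ToyObservables are hypothetical names.

```python
from functools import cached_property


class ToyObservable:
    """Hypothetical observable holding a single configuration flag."""

    def __init__(self, name):
        self.name = name
        self.enabled = False


class ToyObservables:
    """Hypothetical collection whose members are built lazily and cached."""

    build_count = 0

    @cached_property
    def joint_positions(self):
        # Runs once per instance; later accesses reuse the cached object.
        self.build_count += 1
        return ToyObservable('joint_positions')


observables = ToyObservables()
observables.joint_positions.enabled = True  # mutates the cached instance
```

Without caching, the assignment above would modify a temporary object and the setting would be lost on the next access.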
Code Reference
| Attribute | Value |
|---|---|
| Source Location (base) | dm_control/composer/observation/observable/base.py:L54-309 |
| Source Location (mjcf) | dm_control/composer/observation/observable/mjcf.py:L43-276 |
| Source Location (updater) | dm_control/composer/observation/updater.py:L120-331 |
| Signature (Observable.__init__) | Observable.__init__(self, update_interval, buffer_size, delay, aggregator, corruptor) |
| Signature (Generic.__init__) | Generic.__init__(self, raw_observation_callable, update_interval=1, buffer_size=None, delay=None, aggregator=None, corruptor=None) |
| Signature (MJCFFeature.__init__) | MJCFFeature.__init__(self, kind, mjcf_element, update_interval=1, buffer_size=None, delay=None, aggregator=None, corruptor=None, index=None) |
| Signature (MJCFCamera.__init__) | MJCFCamera.__init__(self, mjcf_element, height=240, width=320, update_interval=1, buffer_size=None, delay=None, aggregator=None, corruptor=None, depth=False, segmentation=False, scene_option=None, render_flag_overrides=None) |
| Signature (Updater.__init__) | Updater.__init__(self, observables, physics_steps_per_control_step=1, strip_singleton_buffer_dim=False, pad_with_initial_value=False) |
| Import | from dm_control.composer.observation.observable import base, mjcf as mjcf_observable; from dm_control.composer.observation import updater |
I/O Contract
Inputs (Observable)
| Name | Type | Description |
|---|---|---|
| update_interval | int | Number of physics steps between successive updates (default 1) |
| buffer_size | int or None | Maximum number of observations stored in the ring buffer (default 1) |
| delay | int or None | Additional physics steps of latency before an observation is visible (default 0) |
| aggregator | str or callable or None | Reduction function over the buffer dimension: 'min', 'max', 'mean', 'median', 'sum', or a custom callable |
| corruptor | callable or None | Function called as corruptor(observation, random_state=random_state) that returns the corrupted observation |
| enabled | bool | Whether this observable is active (default False) |
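The aggregator option reduces the buffer dimension, turning a stack of buffered observations into a single one. The sketch below illustrates the idea with plain lists and the standard library. The AGGREGATORS table and aggregate function are hypothetical, and it is an assumption of this sketch that a custom callable receives the whole buffer.

```python
import statistics


def _columnwise(fn):
    """Apply fn to the i-th component of every buffered observation."""
    return lambda buffer: [fn(column) for column in zip(*buffer)]


# Hypothetical name-to-function table mirroring the string options listed above.
AGGREGATORS = {
    'min': _columnwise(min),
    'max': _columnwise(max),
    'mean': _columnwise(statistics.mean),
    'median': _columnwise(statistics.median),
    'sum': _columnwise(sum),
}


def aggregate(buffer, aggregator=None):
    """Reduce a buffer (list of equal-length observation vectors) over its first axis."""
    if aggregator is None:
        return buffer  # no reduction: the buffer dimension is kept
    fn = AGGREGATORS[aggregator] if isinstance(aggregator, str) else aggregator
    return fn(buffer)


# Three buffered observations (buffer_size = 3), each of shape (2,).
buffer = [[1.0, 10.0], [2.0, 20.0], [6.0, 30.0]]
mean_obs = aggregate(buffer, 'mean')
max_obs = aggregate(buffer, 'max')
newest_obs = aggregate(buffer, lambda buf: buf[-1])  # custom callable: keep the newest
```

Here the named reductions collapse three buffered vectors into one vector of the original observation shape, while passing no aggregator leaves the buffer untouched.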
Outputs (Updater)
| Name | Type | Description |
|---|---|---|
| get_observation() | dict of np.ndarray | Current observation values for all enabled observables |
| observation_spec() | dict of specs.Array | Shape and dtype specifications for enabled observations |
Usage Examples
Declaring observables inside an entity
from dm_control import mjcf
from dm_control.composer import define, entity
from dm_control.composer.observation.observable import mjcf as mjcf_obs


class MyRobot(entity.Entity):

    def _build(self):
        # 'robot.xml' is a placeholder path to an MJCF model file.
        self._model = mjcf.from_path('robot.xml')

    @property
    def mjcf_model(self):
        return self._model

    def _build_observables(self):
        return MyRobotObservables(self)


class MyRobotObservables(entity.Observables):

    @define.observable
    def joint_positions(self):
        return mjcf_obs.MJCFFeature('qpos', self._entity.mjcf_model.find_all('joint'))

    @define.observable
    def joint_velocities(self):
        return mjcf_obs.MJCFFeature('qvel', self._entity.mjcf_model.find_all('joint'))

    @define.observable
    def camera(self):
        # Assumes the model defines a camera named 'egocentric'.
        cam_elem = self._entity.mjcf_model.find('camera', 'egocentric')
        return mjcf_obs.MJCFCamera(cam_elem, height=64, width=64)
Configuring observable options
import numpy as np

# `robot` is assumed to be an instance of the MyRobot entity defined above.
# Joint positions: sampled every physics step, delayed by 2 steps, with Gaussian noise.
robot.observables.joint_positions.enabled = True
robot.observables.joint_positions.update_interval = 1
robot.observables.joint_positions.delay = 2
robot.observables.joint_positions.corruptor = (
    lambda obs, random_state: obs + random_state.normal(scale=0.01, size=obs.shape))

# Camera: sampled every 5 physics steps, mean over the last 3 frames.
robot.observables.camera.enabled = True
robot.observables.camera.update_interval = 5
robot.observables.camera.buffer_size = 3
robot.observables.camera.aggregator = 'mean'
Using a Generic observable for task-specific quantities
import numpy as np
from dm_control.composer.observation.observable import base


def distance_to_target(physics):
    # Assumes the model contains bodies named 'hand' and 'target'.
    hand = physics.named.data.xpos['hand']
    target = physics.named.data.xpos['target']
    return np.linalg.norm(hand - target)


dist_obs = base.Generic(distance_to_target)
dist_obs.enabled = True