Implementation:Google deepmind Dm control Composer Observables

| Attribute | Value |
|---|---|
| Implementation | Composer Observables |
| Workflow | Composer_Environment_Building |
| Domain | Reinforcement_Learning, Observation |
| Source | dm_control |
| Last Updated | 2026-02-15 00:00 GMT |

Overview

Concrete tool for defining, buffering, and delivering multi-rate, delayed, and aggregated observations in dm_control Composer environments through the Observable class hierarchy and the Updater.

Description

The dm_control observation system is split across two modules:

Observable classes (in dm_control.composer.observation.observable) define what to observe and how to configure it:

  • Observable -- abstract base with properties: update_interval, buffer_size, delay, aggregator, corruptor, enabled. Subclasses implement _callable(physics) returning a zero-argument callable that produces the raw observation.
  • Generic -- wraps an arbitrary raw_observation_callable(physics).
  • MujocoFeature -- reads a named field (e.g. "qpos") from physics.named.data for a given feature name.
  • MujocoCamera -- renders an image from a camera identified by name, supporting RGB, depth, and configurable resolution.
  • MJCFFeature -- reads a field from physics.bind(mjcf_element), the preferred MJCF-aware variant. Supports slicing via __getitem__.
  • MJCFCamera -- renders from a camera mjcf.Element, supporting RGB, depth, segmentation, custom scene options, and render flag overrides.
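
The `_callable(physics)` contract described above can be illustrated with a small stand-alone sketch. It does not import dm_control: `FakePhysics`, `SketchObservable`, and `SketchGeneric` are hypothetical stand-ins, and the corruptor and scheduling options are deliberately left out. The point is only that physics is bound once and the returned zero-argument callable can then be sampled repeatedly.

```python
import abc


class FakePhysics:
    """Hypothetical stand-in for a physics object; holds one joint position."""

    def __init__(self, qpos):
        self.qpos = qpos


class SketchObservable(abc.ABC):
    """Simplified stand-in for an observable base class (not dm_control's)."""

    def __init__(self):
        self.enabled = False  # observables start out disabled

    @abc.abstractmethod
    def _callable(self, physics):
        """Returns a zero-argument callable producing the raw observation."""

    def observation_callable(self, physics):
        # Bind physics once; the caller can then sample repeatedly.
        return self._callable(physics)


class SketchGeneric(SketchObservable):
    """Wraps an arbitrary function of physics, in the spirit of Generic."""

    def __init__(self, raw_observation_callable):
        super().__init__()
        self._raw = raw_observation_callable

    def _callable(self, physics):
        return lambda: self._raw(physics)


physics = FakePhysics(qpos=0.5)
obs = SketchGeneric(lambda p: 2.0 * p.qpos)
sample = obs.observation_callable(physics)

first = sample()      # reads the current state
physics.qpos = 1.5    # the state changes between samples
second = sample()     # the same callable sees the new state
```

Because the callable closes over the physics object rather than a snapshot of it, each call reflects the state at sampling time.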

Updater (in dm_control.composer.observation.updater) manages the runtime lifecycle:

  • Updater.__init__(observables, physics_steps_per_control_step, ...) accepts a dict (or list of dicts) of observables.
  • reset(physics, random_state) creates buffers for all enabled observables, samples initial observations, and initializes schedules.
  • prepare_for_next_control_step() pre-computes the update schedule for the upcoming control step, optimizing by dropping entries that will never be read.
  • update() advances the step counter and inserts new observations into buffers when scheduled.
  • get_observation() reads the current buffer contents, applies aggregators, and returns a dict of observation arrays.
  • observation_spec() returns a dict of specs.Array or specs.BoundedArray describing the shape and dtype of each enabled observation.
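
The interplay of `reset`, `update`, and `get_observation` can be sketched with a toy, single-observable updater. This is a simplification under stated assumptions, not dm_control's implementation: `SketchUpdater` is a hypothetical class, it skips `prepare_for_next_control_step()`, buffer padding, and array stacking, and it assumes the sample taken at reset is visible immediately.

```python
import collections


class SketchUpdater:
    """Toy single-observable updater; a simplification, not dm_control's code."""

    def __init__(self, sample_fn, update_interval=1, buffer_size=1, delay=0,
                 aggregator=None):
        self._sample_fn = sample_fn
        self._interval = update_interval
        self._delay = delay
        self._aggregator = aggregator
        self._step = 0
        self._pending = collections.deque()  # (visible_at_step, value)
        self._buffer = collections.deque(maxlen=buffer_size)

    def reset(self):
        self._step = 0
        self._pending.clear()
        self._buffer.clear()
        # Assumption: the initial sample is visible right away.
        self._buffer.append(self._sample_fn())

    def update(self):
        """Advances one physics step and samples if this step is scheduled."""
        self._step += 1
        if self._step % self._interval == 0:
            visible_at = self._step + self._delay
            self._pending.append((visible_at, self._sample_fn()))
        # Move samples whose delay has elapsed into the visible buffer.
        while self._pending and self._pending[0][0] <= self._step:
            self._buffer.append(self._pending.popleft()[1])

    def get_observation(self):
        values = list(self._buffer)
        if self._aggregator is not None:
            return self._aggregator(values)
        return values


state = {'t': 0}  # the "observation" is simply the current step number
updater = SketchUpdater(lambda: state['t'], update_interval=2,
                        buffer_size=2, delay=1)
updater.reset()

history = []
for _ in range(6):
    state['t'] += 1  # pretend the simulation advanced one physics step
    updater.update()
    history.append(updater.get_observation())
```

With these settings a sample taken at step 2 only appears in the buffer at step 3, and the two-slot buffer drops the reset sample once the step-4 sample becomes visible at step 5.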

Default constants: DEFAULT_BUFFER_SIZE = 1, DEFAULT_UPDATE_INTERVAL = 1, DEFAULT_DELAY = 0.
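
One way to read these constants: an option left unset (`None`) on an observable falls back to the corresponding default when the observation is actually buffered. The sketch below illustrates that fallback with a hypothetical helper, `resolve_options`; it is not part of dm_control and only mirrors the constant values listed above.

```python
DEFAULT_BUFFER_SIZE = 1
DEFAULT_UPDATE_INTERVAL = 1
DEFAULT_DELAY = 0


def resolve_options(update_interval=None, buffer_size=None, delay=None):
    """Hypothetical helper: replaces unset (None) options with the defaults."""

    def pick(value, default):
        return default if value is None else value

    return {
        'update_interval': pick(update_interval, DEFAULT_UPDATE_INTERVAL),
        'buffer_size': pick(buffer_size, DEFAULT_BUFFER_SIZE),
        'delay': pick(delay, DEFAULT_DELAY),
    }


unset = resolve_options()
custom = resolve_options(buffer_size=3, delay=2)
```

Checking `is None` rather than truthiness matters here, since a delay of `0` is a legitimate explicit value.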

Usage

Declare observation channels by returning observable instances from methods decorated with @define.observable inside an entity's Observables subclass. Observables are disabled by default, so set enabled = True on each one the agent should receive. The Updater is normally created and managed internally by the Environment class and does not need to be used directly.

Code Reference

| Attribute | Value |
|---|---|
| Source Location (base) | dm_control/composer/observation/observable/base.py:L54-309 |
| Source Location (mjcf) | dm_control/composer/observation/observable/mjcf.py:L43-276 |
| Source Location (updater) | dm_control/composer/observation/updater.py:L120-331 |
| Signature (Observable.__init__) | Observable.__init__(self, update_interval, buffer_size, delay, aggregator, corruptor) |
| Signature (Generic.__init__) | Generic.__init__(self, raw_observation_callable, update_interval=1, buffer_size=None, delay=None, aggregator=None, corruptor=None) |
| Signature (MJCFFeature.__init__) | MJCFFeature.__init__(self, kind, mjcf_element, update_interval=1, buffer_size=None, delay=None, aggregator=None, corruptor=None, index=None) |
| Signature (MJCFCamera.__init__) | MJCFCamera.__init__(self, mjcf_element, height=240, width=320, update_interval=1, buffer_size=None, delay=None, aggregator=None, corruptor=None, depth=False, segmentation=False, scene_option=None, render_flag_overrides=None) |
| Signature (Updater.__init__) | Updater.__init__(self, observables, physics_steps_per_control_step=1, strip_singleton_buffer_dim=False, pad_with_initial_value=False) |
| Import | from dm_control.composer.observation.observable import base, mjcf as mjcf_observable; from dm_control.composer.observation import updater |

I/O Contract

Inputs (Observable)

| Name | Type | Description |
|---|---|---|
| update_interval | int | Number of physics steps between successive updates (default 1) |
| buffer_size | int or None | Maximum number of observations stored in the ring buffer (None falls back to DEFAULT_BUFFER_SIZE = 1) |
| delay | int or None | Additional physics steps of latency before an observation is visible (None falls back to DEFAULT_DELAY = 0) |
| aggregator | str or callable or None | Reduction function over the buffer dimension: 'min', 'max', 'mean', 'median', 'sum', or a custom callable |
| corruptor | callable or None | Function called as corruptor(observation, random_state=random_state) that returns the corrupted observation |
| enabled | bool | Whether this observable is active (default False) |
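
To make the `aggregator` and `corruptor` inputs concrete, here is a minimal stand-alone sketch. It uses scalar observations and the standard library only, whereas the real system reduces arrays along the buffer dimension and passes a NumPy random state. `AGGREGATORS`, `aggregate`, and `make_noise_corruptor` are hypothetical names introduced for illustration.

```python
import random
import statistics

# Hypothetical registry mirroring the string names listed in the table.
AGGREGATORS = {
    'min': min,
    'max': max,
    'mean': statistics.mean,
    'median': statistics.median,
    'sum': sum,
}


def aggregate(buffer, aggregator=None):
    """Reduces a buffer of scalar observations, or returns it unchanged."""
    if aggregator is None:
        return list(buffer)
    fn = AGGREGATORS[aggregator] if isinstance(aggregator, str) else aggregator
    return fn(buffer)


def make_noise_corruptor(scale):
    """Builds a corruptor with the (observation, random_state) signature."""

    def corruptor(observation, random_state):
        # random.Random stands in for the NumPy random state used in practice.
        return observation + random_state.gauss(0.0, scale)

    return corruptor


buffer = [1.0, 2.0, 6.0]
reduced = {name: aggregate(buffer, name) for name in AGGREGATORS}
latest = aggregate(buffer, lambda values: values[-1])  # custom callable

corruptor = make_noise_corruptor(scale=0.01)
noisy_a = corruptor(1.5, random_state=random.Random(0))
noisy_b = corruptor(1.5, random_state=random.Random(0))  # same seed, same noise
```

Passing the random state in explicitly, rather than drawing from a global generator, is what keeps corrupted observations reproducible under a fixed seed.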

Outputs (Updater)

| Name | Type | Description |
|---|---|---|
| get_observation() | dict of np.ndarray | Current observation values for all enabled observables |
| observation_spec() | dict of specs.Array or specs.BoundedArray | Shape and dtype specifications for enabled observations |

Usage Examples

Declaring observables inside an entity

from dm_control import mjcf
from dm_control.composer import define, entity
from dm_control.composer.observation.observable import mjcf as mjcf_obs


class MyRobot(entity.Entity):
    def _build(self):
        self._model = mjcf.from_path('robot.xml')

    @property
    def mjcf_model(self):
        return self._model

    def _build_observables(self):
        return MyRobotObservables(self)


class MyRobotObservables(entity.Observables):

    @define.observable
    def joint_positions(self):
        return mjcf_obs.MJCFFeature('qpos', self._entity.mjcf_model.find_all('joint'))

    @define.observable
    def joint_velocities(self):
        return mjcf_obs.MJCFFeature('qvel', self._entity.mjcf_model.find_all('joint'))

    @define.observable
    def camera(self):
        cam_elem = self._entity.mjcf_model.find('camera', 'egocentric')
        return mjcf_obs.MJCFCamera(cam_elem, height=64, width=64)

Configuring observable options

import numpy as np

# `robot` is an instance of the MyRobot entity defined above.
# Enable joint positions with a 2-step delay and additive Gaussian noise.
robot.observables.joint_positions.enabled = True
robot.observables.joint_positions.update_interval = 1
robot.observables.joint_positions.delay = 2
robot.observables.joint_positions.corruptor = (
    lambda obs, random_state: obs + random_state.normal(scale=0.01, size=obs.shape))

# Camera at lower rate with mean aggregation
robot.observables.camera.enabled = True
robot.observables.camera.update_interval = 5
robot.observables.camera.buffer_size = 3
robot.observables.camera.aggregator = 'mean'
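
As a rough worked example of what these settings imply, the sketch below lists which sample steps would sit in the buffer at a given physics step. It is a toy model under stated assumptions: samples are taken at multiples of `update_interval`, become visible `delay` steps later, and the sample taken at reset (step 0) is visible immediately. `visible_sample_steps` is a hypothetical helper, not a dm_control function.

```python
def visible_sample_steps(current_step, update_interval, buffer_size, delay):
    """Lists the sample steps held in the buffer at `current_step` (toy model).

    Assumes buffer_size >= 1.
    """
    # Step 0 is the sample taken at reset, assumed visible immediately.
    steps = [0]
    for step in range(update_interval, current_step + 1, update_interval):
        if step + delay <= current_step:
            steps.append(step)
    # Only the most recent `buffer_size` visible samples are kept.
    return steps[-buffer_size:]


# Camera settings from above: update_interval=5, buffer_size=3, no delay.
camera_at_20 = visible_sample_steps(20, update_interval=5, buffer_size=3, delay=0)
camera_at_23 = visible_sample_steps(23, update_interval=5, buffer_size=3, delay=0)

# Joint positions from above: update_interval=1, default buffer of 1, delay=2.
joints_at_20 = visible_sample_steps(20, update_interval=1, buffer_size=1, delay=2)
```

Under this model the camera buffer spans (3 - 1) * 5 = 10 physics steps, so the 'mean' aggregator averages frames taken up to 10 steps apart, while the joint position reading always lags the simulation by 2 steps.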

Using a Generic observable for task-specific quantities

import numpy as np
from dm_control.composer.observation.observable import base


def distance_to_target(physics):
    # Assumes the model contains bodies named 'hand' and 'target'.
    hand = physics.named.data.xpos['hand']
    target = physics.named.data.xpos['target']
    return np.linalg.norm(hand - target)


dist_obs = base.Generic(distance_to_target)
dist_obs.enabled = True
