
Implementation:Google DeepMind dm_control Composer Task

From Leeroopedia
Attribute Value
Implementation Composer Task
Workflow Composer_Environment_Building
Domain Reinforcement_Learning, Composition
Source dm_control
Last Updated 2026-02-15 00:00 GMT

Overview

Specifies the reward function, termination logic, timestep configuration, and action mapping of a dm_control Composer environment through the abstract Task base class and the minimal concrete NullTask.

Description

The Task class in dm_control.composer.task is the abstract base class that every Composer task must subclass. It requires implementing:

  • root_entity -- an abstract property returning the top-level Entity (typically an Arena) that roots the model hierarchy.
  • get_reward(physics) -- an abstract method returning a scalar (or structured) reward from the current physics state.

The class provides a rich set of optional overrides and properties:

  • Termination: should_terminate_episode(physics) returns False by default; override to define success or failure conditions.
  • Discount: get_discount(physics) returns 1.0 by default; override for variable discount factors.
  • Timesteps: control_timestep, physics_timestep, and set_timesteps(control_timestep, physics_timestep) manage the ratio between agent control steps and physics simulation steps, with automatic divisibility checking.
  • Action mapping: before_step(physics, action, random_state) defaults to calling physics.set_control(action); override to implement custom action mappings.
  • Episode initialization: initialize_episode_mjcf(random_state) for pre-compile model changes and initialize_episode(physics, random_state) for post-compile state setup.
  • Observables: The observables property automatically merges all entity observables with any task-specific observables from task_observables.
  • Action spec: action_spec(physics) returns a BoundedArray matching the physics actuators.
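The divisibility checking mentioned for set_timesteps can be illustrated with a pure-Python sketch: the control timestep must be an (approximate) integer multiple of the physics timestep, and the ratio gives physics_steps_per_control_step. The exact tolerance and error message used inside dm_control may differ; this is only a model of the logic.

```python
def physics_steps_per_control_step(control_timestep, physics_timestep):
    """Sketch of the check set_timesteps performs: the control timestep
    must be an (approximate) integer multiple of the physics timestep."""
    ratio = control_timestep / physics_timestep
    rounded = int(round(ratio))
    if rounded < 1 or abs(ratio - rounded) > 1e-6:
        raise ValueError(
            'Control timestep {} is not divisible by physics timestep {}.'
            .format(control_timestep, physics_timestep))
    return rounded

# 50 Hz control over 200 Hz physics -> 4 physics steps per control step.
print(physics_steps_per_control_step(0.02, 0.005))  # -> 4
```

With the values used in the reaching example below (0.02 s control, 0.005 s physics), each agent action spans four simulation substeps.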

NullTask is a minimal concrete implementation wrapping a single entity with a zero reward, useful for testing or visualization.

Usage

Subclass Task to create any Composer reinforcement learning task. Set root_entity to an arena containing your entities, implement get_reward, and optionally override the other lifecycle methods. Use NullTask for quick entity testing without a reward.

Code Reference

Attribute Value
Source Location dm_control/composer/task.py:L36-322
Signature (root_entity) @abc.abstractproperty Task.root_entity
Signature (get_reward) Task.get_reward(self, physics) -> float (abstract)
Signature (should_terminate_episode) Task.should_terminate_episode(self, physics) -> bool
Signature (initialize_episode) Task.initialize_episode(self, physics, random_state) -> None
Signature (initialize_episode_mjcf) Task.initialize_episode_mjcf(self, random_state) -> None
Signature (before_step) Task.before_step(self, physics, action, random_state) -> None
Signature (set_timesteps) Task.set_timesteps(self, control_timestep, physics_timestep) -> None
Signature (action_spec) Task.action_spec(self, physics) -> specs.BoundedArray
Signature (NullTask) NullTask.__init__(self, root_entity)
Import from dm_control.composer import task

I/O Contract

Inputs

Name Type Description
physics mjcf.Physics The compiled MuJoCo physics instance
action np.ndarray Agent action vector matching action_spec
random_state np.random.RandomState Seeded random number generator for reproducibility

Outputs

Name Type Description
root_entity Entity The root entity (arena) for this task
get_reward return float Scalar reward for the current step
should_terminate_episode return bool Whether the episode should end
get_discount return float Discount factor for the current step (default 1.0)
observables OrderedDict Merged dict of entity and task observables
action_spec return specs.BoundedArray Specification of valid actions
control_timestep float Seconds between agent actions
physics_timestep float Seconds between MuJoCo simulation steps
physics_steps_per_control_step int Number of physics steps per control step
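The outputs above are produced in a fixed order on every control step. The toy loop below (plain Python, no MuJoCo) sketches how a Composer environment consumes the Task hooks: the hook names match the real API, but the dict-based physics stand-in and the hard-coded method bodies are hypothetical, and the ordering is a simplification of what composer.Environment actually does.

```python
class ToyTask:
    """Stand-in with the same hook names as composer.Task; the bodies are
    hypothetical and exist only to make the control flow concrete."""

    def before_step(self, physics, action, random_state):
        # The real Task default is physics.set_control(action); here the
        # "physics" is just a dict.
        physics['ctrl'] = action

    def get_reward(self, physics):
        return -abs(physics['ctrl'])

    def should_terminate_episode(self, physics):
        return abs(physics['ctrl']) < 0.01

    def get_discount(self, physics):
        return 1.0


def control_step(task, physics, action):
    # Simplified order of per-step Task callbacks in a Composer environment.
    task.before_step(physics, action, random_state=None)
    # ... physics_steps_per_control_step simulation substeps would run here ...
    reward = task.get_reward(physics)
    terminated = task.should_terminate_episode(physics)
    discount = task.get_discount(physics)
    return reward, discount, terminated


physics = {}
print(control_step(ToyTask(), physics, action=0.5))  # -> (-0.5, 1.0, False)
```

Overriding any single hook changes one stage of this loop without touching the others, which is why a minimal task only needs root_entity and get_reward.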

Usage Examples

Simple reaching task

import numpy as np
from dm_control.composer import arena as arena_module
from dm_control.composer import task as task_module


class ReachTask(task_module.Task):
    def __init__(self, robot, target_entity):
        # `robot` and `target_entity` are placeholder entities; `robot` is
        # assumed to expose a `grip_site` and a `joint_positions` observable.
        self._arena = arena_module.Arena()
        self._arena.attach(robot)
        self._target_frame = self._arena.add_free_entity(target_entity)
        self._robot = robot
        self._target = target_entity

        # Enable relevant observables
        robot.observables.joint_positions.enabled = True
        target_entity.observables.position.enabled = True

        # Set timesteps: 50 Hz control, 200 Hz physics
        self.set_timesteps(
            control_timestep=0.02,
            physics_timestep=0.005)

    @property
    def root_entity(self):
        return self._arena

    def initialize_episode(self, physics, random_state):
        # Randomize target position
        self._target.set_pose(
            physics,
            position=random_state.uniform(-0.3, 0.3, size=3))

    def get_reward(self, physics):
        robot_pos = physics.bind(self._robot.grip_site).xpos
        target_pos = physics.bind(self._target.mjcf_model.find(
            'body', 'target_body')).xpos
        distance = np.linalg.norm(robot_pos - target_pos)
        return -distance

    def should_terminate_episode(self, physics):
        # Run until the time limit; returning False is also the base-class
        # default, shown here for completeness.
        return False

Using NullTask for visualization

from dm_control.composer import task as task_module
from dm_control.composer import arena as arena_module

arena = arena_module.Arena()
null_task = task_module.NullTask(root_entity=arena)
# null_task.get_reward(physics) always returns 0.0
