# Implementation: Google DeepMind dm_control Composer Task
| Attribute | Value |
|---|---|
| Implementation | Composer Task |
| Workflow | Composer_Environment_Building |
| Domain | Reinforcement_Learning, Composition |
| Source | dm_control |
| Last Updated | 2026-02-15 00:00 GMT |
## Overview
The Composer `Task` abstraction specifies the reward function, termination logic, timestep configuration, and action mapping of a dm_control Composer environment, via the abstract `Task` base class and the convenience `NullTask`.
## Description
The `Task` class in `dm_control.composer.task` is the abstract base class that every Composer task must subclass. It requires implementing:

- `root_entity` -- an abstract property returning the top-level `Entity` (typically an `Arena`) that roots the model hierarchy.
- `get_reward(physics)` -- an abstract method returning a scalar (or structured) reward from the current physics state.
The class provides a rich set of optional overrides and properties:
- Termination: `should_terminate_episode(physics)` returns `False` by default; override to define success or failure conditions.
- Discount: `get_discount(physics)` returns `1.0` by default; override for variable discount factors.
- Timesteps: `control_timestep`, `physics_timestep`, and `set_timesteps(control_timestep, physics_timestep)` manage the ratio between agent control steps and physics simulation steps, with automatic divisibility checking.
- Action mapping: `before_step(physics, action, random_state)` defaults to calling `physics.set_control(action)`; override to implement custom action mappings.
- Episode initialization: `initialize_episode_mjcf(random_state)` for pre-compile model changes and `initialize_episode(physics, random_state)` for post-compile state setup.
- Observables: the `observables` property automatically merges all entity observables with any task-specific observables from `task_observables`.
- Action spec: `action_spec(physics)` returns a `BoundedArray` matching the physics actuators.
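The timestep divisibility check mentioned above can be illustrated with a small standalone sketch. This is plain Python mimicking the kind of check `set_timesteps` performs, not dm_control's actual implementation; the tolerance value is an assumption:

```python
def physics_steps_per_control_step(control_timestep, physics_timestep,
                                   tolerance=1e-8):
    """Return how many physics steps fit in one control step.

    Illustrative sketch of a divisibility check like the one
    Task.set_timesteps performs; not the dm_control implementation.
    """
    if control_timestep < physics_timestep:
        raise ValueError('Control timestep must be >= physics timestep.')
    ratio = control_timestep / physics_timestep
    if abs(ratio - round(ratio)) > tolerance:
        raise ValueError(
            'Control timestep must be an integer multiple of the '
            'physics timestep; got ratio {:.6f}.'.format(ratio))
    return int(round(ratio))

# 50 Hz control over 200 Hz physics -> 4 physics steps per control step.
print(physics_steps_per_control_step(0.02, 0.005))  # -> 4
```

Rejecting non-divisible pairs at configuration time avoids silently drifting between the agent's control clock and the simulator clock.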
NullTask is a minimal concrete implementation wrapping a single entity with a zero reward, useful for testing or visualization.
## Usage
Subclass `Task` to create any Composer reinforcement learning task. Set `root_entity` to an arena containing your entities, implement `get_reward`, and optionally override the other lifecycle methods. Use `NullTask` for quick entity testing without a reward.
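The lifecycle methods are invoked in a fixed order by Composer's environment loop. The sketch below is a simplified stand-in using plain Python stubs (not the real dm_control loop, which also handles observables, discounts, and physics substeps) to show that order:

```python
class LifecycleLog:
    """Toy task recording the order in which lifecycle hooks fire.

    A simplified stand-in for composer.Environment's loop; the hook
    names mirror the Task API, but the bodies are stubs.
    """

    def __init__(self):
        self.calls = []

    def initialize_episode_mjcf(self, random_state):
        self.calls.append('initialize_episode_mjcf')  # pre-compile edits

    def compile_physics(self):
        self.calls.append('compile')  # MJCF -> physics (environment's job)

    def initialize_episode(self, physics, random_state):
        self.calls.append('initialize_episode')  # post-compile state setup

    def before_step(self, physics, action, random_state):
        self.calls.append('before_step')  # map action into controls

    def get_reward(self, physics):
        self.calls.append('get_reward')
        return 0.0

    def should_terminate_episode(self, physics):
        self.calls.append('should_terminate_episode')
        return True  # End after one step for this demo.


def run_episode(task, random_state=None, physics=None):
    """Drive one episode in the order Composer uses."""
    task.initialize_episode_mjcf(random_state)
    task.compile_physics()
    task.initialize_episode(physics, random_state)
    terminated = False
    while not terminated:
        task.before_step(physics, action=None, random_state=random_state)
        task.get_reward(physics)
        terminated = task.should_terminate_episode(physics)
    return task.calls
```

Running `run_episode(LifecycleLog())` yields the hooks in the order listed above: model edits before compilation, state setup after it, then the per-step hooks until termination.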
## Code Reference
| Attribute | Value |
|---|---|
| Source Location | `dm_control/composer/task.py:L36-322` |
| Signature (root_entity) | `@abc.abstractproperty Task.root_entity` |
| Signature (get_reward) | `Task.get_reward(self, physics) -> float` (abstract) |
| Signature (should_terminate_episode) | `Task.should_terminate_episode(self, physics) -> bool` |
| Signature (initialize_episode) | `Task.initialize_episode(self, physics, random_state) -> None` |
| Signature (initialize_episode_mjcf) | `Task.initialize_episode_mjcf(self, random_state) -> None` |
| Signature (before_step) | `Task.before_step(self, physics, action, random_state) -> None` |
| Signature (set_timesteps) | `Task.set_timesteps(self, control_timestep, physics_timestep) -> None` |
| Signature (action_spec) | `Task.action_spec(self, physics) -> specs.BoundedArray` |
| Signature (NullTask) | `NullTask.__init__(self, root_entity)` |
| Import | `from dm_control.composer import task` |
## I/O Contract

### Inputs

| Name | Type | Description |
|---|---|---|
| `physics` | `mjcf.Physics` | The compiled MuJoCo physics instance |
| `action` | `np.ndarray` | Agent action vector matching `action_spec` |
| `random_state` | `np.random.RandomState` | Seeded random number generator for reproducibility |
### Outputs

| Name | Type | Description |
|---|---|---|
| `root_entity` | `Entity` | The root entity (arena) for this task |
| `get_reward` return | `float` | Scalar reward for the current step |
| `should_terminate_episode` return | `bool` | Whether the episode should end |
| `get_discount` return | `float` | Discount factor for the current step (default `1.0`) |
| `observables` | `OrderedDict` | Merged dict of entity and task observables |
| `action_spec` return | `specs.BoundedArray` | Specification of valid actions |
| `control_timestep` | `float` | Seconds between agent actions |
| `physics_timestep` | `float` | Seconds between MuJoCo simulation steps |
| `physics_steps_per_control_step` | `int` | Number of physics steps per control step |
## Usage Examples

### Simple reaching task

```python
import numpy as np

from dm_control.composer import arena as arena_module
from dm_control.composer import task as task_module


class ReachTask(task_module.Task):

    def __init__(self, robot, target_entity):
        self._arena = arena_module.Arena()
        self._arena.attach(robot)
        self._target_frame = self._arena.add_free_entity(target_entity)
        self._robot = robot
        self._target = target_entity
        # Enable the observables the agent needs.
        robot.observables.joint_positions.enabled = True
        target_entity.observables.position.enabled = True
        # Set timesteps: 50 Hz control, 200 Hz physics.
        self.set_timesteps(
            control_timestep=0.02,
            physics_timestep=0.005)

    @property
    def root_entity(self):
        return self._arena

    def initialize_episode(self, physics, random_state):
        # Randomize the target position at the start of each episode.
        self._target.set_pose(
            physics,
            position=random_state.uniform(-0.3, 0.3, size=3))

    def get_reward(self, physics):
        robot_pos = physics.bind(self._robot.grip_site).xpos
        target_pos = physics.bind(self._target.mjcf_model.find(
            'body', 'target_body')).xpos
        distance = np.linalg.norm(robot_pos - target_pos)
        return -distance

    def should_terminate_episode(self, physics):
        return False
```
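The negative-distance reward shaping used in `get_reward` above can be isolated as a plain function. This sketch uses only the standard library, with positions as plain 3-element sequences rather than MuJoCo bindings:

```python
import math


def negative_distance_reward(robot_pos, target_pos):
    """Dense reward: negative Euclidean distance to the target.

    Standalone version of the reward shaping in the ReachTask example;
    positions are plain 3-element sequences, not MuJoCo bindings.
    """
    distance = math.sqrt(sum((r - t) ** 2
                             for r, t in zip(robot_pos, target_pos)))
    return -distance


# Reward is 0 at the target and decreases with distance,
# so the agent is always given a gradient toward the goal.
print(negative_distance_reward((0.0, 0.0, 0.0), (0.0, 0.3, 0.0)))
```

A dense reward like this avoids the sparse-reward exploration problem: every step carries a learning signal, at the cost of assuming distance is a good proxy for task progress.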
### Using NullTask for visualization

```python
from dm_control.composer import arena as arena_module
from dm_control.composer import task as task_module

arena = arena_module.Arena()
null_task = task_module.NullTask(root_entity=arena)
# null_task.get_reward(physics) always returns 0.0.
```