Implementation: Danijar DreamerV3 Parallel Learner
| Knowledge Sources | |
|---|---|
| Domains | Reinforcement_Learning, Distributed_Systems, Model_Based_RL |
| Last Updated | 2026-02-15 09:00 GMT |
Overview
Concrete tool for running the distributed learner that fetches batches from a remote replay server and trains the shared DreamerV3 agent provided by the parallel module.
Description
The parallel_learner() function in embodied/run/parallel.py implements the continuous training loop. It creates portal.Client connections to the replay and logger servers, sets up prefetched data streams via parallel_stream() (which issues remote sample_batch RPCs with lookahead), restores the checkpoint, signals the actor via the barrier, and then runs an infinite loop calling agent.train() and agent.report().
Replay context updates (RSSM carry states) are sent back to the replay server via an updater client for improved sampling quality.
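The shape of this loop can be sketched as follows. This is a minimal illustration, not the real implementation: StubReplay and StubAgent are hypothetical stand-ins for the portal RPC client and the DreamerV3 agent, and the prefetch helper only mirrors the idea behind parallel_stream's lookahead.

```python
import queue
import threading

class StubReplay:
    """Hypothetical stand-in for the portal.Client to the replay server."""
    def sample_batch(self):
        return {"obs": [0.0] * 4}  # fake batch

class StubAgent:
    """Hypothetical stand-in; train() returns a carry-like context and metrics."""
    def __init__(self):
        self.steps = 0
    def train(self, batch):
        self.steps += 1
        return {"rssm_carry": self.steps}, {"loss": 1.0 / self.steps}

def prefetch(replay, buf, n):
    # Lookahead prefetch: keep batches queued ahead of the training step,
    # so the learner never blocks on a round-trip to the replay server.
    for _ in range(n):
        buf.put(replay.sample_batch())

def learner_step(agent, buf, updates):
    batch = buf.get()
    context, metrics = agent.train(batch)
    updates.append(context)  # in the real code, sent via the updater client
    return metrics

replay, agent = StubReplay(), StubAgent()
buf = queue.Queue(maxsize=8)
updates = []
t = threading.Thread(target=prefetch, args=(replay, buf, 3))
t.start()
for _ in range(3):
    metrics = learner_step(agent, buf, updates)
t.join()
```

The real loop differs in that it runs forever, batches arrive from a remote server, and context updates go back over RPC rather than into a local list.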
Usage
Spawned as a thread within parallel_agent() alongside the actor thread. Because both threads share the same agent object, parameter updates made by the learner are immediately visible to the actor.
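Sharing one object across threads means no parameter transfer is needed. A toy illustration of this visibility, with a hypothetical SharedAgent whose version counter stands in for the parameters (the second barrier wait exists only to order this toy example):

```python
import threading

class SharedAgent:
    """Toy stand-in: the learner bumps a counter, the actor observes it."""
    def __init__(self):
        self.version = 0

agent = SharedAgent()
barrier = threading.Barrier(2)
seen = []

def learner():
    barrier.wait()      # both sides proceed together, as after checkpoint restore
    agent.version += 1  # the "parameter update"
    barrier.wait()      # ordering for the toy example only

def actor():
    barrier.wait()
    barrier.wait()
    seen.append(agent.version)  # reads the learner's update directly

threads = [threading.Thread(target=learner), threading.Thread(target=actor)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```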
Code Reference
Source Location
- Repository: dreamerv3
- File: embodied/run/parallel.py
- Lines: L130-218
Signature
def parallel_learner(agent, barrier, args):
"""
Run the distributed learner training loop.
Args:
agent: Agent - Shared agent object (also used by actor thread).
barrier: threading.Barrier - Sync with actor for checkpoint restoration.
args: elements.Config with batch_size, batch_length, replay_addr,
logger_addr, report_every, save_every, log_every, report_batches,
from_checkpoint, from_checkpoint_regex.
"""
Import
from embodied.run.parallel import parallel_learner
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| agent | Agent | Yes | Shared agent with train(), report(), stream() methods |
| barrier | threading.Barrier | Yes | Sync point with actor thread |
| args | elements.Config | Yes | batch_size, batch_length, replay_addr, logger_addr, report_every, save_every, log_every |
Outputs
| Name | Type | Description |
|---|---|---|
| Updated parameters | Side effect | Agent parameters updated via optimizer (visible to actor thread) |
| Replay updates | RPC calls | RSSM carry state updates sent to replay server |
| Metrics | RPC calls | Training and report metrics sent to logger server |
| Checkpoints | Files | Agent state saved periodically to logdir/ckpt/agent |
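The report_every, save_every, and log_every cadences can be sketched as a simple step-based trigger. This mirrors, but does not reproduce, the cadence utilities the real code relies on; the Every class below is illustrative only.

```python
class Every:
    """Fire at most once every `interval` steps (a sketch, not the real utility)."""
    def __init__(self, interval):
        self.interval = interval
        self.last = None  # step at which we last fired

    def __call__(self, step):
        if self.interval is None:
            return False  # cadence disabled
        if self.last is None or step - self.last >= self.interval:
            self.last = step
            return True
        return False

# The learner would hold one trigger per cadence (log, save, report)
# and check them each iteration of the training loop.
should_log = Every(3)
fired = [step for step in range(10) if should_log(step)]
```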
Usage Examples
# Spawned internally by parallel_agent; shown standalone for illustration:
import threading
import portal
from embodied.run.parallel import parallel_learner

agent = make_agent()            # placeholder: construct the shared agent
barrier = threading.Barrier(2)  # one party each for learner and actor
portal.Thread(parallel_learner, agent, barrier, args).start()