
Implementation:Danijar Dreamerv3 Parallel Learner

From Leeroopedia
Knowledge Sources
Domains Reinforcement_Learning, Distributed_Systems, Model_Based_RL
Last Updated 2026-02-15 09:00 GMT

Overview

The distributed learner component of DreamerV3's parallel module: it fetches batches from a remote replay server and continuously trains the shared agent.

Description

The parallel_learner() function in embodied/run/parallel.py implements the continuous training loop. It creates portal.Client connections to the replay and logger servers, sets up prefetched data streams via parallel_stream() (which calls remote sample_batch RPCs with lookahead), restores the checkpoint, signals the actor via the barrier, and then enters an infinite loop calling agent.train() and agent.report().
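The shape of this loop can be sketched with stand-in stubs for the agent and the prefetched replay stream. This is an illustrative sketch, not the real implementation: StubAgent, learner_loop, and the (carry, outs, metrics) return convention are assumptions here; the actual code uses portal clients, RPCs, and the real DreamerV3 agent.

```python
# Hedged sketch of the learner loop described above. StubAgent and
# learner_loop are illustrative stand-ins; the real loop is infinite and
# driven by prefetched batches from the remote replay server.
import itertools

class StubAgent:
    def __init__(self):
        self.steps = 0
    def train(self, batch, carry=None):
        self.steps += 1                          # parameter update (stubbed)
        return carry, {}, {'loss': 1.0 / self.steps}
    def report(self, batch, carry=None):
        return carry, {'report/steps': self.steps}

def learner_loop(agent, stream, report_every=4, max_steps=8):
    """Mimic the train/report loop, bounded here for demonstration."""
    metrics = []
    carry = None
    for step, batch in enumerate(itertools.islice(stream, max_steps), 1):
        carry, _, mets = agent.train(batch, carry)
        metrics.append(mets)
        if step % report_every == 0:             # periodic report() call
            carry, report = agent.report(batch, carry)
            metrics.append(report)
    return metrics

stream = iter(lambda: {'obs': [0.0]}, None)      # stand-in for parallel_stream()
agent = StubAgent()
mets = learner_loop(agent, stream)
```

The bounded loop and report cadence stand in for the real infinite loop, where reporting and checkpointing are gated by the report_every/save_every settings in args.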

Replay context updates (RSSM carry states) are sent back to the replay server via an updater client for improved sampling quality.
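The carry-update path can be illustrated with a stub in place of the portal updater client. The RPC name, payload shape, and step-id scheme below are assumptions for illustration only; the real client and message format live in the parallel module.

```python
# Illustrative sketch (assumed shape): after a train step, the learner
# pushes updated RSSM carry states back to the replay server so future
# samples of the same sequences resume from fresh recurrent state.
class UpdaterStub:
    def __init__(self):
        self.received = []
    def update(self, stepids, carry):            # stand-in for the remote RPC
        self.received.append((tuple(stepids), dict(carry)))

updater = UpdaterStub()
stepids = [101, 102]                             # ids of the sampled sequences
carry = {'deter': [0.0] * 4}                     # simplified RSSM carry state
updater.update(stepids, carry)
```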

Usage

Spawned as a thread within parallel_agent() alongside the actor thread. It shares the agent object, so parameter updates are immediately visible to the actor.

Code Reference

Source Location

  • Repository: dreamerv3
  • File: embodied/run/parallel.py
  • Lines: L130-218

Signature

def parallel_learner(agent, barrier, args):
    """
    Run the distributed learner training loop.

    Args:
        agent: Agent - Shared agent object (also used by actor thread).
        barrier: threading.Barrier - Sync with actor for checkpoint restoration.
        args: elements.Config with batch_size, batch_length, replay_addr,
              logger_addr, report_every, save_every, log_every, report_batches,
              from_checkpoint, from_checkpoint_regex.
    """

Import

from embodied.run.parallel import parallel_learner

I/O Contract

Inputs

  • agent (Agent, required): Shared agent with train(), report(), stream() methods.
  • barrier (threading.Barrier, required): Synchronization point with the actor thread.
  • args (elements.Config, required): batch_size, batch_length, replay_addr, logger_addr, report_every, save_every, log_every.
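For illustration, the fields the learner reads from args can be mocked with a plain namespace in place of elements.Config. All values below are placeholders, not recommended defaults, and the field units are not specified here.

```python
# Hedged example of the configuration fields listed above, using a plain
# namespace as a stand-in for elements.Config (values are illustrative).
from types import SimpleNamespace

args = SimpleNamespace(
    batch_size=16,
    batch_length=64,
    replay_addr='localhost:2222',   # address of the replay server
    logger_addr='localhost:2223',   # address of the logger server
    report_every=300,               # interval between report() calls
    save_every=900,                 # interval between checkpoint saves
    log_every=120,                  # interval between metric flushes
)
```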

Outputs

  • Updated parameters (side effect): Agent parameters updated via the optimizer, visible to the actor thread.
  • Replay updates (RPC calls): RSSM carry state updates sent to the replay server.
  • Metrics (RPC calls): Training and report metrics sent to the logger server.
  • Checkpoints (files): Agent state saved periodically to logdir/ckpt/agent.
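The periodic side effects above follow a common gating pattern: a clock decides when enough time has elapsed to save a checkpoint or flush metrics. The Every class below is a minimal stand-in sketch, not the utility the codebase actually uses.

```python
# Sketch of the periodicity pattern: trigger a side effect (checkpoint
# save, metric flush) at most once per interval. Stand-in only.
import time

class Every:
    """Return True at most once per `seconds`."""
    def __init__(self, seconds):
        self.seconds = seconds
        self.last = float('-inf')
    def __call__(self, now=None):
        now = time.time() if now is None else now
        if now - self.last >= self.seconds:
            self.last = now
            return True
        return False

should_save = Every(900)   # gate checkpoint saves to one per 900s
```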

Usage Examples

# Spawned internally by parallel_agent(); shown here standalone.
# make_agent() and args are application-specific placeholders.
import threading
import portal

from embodied.run.parallel import parallel_learner

agent = make_agent()             # your agent factory
barrier = threading.Barrier(2)   # learner + actor
portal.Thread(parallel_learner, agent, barrier, args).start()

Related Pages

Implements Principle

Requires Environment

Uses Heuristics
