Implementation:Danijar Dreamerv3 Combined Launcher

From Leeroopedia
Knowledge Sources
Domains Reinforcement_Learning, Distributed_Systems
Last Updated 2026-02-15 09:00 GMT

Overview

Wrapper for the portal library's process management that serializes factory functions and spawns the distributed DreamerV3 training processes.

Description

The combined() function in embodied/run/parallel.py is the entry point for distributed training. It performs four steps: it resolves auto-assigned network addresses for the actor, replay, and logger servers; serializes all factory functions with cloudpickle.dumps; creates a portal.Process (or portal.Thread) worker for each role; and calls portal.run(workers), which blocks until any process fails.
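
The serialize-then-spawn pattern described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the code in parallel.py: stdlib pickle stands in for cloudpickle, threading.Thread stands in for portal.Process and portal.Thread, and the factories, role names, and worker function are hypothetical.

```python
import pickle
import threading
import types
from functools import partial

# Hypothetical stand-in factories. The real ones build Agent, Replay, and
# Logger objects; these just return simple namespace objects.
factories = {
    'replay': partial(types.SimpleNamespace, role='replay', capacity=1000),
    'logger': partial(types.SimpleNamespace, role='logger', capacity=0),
}

# Step 1: turn each factory into bytes so it could cross a process boundary.
# The source uses cloudpickle.dumps; stdlib pickle is a swapped-in stand-in.
payloads = {name: pickle.dumps(fn) for name, fn in factories.items()}

results = {}

def worker(name, payload):
    # Step 2: the worker rebuilds its factory from bytes and calls it.
    factory = pickle.loads(payload)
    results[name] = factory()

# Step 3: one worker per role. Threads stand in for portal workers here.
workers = [
    threading.Thread(target=worker, args=(name, payload))
    for name, payload in payloads.items()
]
for w in workers:
    w.start()
for w in workers:
    w.join()  # waits for every worker, unlike the blocking behavior described for portal.run
```

Passing bytes rather than live objects means each worker constructs its own agent, replay buffer, or logger after it starts, so nothing that is hard to copy between processes has to exist in the launcher.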

The parallel_agent() function creates a shared agent object and spawns actor and learner as threads within the same process (sharing GPU memory). A threading.Barrier synchronizes startup so the learner can restore a checkpoint before the actor begins collecting data.
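
The startup ordering enforced by the barrier can be illustrated with a minimal sketch. The learner and actor bodies below are hypothetical placeholders that only record events; the point is that the actor cannot pass barrier.wait() until the learner has finished its restore step and reached the barrier too.

```python
import threading

events = []                      # ordered record of what happened
barrier = threading.Barrier(2)   # two parties: learner and actor

def learner():
    # Placeholder for restoring a checkpoint into the shared agent.
    events.append('learner: restore checkpoint')
    barrier.wait()               # signal that the restore is done
    events.append('learner: train')

def actor():
    barrier.wait()               # block until the learner has restored
    events.append('actor: collect')

# Start the actor first to show that start order does not matter.
threads = [threading.Thread(target=actor), threading.Thread(target=learner)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Only the first event is deterministic: the restore always precedes data collection, while the relative order of training and collecting after the barrier depends on thread scheduling.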

Usage

This page documents a wrapper around the external portal library's Process, Thread, and run APIs. combined() is called when config.script == 'parallel'.

Code Reference

Source Location

  • Repository: dreamerv3
  • File: embodied/run/parallel.py
  • Lines: L15-70 (combined + parallel_agent)

Signature

def combined(
    make_agent,          # Callable -> Agent
    make_replay_train,   # Callable -> Replay
    make_replay_eval,    # Callable -> Replay
    make_env_train,      # Callable(index) -> Env
    make_env_eval,       # Callable(index) -> Env
    make_stream,         # Callable(replay, mode) -> Stream
    make_logger,         # Callable -> Logger
    args):               # elements.Config
    """
    Launch all distributed training processes.

    Resolves network addresses, serializes factories via cloudpickle,
    spawns agent/replay/logger/env processes, and blocks until completion.
    """

def parallel_agent(make_agent, args):
    """
    Create shared agent and spawn actor + learner threads.

    Args:
        make_agent: bytes (cloudpickle-serialized) or Callable -> Agent
        args: elements.Config with actor_addr, replay_addr, logger_addr, etc.
    """

Import

import embodied
embodied.run.parallel.combined(make_agent, make_replay_train, make_replay_eval,
                                make_env_train, make_env_eval, make_stream,
                                make_logger, args)

I/O Contract

Inputs

Name              | Type                   | Required | Description
make_agent        | Callable               | Yes      | Factory returning Agent
make_replay_train | Callable               | Yes      | Factory returning training Replay buffer
make_replay_eval  | Callable               | Yes      | Factory returning evaluation Replay buffer
make_env_train    | Callable(int)          | Yes      | Factory returning training environment
make_env_eval     | Callable(int)          | Yes      | Factory returning evaluation environment
make_stream       | Callable(replay, mode) | Yes      | Factory returning data stream
make_logger       | Callable               | Yes      | Factory returning Logger
args              | elements.Config        | Yes      | Distributed config: actor_addr, replay_addr, logger_addr, actor_batch, agent_process, envs, eval_envs, remote_envs, remote_replay
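
One common way to auto-assign the address fields is to ask the operating system for an unused port by binding to port 0. The sketch below shows that general technique only. The 'auto' placeholder value, the resolve helper, and the dict-based config are assumptions made for illustration, not the convention or API used by parallel.py or portal.

```python
import socket

def free_port():
    # Binding to port 0 asks the OS to pick an unused port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(('127.0.0.1', 0))
        return sock.getsockname()[1]

def resolve(addr):
    # Hypothetical convention: the literal 'auto' requests a fresh local address.
    if addr == 'auto':
        return f'127.0.0.1:{free_port()}'
    return addr  # explicit addresses pass through unchanged

# A plain dict stands in for elements.Config here.
config = {
    'actor_addr': 'auto',
    'replay_addr': 'auto',
    'logger_addr': '127.0.0.1:5555',
}
resolved = {key: resolve(value) for key, value in config.items()}
```

The port is released as soon as the probe socket closes, so another program could claim it before the server binds. Resolving addresses once in the launcher keeps that window short and ensures every worker receives the same values.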

Outputs

Name              | Type        | Description
Running processes | Side effect | Spawned OS processes for agent, replay, logger, and environments connected via portal RPC

Usage Examples

from functools import partial as bind
import embodied

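# Launch distributed training (called from main.py).
# The make_* factories, config, and args are assumed to be defined by the caller.
embodied.run.parallel.combined(
    bind(make_agent, config),                          # make_agent
    bind(make_replay, config, 'replay'),               # make_replay_train
    bind(make_replay, config, 'replay_eval', 'eval'),  # make_replay_eval
    bind(make_env, config),                            # make_env_train, called later with an index
    bind(make_env, config),                            # make_env_eval, called later with an index
    bind(make_stream, config),                         # make_stream, called later with (replay, mode)
    bind(make_logger, config),                         # make_logger
    args)
# Blocks until any process fails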
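
The example relies on functools.partial to fix the config argument now and leave the remaining arguments for the worker that eventually calls the factory. A minimal sketch of that binding follows, with a hypothetical make_env that returns a plain dict instead of a real environment.

```python
from functools import partial as bind

def make_env(config, index):
    # Hypothetical stand-in: a real factory would construct an environment.
    return {'task': config['task'], 'index': index}

config = {'task': 'dummy'}

# Bind config now. The result matches the Callable(index) -> Env contract.
make_env_train = bind(make_env, config)

# A worker later supplies only the environment index.
envs = [make_env_train(i) for i in range(3)]
```

Because the bound factory carries its config with it, the launcher can hand the same callable to any number of environment workers and let each one pass its own index.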

Related Pages

Implements Principle
