Implementation:Danijar Dreamerv3 Combined Launcher
| Knowledge Sources | |
|---|---|
| Domains | Reinforcement_Learning, Distributed_Systems |
| Last Updated | 2026-02-15 09:00 GMT |
Overview
Wrapper for the portal library's process management that serializes factory functions and spawns the distributed DreamerV3 training processes.
Description
The combined() function in embodied/run/parallel.py is the entry point for distributed training. It resolves auto-assigned network addresses for the actor, replay, and logger servers, serializes all factory functions via cloudpickle.dumps, creates a portal.Process (or portal.Thread) worker for each role, and calls portal.run(workers), which blocks until any process fails.
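The address resolution and blocking launch described above can be approximated with the standard library alone. This is a minimal, hypothetical stand-in, not portal's API: free_port, resolve_addrs, and run_until_failure are illustrative names, the '{auto}' placeholder in the address strings is an assumption, and threads replace portal.Process workers.

```python
import queue
import socket
import threading


def free_port():
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(('localhost', 0))
        return sock.getsockname()[1]


def resolve_addrs(args):
    """Fill an assumed '{auto}' placeholder in each *_addr entry with a free port."""
    resolved = dict(args)
    for key in ('actor_addr', 'replay_addr', 'logger_addr'):
        if '{auto}' in resolved[key]:
            resolved[key] = resolved[key].format(auto=free_port())
    return resolved


def run_until_failure(workers):
    """Start one thread per (fn, fn_args) pair.

    Returns the first exception a worker raises, or None once every worker
    has exited cleanly. Hypothetical stand-in for portal.run(workers).
    """
    exits = queue.Queue()

    def wrapped(fn, fn_args):
        try:
            fn(*fn_args)
            exits.put(None)  # clean exit
        except Exception as exc:  # report the failure to the launcher
            exits.put(exc)

    for fn, fn_args in workers:
        threading.Thread(target=wrapped, args=(fn, fn_args), daemon=True).start()
    for _ in workers:
        exc = exits.get()  # block until the next worker exits
        if exc is not None:
            return exc  # the first failure ends the run
    return None
```

Binding to port 0 lets the OS choose the port, but the socket is closed before the real server binds it, so another program could grab the port in between; a launcher that resolves all addresses right before spawning keeps that window small.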
The parallel_agent() function creates a shared agent object and spawns actor and learner as threads within the same process (sharing GPU memory). A threading.Barrier synchronizes startup so the learner can restore a checkpoint before the actor begins collecting data.
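The startup ordering can be illustrated with a two-party threading.Barrier. This is a minimal sketch, assuming the learner restores before it reaches the barrier; SharedAgent, learner, actor, parallel_agent_sketch, and the restored step count are hypothetical stand-ins, not DreamerV3 code.

```python
import threading


class SharedAgent:
    """Hypothetical stand-in for the agent object both threads share."""

    def __init__(self):
        self.step = 0
        self.events = []


def learner(agent, barrier):
    agent.step = 100  # pretend a checkpoint restored step 100 (hypothetical)
    agent.events.append('restored')
    barrier.wait()  # only now may the actor start collecting data
    agent.events.append('learning')


def actor(agent, barrier):
    barrier.wait()  # wait until the learner has finished restoring
    agent.events.append(f'acting from step {agent.step}')


def parallel_agent_sketch():
    agent = SharedAgent()  # one object, so both threads see the same state
    barrier = threading.Barrier(2)  # two parties: actor and learner
    threads = [
        threading.Thread(target=fn, args=(agent, barrier))
        for fn in (actor, learner)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return agent
```

Because both threads live in one process and hold the same agent object, the actor observes the restored state without any copying, which is the point of running them as threads rather than separate processes.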
Usage
This is a Wrapper Doc for the external portal library's Process, Thread, and run APIs. combined() is called from main.py when config.script == 'parallel'.
Code Reference
Source Location
- Repository: dreamerv3
- File: embodied/run/parallel.py
- Lines: L15-70 (combined + parallel_agent)
Signature
```python
def combined(
    make_agent,         # Callable -> Agent
    make_replay_train,  # Callable -> Replay
    make_replay_eval,   # Callable -> Replay
    make_env_train,     # Callable(index) -> Env
    make_env_eval,      # Callable(index) -> Env
    make_stream,        # Callable(replay, mode) -> Stream
    make_logger,        # Callable -> Logger
    args):              # elements.Config
    """
    Launch all distributed training processes.
    Resolves network addresses, serializes factories via cloudpickle,
    spawns agent/replay/logger/env processes, and blocks until any process fails.
    """

def parallel_agent(make_agent, args):
    """
    Create a shared agent and spawn actor + learner threads.
    Args:
      make_agent: bytes (cloudpickle-serialized) or Callable -> Agent
      args: elements.Config with actor_addr, replay_addr, logger_addr, etc.
    """
```
Import
```python
import embodied

embodied.run.parallel.combined(make_agent, make_replay_train, make_replay_eval,
                               make_env_train, make_env_eval, make_stream,
                               make_logger, args)
```
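combined() receives plain callables and serializes them with cloudpickle.dumps, so parallel_agent() must accept make_agent either as bytes or as a callable. The sketch below shows that contract using the standard pickle module as a stand-in for cloudpickle; load_factory and picklable_by_stdlib are hypothetical helpers. It also shows why cloudpickle is needed at all: stdlib pickle stores functions by reference and refuses lambdas and other objects it cannot look up by name.

```python
import pickle
from functools import partial


def load_factory(factory):
    """Accept serialized bytes or a callable, mirroring parallel_agent's docstring."""
    if isinstance(factory, bytes):
        factory = pickle.loads(factory)  # stdlib pickle stands in for cloudpickle here
    return factory


def picklable_by_stdlib(fn):
    """Return True if the standard pickle module can serialize fn."""
    try:
        pickle.dumps(fn)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
```

A functools.partial over an importable function survives a stdlib round trip, while a lambda does not; cloudpickle serializes such closures by value, which is why the launcher can ship the bound factories from main.py to other processes.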
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| make_agent | Callable | Yes | Factory returning Agent |
| make_replay_train | Callable | Yes | Factory returning training Replay buffer |
| make_replay_eval | Callable | Yes | Factory returning evaluation Replay buffer |
| make_env_train | Callable(int) | Yes | Factory returning training environment |
| make_env_eval | Callable(int) | Yes | Factory returning evaluation environment |
| make_stream | Callable(replay, mode) | Yes | Factory returning data stream |
| make_logger | Callable | Yes | Factory returning Logger |
| args | elements.Config | Yes | Distributed config: actor_addr, replay_addr, logger_addr, actor_batch, agent_process, envs, eval_envs, remote_envs, remote_replay |
Outputs
| Name | Type | Description |
|---|---|---|
| Running processes | Side effect | portal.Process (or portal.Thread) workers for the agent, replay, logger, and environments, connected via portal RPC; the call blocks until any process fails |
Usage Examples
```python
from functools import partial as bind

import embodied

# Launch distributed training (called from main.py)
embodied.run.parallel.combined(
    bind(make_agent, config),                          # make_agent
    bind(make_replay, config, 'replay'),               # make_replay_train
    bind(make_replay, config, 'replay_eval', 'eval'),  # make_replay_eval
    bind(make_env, config),                            # make_env_train
    bind(make_env, config),                            # make_env_eval
    bind(make_stream, config),                         # make_stream
    bind(make_logger, config),                         # make_logger
    args)
# Blocks until any process fails
```
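Here bind is functools.partial: it pre-fills config so that each argument matches the shape listed in the Inputs table, for example turning make_env into a Callable(index) -> Env. A minimal sketch of that binding; the make_env below and its config values are hypothetical, not the factory from main.py.

```python
from functools import partial as bind


def make_env(config, index):
    """Hypothetical factory: describe the environment for worker `index`."""
    return {'task': config['task'], 'seed': config['seed'] + index}


config = {'task': 'dummy_task', 'seed': 0}  # hypothetical config values
make_env_train = bind(make_env, config)  # now matches Callable(index) -> Env
envs = [make_env_train(i) for i in range(3)]  # one environment per worker index
```

Passing the same bound factory for both make_env_train and make_env_eval works because the launcher, not the factory, decides which indices and how many environments each role receives.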