Implementation:Danijar Dreamerv3 Parallel Logger
| Knowledge Sources | |
|---|---|
| Domains | Reinforcement_Learning, Distributed_Systems, Monitoring |
| Last Updated | 2026-02-15 09:00 GMT |
Overview
Concrete tool for running the centralized logging process that aggregates metrics from all distributed DreamerV3 processes via RPC.
Description
The parallel_logger() function in embodied/run/parallel.py deserializes the logger factory, creates the elements.Logger, and starts a portal.Server with two RPC endpoints: add (for scalar metrics) and tran (for episode transitions). It maintains per-environment episode accumulators, computes episode statistics on completion, drops stale episodes after episode_timeout, and periodically flushes metrics to all configured backends.
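The episode bookkeeping described above can be sketched in plain Python. This is a minimal illustration, not the code in embodied/run/parallel.py: the class name EpisodeTracker, its method names, the statistic names (score, length), and the injectable clock are all assumptions made for the example.

```python
import time


class EpisodeTracker:
  """Hypothetical per-environment episode accumulator (illustration only)."""

  def __init__(self, episode_timeout, clock=time.monotonic):
    self.episode_timeout = episode_timeout
    self.clock = clock
    self.episodes = {}  # env id -> running totals and last update time

  def tran(self, env_id, reward, is_first, is_last):
    """Add one transition; return episode statistics when the episode ends."""
    now = self.clock()
    if is_first or env_id not in self.episodes:
      self.episodes[env_id] = {'score': 0.0, 'length': 0, 'updated': now}
    ep = self.episodes[env_id]
    ep['score'] += reward
    ep['length'] += 1
    ep['updated'] = now
    if is_last:
      del self.episodes[env_id]
      return {'score': ep['score'], 'length': ep['length']}
    return None

  def drop_stale(self):
    """Remove episodes that received no transition within episode_timeout."""
    now = self.clock()
    stale = [
        env_id for env_id, ep in self.episodes.items()
        if now - ep['updated'] > self.episode_timeout]
    for env_id in stale:
      del self.episodes[env_id]
    return stale


# Usage with a fake clock so the timeout can be exercised deterministically.
fake_time = [0.0]
tracker = EpisodeTracker(episode_timeout=10.0, clock=lambda: fake_time[0])
tracker.tran('env0', 1.0, is_first=True, is_last=False)
stats = tracker.tran('env0', 2.0, is_first=False, is_last=True)
tracker.tran('env1', 0.5, is_first=True, is_last=False)
fake_time[0] = 20.0  # env1 has been silent for longer than the timeout
dropped = tracker.drop_stale()
```

Keying the accumulators by environment lets transitions from many environments interleave on one endpoint, and the timeout keeps an environment that died mid-episode from holding memory indefinitely.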
Usage
Spawned as a separate process by combined(). All other processes connect as clients.
Code Reference
Source Location
- Repository: dreamerv3
- File: embodied/run/parallel.py
- Lines: L317-413
Signature
def parallel_logger(make_logger, args):
  """
  Run the centralized logging process.

  Args:
    make_logger: bytes or Callable -> Logger
    args: elements.Config with logger_addr, log_every, save_every, episode_timeout.
  """
Import
from embodied.run.parallel import parallel_logger
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| make_logger | bytes or Callable | Yes | Factory that returns the Logger, passed either as a callable or as cloudpickle-serialized bytes |
| args | elements.Config | Yes | logger_addr, log_every, save_every, episode_timeout |
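Accepting make_logger as either bytes or a callable could be handled along the following lines. This is a sketch under stated assumptions: the helper name resolve_factory is hypothetical, the standard-library pickle module stands in for cloudpickle so the example runs without third-party packages, and a dict stands in for the Logger.

```python
import pickle
from functools import partial


def resolve_factory(make_logger):
  """Return a callable factory, deserializing it first if given as bytes."""
  if isinstance(make_logger, bytes):
    # The real process receives cloudpickle bytes; pickle is a stand-in here.
    make_logger = pickle.loads(make_logger)
  if not callable(make_logger):
    raise TypeError('make_logger must be bytes or a callable')
  return make_logger


# Stand-in factory: a dict plays the role of the Logger in this sketch.
factory = partial(dict, backend='jsonl')
payload = pickle.dumps(factory)

logger_from_bytes = resolve_factory(payload)()
logger_from_callable = resolve_factory(factory)()
```

Serializing the factory rather than the logger itself means the logger and its backends are constructed inside the logging process, so open file handles or network sessions never have to cross a process boundary.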
Outputs
| Name | Type | Description |
|---|---|---|
| RPC Server | portal.Server | Running at args.logger_addr with add and tran endpoints |
| Metrics | Files | Flushed to every configured logger backend (e.g. JSONL, TensorBoard, WandB) at log_every intervals |
| Checkpoints | Files | Logger step counter saved at save_every intervals |
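The interval-based flushing and saving in the table above amount to a time gate checked inside the server loop. The sketch below shows one way to write such a gate. The class name IntervalGate and the injectable clock are assumptions for illustration, and the actual implementation may rely on a helper from the elements library instead.

```python
import time


class IntervalGate:
  """Hypothetical gate that returns True at most once per interval."""

  def __init__(self, interval, clock=time.monotonic):
    self.interval = interval
    self.clock = clock
    self.last = None

  def __call__(self):
    now = self.clock()
    if self.last is None:
      # The first call only records the start time.
      self.last = now
      return False
    if now - self.last >= self.interval:
      self.last = now
      return True
    return False


# Usage with a fake clock; the 5.0 plays the role of log_every.
fake_time = [0.0]
should_log = IntervalGate(5.0, clock=lambda: fake_time[0])
fired = []
for t in [0.0, 2.0, 5.0, 7.0, 10.5]:
  fake_time[0] = t
  fired.append(should_log())
```

A second gate built with the save_every value would control checkpointing of the logger step counter in the same way.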
Usage Examples
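# Spawned by combined() as a separate process.
import cloudpickle
import portal
from embodied.run.parallel import parallel_logger

# make_logger and args are assumed to be defined by the caller.
worker = portal.Process(parallel_logger, cloudpickle.dumps(make_logger), args)
portal.run([worker])  # Start the process and wait for it.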