Implementation:Danijar Dreamerv3 Parallel Actor
| Knowledge Sources | |
|---|---|
| Domains | Reinforcement_Learning, Distributed_Systems |
| Last Updated | 2026-02-15 09:00 GMT |
Overview
Concrete tool, provided by the DreamerV3 parallel module, for running the distributed actor inference server. The server batches observations from environment processes and runs policy inference on the GPU.
Description
The parallel_actor() function in embodied/run/parallel.py creates a portal.BatchServer that accumulates observations from environment processes into batches of size args.actor_batch. For each batch, the workfn runs agent.policy() and returns the actions; the postfn then asynchronously sends the resulting transitions to the replay and logger servers via portal.Client.
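The batch-then-infer-then-post flow described above can be sketched with the standard library alone. This is a minimal illustration and not portal.BatchServer itself: the class TinyBatcher, its submit method, and the toy workfn and postfn lambdas are all hypothetical stand-ins, and a single-worker thread pool takes the place of the asynchronous client calls.

```python
from concurrent.futures import ThreadPoolExecutor


class TinyBatcher:
    """Hypothetical stand-in for a batching server (not portal.BatchServer)."""

    def __init__(self, batch_size, workfn, postfn):
        self.batch_size = batch_size
        self.workfn = workfn      # runs inference on a full batch
        self.postfn = postfn      # ships results elsewhere, off the request path
        self.pending = []         # (envid, obs) pairs waiting for a full batch
        self.pool = ThreadPoolExecutor(max_workers=1)
        self.post_futures = []

    def submit(self, envid, obs):
        """Queue one observation; return actions once a batch is full, else None."""
        self.pending.append((envid, obs))
        if len(self.pending) < self.batch_size:
            return None
        batch, self.pending = self.pending, []
        envids = [e for e, _ in batch]
        obs_batch = [o for _, o in batch]
        actions = self.workfn(obs_batch)
        # Post-processing is handed to a background thread so replies are not delayed.
        future = self.pool.submit(self.postfn, envids, obs_batch, actions)
        self.post_futures.append(future)
        return dict(zip(envids, actions))


sent = []  # stands in for the replay and logger servers
server = TinyBatcher(
    batch_size=2,
    workfn=lambda obs: [o * 2 for o in obs],  # toy "policy"
    postfn=lambda envids, obs, acts: sent.append(list(zip(envids, obs, acts))),
)
first = server.submit(0, 1.5)    # batch not full yet
second = server.submit(1, 2.0)   # batch full: inference runs
for future in server.post_futures:
    future.result()              # wait for background post-processing
server.pool.shutdown()
```

In this sketch the caller that completes the batch receives the actions for every environment in it; a real server would instead reply to each waiting client separately.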
Per-environment carry states are maintained in a defaultdict keyed by envid. The function waits on a threading.Barrier to ensure the learner has restored the checkpoint before collecting data.
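The per-environment carry bookkeeping and the barrier handshake described above can be illustrated as follows. This is a sketch under stated assumptions rather than the code in embodied/run/parallel.py: init_carry, fake_policy, act_batch, learner, and actor are hypothetical names, and the carry is a plain integer counter instead of a recurrent policy state.

```python
import threading
from collections import defaultdict


def init_carry():
    # Hypothetical initial carry; a real agent would supply its own initial state.
    return 0


def fake_policy(carries, obs_batch):
    # Hypothetical policy: action = obs + carry, and every carry advances by one.
    actions = [o + c for o, c in zip(obs_batch, carries)]
    new_carries = [c + 1 for c in carries]
    return new_carries, actions


carry_by_env = defaultdict(init_carry)  # one carry per envid, created on first use


def act_batch(envids, obs_batch):
    # Gather each environment's carry, run the policy, then store the new carries.
    carries = [carry_by_env[e] for e in envids]
    new_carries, actions = fake_policy(carries, obs_batch)
    for envid, carry in zip(envids, new_carries):
        carry_by_env[envid] = carry
    return actions


barrier = threading.Barrier(2)  # two parties: learner and actor
events = []


def learner():
    events.append('restored')   # stands in for restoring the checkpoint
    barrier.wait()


def actor():
    barrier.wait()              # released only after the learner reaches the barrier
    events.append('acting')


threads = [threading.Thread(target=actor), threading.Thread(target=learner)]
for t in threads:
    t.start()
for t in threads:
    t.join()

first_actions = act_batch([3, 7], [10, 20])  # fresh carries for envs 3 and 7
second_actions = act_batch([3], [10])        # env 3 reuses its updated carry
```

Because the learner records its restore before calling barrier.wait(), and the actor records only after its own wait() returns, the actor can never start acting before the restore has happened, regardless of which thread is scheduled first.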
Usage
parallel_actor() is spawned as a thread within parallel_agent(), alongside the learner thread, and shares the agent object with the learner.
Code Reference
Source Location
- Repository: dreamerv3
- File: embodied/run/parallel.py
- Lines: L73-127
Signature
def parallel_actor(agent, barrier, args):
    """
    Run the distributed actor inference server.

    Args:
        agent: Agent - Shared agent object (updated by learner thread).
        barrier: threading.Barrier - Synchronization with learner for checkpoint restore.
        args: elements.Config with actor_addr, actor_batch, actor_threads,
            replay_addr, logger_addr, log_every.
    """
Import
from embodied.run.parallel import parallel_actor
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| agent | Agent | Yes | Shared agent with policy() method (parameters updated by learner) |
| barrier | threading.Barrier | Yes | Synchronization point with learner thread |
| args | elements.Config | Yes | Config providing actor_addr, actor_batch, actor_threads, replay_addr, logger_addr, log_every |
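As a rough illustration of the args input, the fields named in the table can be grouped into a simple attribute container. The sketch below uses types.SimpleNamespace in place of elements.Config, and every value is a placeholder chosen for the example, not a default taken from the DreamerV3 configuration.

```python
from types import SimpleNamespace

# Hypothetical stand-in for elements.Config; all values are placeholders.
args = SimpleNamespace(
    actor_addr='localhost:5551',   # placeholder address for the actor server
    actor_batch=2,                 # observations per inference batch
    actor_threads=1,               # placeholder worker thread count
    replay_addr='localhost:5552',  # placeholder address of the replay server
    logger_addr='localhost:5553',  # placeholder address of the logger server
    log_every=120,                 # placeholder logging interval
)

# The fields listed in the Inputs table for parallel_actor.
required = {
    'actor_addr', 'actor_batch', 'actor_threads',
    'replay_addr', 'logger_addr', 'log_every',
}
missing = required - set(vars(args))
```

Checking for missing fields before starting the threads is a cheap way to surface configuration mistakes early.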
Outputs
| Name | Type | Description |
|---|---|---|
| BatchServer | portal.BatchServer | Running at args.actor_addr, accepting 'act' RPCs from environments |
| Transitions | RPC calls | Sent to replay (add_batch) and logger (tran) servers |
Usage Examples
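# Spawned internally by parallel_agent:
import threading

import portal
from embodied.run.parallel import parallel_actor, parallel_learner

# make_agent and args are supplied by the caller of parallel_agent.
agent = make_agent()
barrier = threading.Barrier(2)  # two parties: actor and learner
workers = [
    portal.Thread(parallel_actor, agent, barrier, args),
    portal.Thread(parallel_learner, agent, barrier, args),
]
portal.run(workers)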