# Implementation: AllenAI Open Instruct ActorManager
| Type | Class (Ray Actor) |
|---|---|
| Source | `open_instruct/actor_manager.py:L43-257` |
| Dependencies | ray, fastapi, uvicorn, threading, collections |
| Last Updated | 2026-02-07 00:00 GMT |
## Overview
Concrete centralized manager for coordinating evaluation, weight updates, queue monitoring, and performance tracking across all distributed actors in the GRPO pipeline, provided by the Open Instruct library.
## Description
ActorManager is a Ray actor class that serves as the central coordination hub for the GRPO training pipeline. It provides:
- Lifecycle control: The `should_stop` flag is polled by vLLM engines to determine when to shut down. The main training loop sets this via `set_should_stop(True)` when training completes.
- Queue monitoring: A background polling thread tracks the sizes of all Ray queues (prompt queue, results queue, evaluation queue) at 0.5-second intervals.
- Token statistics: Tracks total prefill and decode tokens, and computes rolling throughput (tokens/second) over a sliding window.
- Timing statistics: Tracks average training step time and batch generation time for performance analysis.
- KV-cache concurrency: Stores the maximum concurrent sequences each vLLM engine can handle, useful for tuning batch sizes.
- Web dashboard: Starts a FastAPI web server exposing a real-time HTML dashboard and JSON API for monitoring queue sizes, token throughput, timing stats, and concurrency limits.
The dashboard is automatically started when `enable_queue_dashboard=True` (the default) and is accessible at `http://{hostname}:{port}`.
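To make the token-statistics behavior concrete, here is a minimal sliding-window throughput sketch. This is illustrative only, not the actual ActorManager internals: the class name `TokenThroughputTracker`, its method names, and the window length are all assumptions.

```python
import time
from collections import deque


class TokenThroughputTracker:
    """Illustrative sliding-window throughput tracker.

    Hypothetical stand-in for the rolling tokens/second computation the
    ActorManager performs; names and window size are assumptions.
    """

    def __init__(self, window_seconds: float = 10.0):
        self.window_seconds = window_seconds
        # Each sample: (timestamp, prefill_tokens, decode_tokens)
        self.samples: deque = deque()
        self.total_prefill = 0
        self.total_decode = 0

    def report(self, prefill_tokens: int, decode_tokens: int, now=None) -> None:
        """Record a batch's token counts and evict samples outside the window."""
        now = time.monotonic() if now is None else now
        self.samples.append((now, prefill_tokens, decode_tokens))
        self.total_prefill += prefill_tokens
        self.total_decode += decode_tokens
        # Drop samples that have fallen out of the sliding window.
        while self.samples and now - self.samples[0][0] > self.window_seconds:
            self.samples.popleft()

    def tokens_per_second(self) -> float:
        """Rolling throughput: tokens inside the window divided by its length."""
        in_window = sum(p + d for _, p, d in self.samples)
        return in_window / self.window_seconds
```

The real manager receives these counts from generation actors via `report_token_statistics` and exposes the result through `get_token_stats()`.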
## Usage
The ActorManager is created once on the head node and passed as a Ray actor handle to all other actors. It is typically accessed via `ray.get(actor_manager.method.remote(args))`.
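Because the manager is a single Ray actor, the stop-flag handshake reduces to a shared-state pattern: the training loop sets the flag, and engines poll it between batches. The thread-based stand-in below illustrates that contract locally; `MiniActorManager` and `engine_loop` are hypothetical names, and real engines poll via `ray.get(actor_manager.should_stop.remote())` instead.

```python
import threading
import time


class MiniActorManager:
    """Local, thread-based stand-in for the ActorManager stop-flag contract."""

    def __init__(self):
        self._should_stop = False
        self._lock = threading.Lock()

    def set_should_stop(self, should_stop: bool) -> None:
        with self._lock:
            self._should_stop = should_stop

    def should_stop(self) -> bool:
        with self._lock:
            return self._should_stop


def engine_loop(manager: MiniActorManager, results: list) -> None:
    # Mimics an engine polling the flag between generation batches.
    while not manager.should_stop():
        results.append("batch")
        time.sleep(0.01)
```

In the real pipeline the same pattern runs across processes, with Ray handling the remote method calls.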
## Code Reference
### Source Location
- Repository: Open Instruct
- File: `open_instruct/actor_manager.py`
### Signature
```python
class ActorManager:
    def __init__(
        self,
        queues: dict[str, ray_queue.Queue],
        args: Any,
        streaming_config: data_loader.StreamingDataLoaderConfig,
        vllm_config: data_loader.VLLMConfig,
    ):
```
### Import
```python
from open_instruct.actor_manager import ActorManager
```
## I/O Contract
### Inputs
| Name | Type | Description |
|---|---|---|
| `queues` | `dict[str, Queue]` | Dictionary of named Ray queues to monitor (e.g., `{"prompt_queue": q1, "results_queue": q2}`). |
| `args` | `ExperimentConfig` | Experiment configuration for dashboard settings and batch size computation. |
| `streaming_config` | `StreamingDataLoaderConfig` | Streaming config for computing expected batch size in the dashboard. |
| `vllm_config` | `VLLMConfig` | vLLM config for computing total concurrency in the dashboard. |
### Key Methods
| Method | Return Type | Description |
|---|---|---|
| `set_should_stop(should_stop)` | `None` | Set the global stop flag. Engines poll this to know when to shut down. |
| `should_stop()` | `bool` | Query the current stop flag. Called by engines periodically. |
| `report_token_statistics(token_stats)` | `None` | Report token counts from a generation batch for throughput tracking. |
| `report_training_step_time(duration)` | `None` | Report the time taken for a training step. |
| `report_batch_generation_time(duration)` | `None` | Report the time taken to generate a batch. |
| `set_kv_cache_max_concurrency(max_concurrency)` | `None` | Set the KV-cache concurrency limit per engine. |
| `get_token_stats()` | `dict` | Return current token throughput statistics. |
| `get_timing_stats()` | `dict` | Return average training and generation timing. |
| `get_dashboard_port()` | `int \| None` | Return the port where the dashboard is running. |
| `cleanup()` | `None` | Stop background threads and clean up resources. |
## Dashboard API Endpoints
| Endpoint | Description |
|---|---|
| `GET /` | HTML dashboard with real-time visualizations. |
| `GET /api/status` | JSON endpoint returning queue sizes, token stats, timing stats, concurrency, and `should_stop` flag. |
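As a sketch of what the `/api/status` response contains, the helper below assembles the fields listed in the table into a JSON payload. The field names here are illustrative assumptions, not taken from the implementation.

```python
import json


def build_status_payload(queue_sizes: dict, token_stats: dict,
                         timing_stats: dict, max_concurrency: int,
                         should_stop: bool) -> str:
    """Assemble a JSON status payload with the fields the endpoint reports.

    Hypothetical helper; key names are assumptions for illustration.
    """
    payload = {
        "queue_sizes": queue_sizes,
        "token_stats": token_stats,
        "timing_stats": timing_stats,
        "kv_cache_max_concurrency": max_concurrency,
        "should_stop": should_stop,
    }
    return json.dumps(payload)
```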
## Usage Examples
```python
import ray
from ray.util import queue as ray_queue

from open_instruct.actor_manager import ActorManager
from open_instruct.data_loader import StreamingDataLoaderConfig, VLLMConfig

# Create queues
prompt_queue = ray_queue.Queue(maxsize=100)
results_queue = ray_queue.Queue(maxsize=100)

# Create the actor manager as a Ray actor
# (experiment_config, streaming_config, and vllm_config are assumed to be
# constructed elsewhere)
actor_manager = ray.remote(ActorManager).remote(
    queues={"prompt_queue": prompt_queue, "results_queue": results_queue},
    args=experiment_config,
    streaming_config=streaming_config,
    vllm_config=vllm_config,
)

# Pass to vLLM engines and other actors
engines = create_vllm_engines(..., actor_manager=actor_manager)

# During training, report metrics
ray.get(actor_manager.report_training_step_time.remote(1.5))

# At the end of training
ray.get(actor_manager.set_should_stop.remote(True))

# Get dashboard URL
port = ray.get(actor_manager.get_dashboard_port.remote())
print(f"Dashboard: http://head-node:{port}")

# Cleanup
ray.get(actor_manager.cleanup.remote())
```