
Implementation:Allenai Open instruct ActorManager

From Leeroopedia


Type Class (Ray Actor)
Source open_instruct/actor_manager.py:L43-257
Dependencies ray, fastapi, uvicorn, threading, collections
Last Updated 2026-02-07 00:00 GMT

Overview

ActorManager is a centralized manager, provided by the Open Instruct library, that coordinates evaluation, weight updates, queue monitoring, and performance tracking across all distributed actors in the GRPO pipeline.

Description

ActorManager is a Ray actor class that serves as the central coordination hub for the GRPO training pipeline. It provides:

  1. Lifecycle control: The should_stop flag is polled by vLLM engines to determine when to shut down. The main training loop sets this via set_should_stop(True) when training completes.
  2. Queue monitoring: A background polling thread tracks the sizes of all Ray queues (prompt queue, results queue, evaluation queue) at 0.5-second intervals.
  3. Token statistics: Tracks total prefill and decode tokens, and computes rolling throughput (tokens/second) over a sliding window.
  4. Timing statistics: Tracks average training step time and batch generation time for performance analysis.
  5. KV-cache concurrency: Stores the maximum concurrent sequences each vLLM engine can handle, useful for tuning batch sizes.
  6. Web dashboard: Starts a FastAPI web server exposing a real-time HTML dashboard and JSON API for monitoring queue sizes, token throughput, timing stats, and concurrency limits.

The dashboard is automatically started when enable_queue_dashboard=True (default) and is accessible at http://{hostname}:{port}.

Usage

The ActorManager is created once on the head node and passed as a Ray actor handle to all other actors. It is typically accessed via ray.get(actor_manager.method.remote(args)).
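The stop-flag protocol can be illustrated without a Ray cluster. In this sketch a hypothetical stand-in object plays the role of the actor handle; in the real pipeline each query would be `ray.get(actor_manager.should_stop.remote())`:

```python
import time


class FakeActorManager:
    # Hypothetical stand-in for the Ray actor handle, for illustration only.
    def __init__(self):
        self._should_stop = False

    def set_should_stop(self, flag: bool) -> None:
        self._should_stop = flag

    def should_stop(self) -> bool:
        return self._should_stop


def engine_loop(manager, poll_interval: float = 0.0, max_steps: int = 100) -> int:
    """Run generation steps until the manager signals shutdown."""
    steps = 0
    while steps < max_steps and not manager.should_stop():
        steps += 1  # one generation step would happen here
        time.sleep(poll_interval)
    return steps


manager = FakeActorManager()
manager.set_should_stop(True)
print(engine_loop(manager))  # stop flag already set, so no steps run -> 0
```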

Code Reference

Source Location

open_instruct/actor_manager.py, lines 43-257

Signature

class ActorManager:
    def __init__(
        self,
        queues: dict[str, ray_queue.Queue],
        args: Any,
        streaming_config: data_loader.StreamingDataLoaderConfig,
        vllm_config: data_loader.VLLMConfig,
    ):

Import

from open_instruct.actor_manager import ActorManager

I/O Contract

Inputs

Name Type Description
queues dict[str, Queue] Dictionary of named Ray queues to monitor (e.g., {"prompt_queue": q1, "results_queue": q2}).
args ExperimentConfig Experiment configuration for dashboard settings and batch size computation.
streaming_config StreamingDataLoaderConfig Streaming config for computing expected batch size in the dashboard.
vllm_config VLLMConfig vLLM config for computing total concurrency in the dashboard.

Key Methods

Method Return Type Description
set_should_stop(should_stop) None Set the global stop flag. Engines poll this to know when to shut down.
should_stop() bool Query the current stop flag. Called by engines periodically.
report_token_statistics(token_stats) None Report token counts from a generation batch for throughput tracking.
report_training_step_time(duration) None Report the time taken for a training step.
report_batch_generation_time(duration) None Report the time taken to generate a batch.
set_kv_cache_max_concurrency(max_concurrency) None Set the KV-cache concurrency limit per engine.
get_token_stats() dict Return current token throughput statistics.
get_timing_stats() dict Return average training and generation timing.
get_dashboard_port() int Return the port where the dashboard is running.
cleanup() None Stop background threads and clean up resources.

Dashboard API Endpoints

Endpoint Description
GET / HTML dashboard with real-time visualizations.
GET /api/status JSON endpoint returning queue sizes, token stats, timing stats, concurrency, and should_stop flag.
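A monitoring script might poll the JSON endpoint along these lines. The field names in the sample payload (queue_sizes, should_stop) are assumptions based on the endpoint description above, not a documented schema:

```python
import json
from urllib.request import urlopen  # used for the live request shown in the comment below


def dashboard_status_url(host: str, port: int) -> str:
    # Build the URL for the JSON status endpoint described above.
    return f"http://{host}:{port}/api/status"


def summarize_status(payload: str) -> str:
    # Turn the JSON payload into a one-line summary for logging.
    status = json.loads(payload)
    queues = ", ".join(f"{name}={size}" for name, size in status.get("queue_sizes", {}).items())
    return f"should_stop={status.get('should_stop')} queues: {queues}"


# In practice: payload = urlopen(dashboard_status_url("head-node", port)).read()
sample = '{"should_stop": false, "queue_sizes": {"prompt_queue": 3, "results_queue": 1}}'
print(summarize_status(sample))
```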

Usage Examples

import ray
from ray.util import queue as ray_queue
from open_instruct.actor_manager import ActorManager
from open_instruct.data_loader import StreamingDataLoaderConfig, VLLMConfig

# Create queues
prompt_queue = ray_queue.Queue(maxsize=100)
results_queue = ray_queue.Queue(maxsize=100)

# Create the actor manager as a Ray actor
actor_manager = ray.remote(ActorManager).remote(
    queues={"prompt_queue": prompt_queue, "results_queue": results_queue},
    args=experiment_config,
    streaming_config=streaming_config,
    vllm_config=vllm_config,
)

# Pass to vLLM engines and other actors
engines = create_vllm_engines(..., actor_manager=actor_manager)

# During training, report metrics
ray.get(actor_manager.report_training_step_time.remote(1.5))

# At the end of training
ray.get(actor_manager.set_should_stop.remote(True))

# Get dashboard URL
port = ray.get(actor_manager.get_dashboard_port.remote())
print(f"Dashboard: http://head-node:{port}")

# Cleanup
ray.get(actor_manager.cleanup.remote())

Related Pages

Implements Principle
