Implementation:ArroyoSystems Arroyo Worker Server
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Distributed_Systems, gRPC |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
WorkerServer is the main gRPC server process for an Arroyo worker node. It registers with the controller, initializes and executes dataflow programs, and handles checkpoints, commits, and lifecycle transitions.
Description
The WorkerServer struct represents a single worker process in the Arroyo distributed streaming engine. It implements the WorkerGrpc tonic service trait, exposing RPC endpoints for the controller to manage the worker's lifecycle. The worker operates through distinct phases tracked by WorkerExecutionPhase: Idle, Initializing, Running, and Failed.
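The phase tracking described above can be pictured as a small state machine behind an Arc<Mutex<...>>. The following is a minimal sketch, not the crate's actual code: `Phase`, `Event`, `next_phase`, and `apply` are hypothetical names, the real WorkerExecutionPhase variants may carry data, and the transition rules (in particular, entering Failed when initialization fails) are assumptions drawn from the description on this page.

```rust
use std::sync::{Arc, Mutex};

/// Simplified stand-in for WorkerExecutionPhase (assumption: no payloads).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
    Idle,
    Initializing,
    Running,
    Failed,
}

/// Hypothetical events that move the worker between phases.
#[derive(Debug, Clone, Copy)]
pub enum Event {
    StartExecution,
    InitSucceeded,
    InitFailed,
    JobFinished,
}

/// Pure transition function: returns the next phase or rejects the event.
pub fn next_phase(current: Phase, event: Event) -> Result<Phase, String> {
    match (current, event) {
        (Phase::Idle, Event::StartExecution) => Ok(Phase::Initializing),
        (Phase::Initializing, Event::InitSucceeded) => Ok(Phase::Running),
        (Phase::Initializing, Event::InitFailed) => Ok(Phase::Failed),
        // job_finished returns the worker to Idle from any phase.
        (_, Event::JobFinished) => Ok(Phase::Idle),
        (phase, event) => Err(format!("cannot apply {event:?} in phase {phase:?}")),
    }
}

/// Applies an event to the shared phase while holding the lock, so that
/// concurrent RPC handlers observe a consistent value.
pub fn apply(shared: &Arc<Mutex<Phase>>, event: Event) -> Result<Phase, String> {
    let mut guard = shared
        .lock()
        .map_err(|_| "phase lock poisoned".to_string())?;
    let next = next_phase(*guard, event)?;
    *guard = next;
    Ok(next)
}
```

Keeping the transition logic in a pure function makes it easy to reject an out-of-order RPC, such as a second start_execution while the worker is already Running, without touching the shared state.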
Key responsibilities:
- Registration: On startup (start_async), the worker binds a TCP listener for gRPC, opens a data port via NetworkManager, and registers itself with the controller providing its RPC address, data address, and available resource slots.
- Execution Initialization: The start_execution RPC triggers asynchronous initialization via initialize_inner, which deserializes the LogicalProgram from the request, loads UDF dylibs and Python UDFs into the Registry, constructs a Program from the logical graph, creates an Engine, and starts it. A control loop (run_control_loop) is spawned to relay ControlResp messages (checkpoint events, task completions, failures, heartbeats) back to the controller.
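- Checkpoint & Commit: The checkpoint RPC serves two purposes. For a regular checkpoint it sends ControlMessage::Checkpoint to all source operators; when the request asks for a commit instead, it sends ControlMessage::Commit to the sinks. The separate commit RPC dispatches per-operator commit data to the appropriate operator subtasks, using the operator_to_node mapping to locate them.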
- Lifecycle: stop_execution sends stop signals to sources; job_finished transitions the worker back to Idle and triggers graceful shutdown.
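The checkpoint and commit dispatch described in the list above can be sketched with plain standard-library channels. This is an illustration under stated assumptions, not the worker's implementation: `ControlSenders`, the `is_commit` flag, the `u32` node ids, and the byte-vector payloads are all hypothetical simplifications, and the real ControlMessage variants carry richer data.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Simplified stand-in for the engine's control message type.
#[derive(Debug, Clone, PartialEq)]
pub enum ControlMessage {
    Checkpoint { epoch: u32 },
    Commit { epoch: u32, commit_data: Vec<u8> },
}

/// Hypothetical registry of control channels held by a running worker.
pub struct ControlSenders {
    pub sources: Vec<Sender<ControlMessage>>,
    pub sinks: Vec<Sender<ControlMessage>>,
    /// Control channels for every subtask of a node, keyed by node id.
    pub by_node: HashMap<u32, Vec<Sender<ControlMessage>>>,
}

/// Checkpoint RPC: sends a checkpoint message to sources, or a commit
/// signal to sinks. Returns how many subtasks accepted the message.
pub fn checkpoint(senders: &ControlSenders, epoch: u32, is_commit: bool) -> usize {
    let (targets, msg) = if is_commit {
        let msg = ControlMessage::Commit { epoch, commit_data: Vec::new() };
        (&senders.sinks, msg)
    } else {
        (&senders.sources, ControlMessage::Checkpoint { epoch })
    };
    targets.iter().filter(|tx| tx.send(msg.clone()).is_ok()).count()
}

/// Commit RPC: routes each operator's commit data to the subtasks of the
/// node that hosts it. Operators without a known node are skipped.
pub fn commit(
    senders: &ControlSenders,
    operator_to_node: &HashMap<String, u32>,
    data_by_operator: &HashMap<String, Vec<u8>>,
    epoch: u32,
) -> usize {
    let mut delivered = 0;
    for (operator_id, data) in data_by_operator {
        let Some(node_id) = operator_to_node.get(operator_id) else { continue };
        let Some(targets) = senders.by_node.get(node_id) else { continue };
        for tx in targets {
            let msg = ControlMessage::Commit { epoch, commit_data: data.clone() };
            if tx.send(msg).is_ok() {
                delivered += 1;
            }
        }
    }
    delivered
}
```

Returning the delivery count is a choice made for this sketch; it lets a caller notice when a subtask's channel has already closed.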
The LocalRunner struct provides a simplified execution path for single-node local execution, bypassing gRPC and controller registration.
Usage
WorkerServer is the entry point for each Arroyo worker process. It is instantiated via from_config (reading environment variables JOB_ID and RUN_ID) or new (with explicit parameters), then started with start_async.
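To illustrate the from_config path, the sketch below shows one way JOB_ID and RUN_ID could be read and validated. It is not the crate's code: `WorkerIdentity`, `identity_from`, and `identity_from_env` are hypothetical names, and the error messages are invented for the example. The lookup is passed in as a closure so that the parsing can be exercised without touching the process environment.

```rust
/// Hypothetical holder for the two values from_config reads.
#[derive(Debug, PartialEq)]
pub struct WorkerIdentity {
    pub job_id: String,
    pub run_id: u64,
}

/// Builds the identity from any key/value lookup. RUN_ID must parse as a
/// u64, matching the run_id field type in the signature listing.
pub fn identity_from(
    lookup: impl Fn(&str) -> Option<String>,
) -> Result<WorkerIdentity, String> {
    let job_id = lookup("JOB_ID").ok_or("JOB_ID is not set")?;
    let run_id = lookup("RUN_ID")
        .ok_or("RUN_ID is not set")?
        .parse::<u64>()
        .map_err(|e| format!("RUN_ID is not a valid u64: {e}"))?;
    Ok(WorkerIdentity { job_id, run_id })
}

/// Convenience wrapper that reads the real process environment.
pub fn identity_from_env() -> Result<WorkerIdentity, String> {
    identity_from(|key| std::env::var(key).ok())
}
```

A process launched with JOB_ID=job_42 and RUN_ID=3 would yield a WorkerIdentity with job_id "job_42" and run_id 3; a missing or non-numeric RUN_ID produces an error instead of a panic.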
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-worker/src/lib.rs
Signature
pub struct WorkerServer {
    id: WorkerId,
    job_id: String,
    run_id: u64,
    name: &'static str,
    phase: Arc<Mutex<WorkerExecutionPhase>>,
    network: Arc<Mutex<Option<NetworkManager>>>,
    shutdown_guard: ShutdownGuard,
}

impl WorkerServer {
    pub fn from_config(shutdown_guard: ShutdownGuard) -> Result<Self>;
    pub fn new(
        name: &'static str,
        worker_id: WorkerId,
        job_id: String,
        run_id: u64,
        shutdown_guard: ShutdownGuard,
    ) -> Self;
    pub fn id(&self) -> WorkerId;
    pub fn job_id(&self) -> &str;
    pub async fn start_async(self) -> Result<()>;
    pub async fn start(self) -> Result<()>;
}

pub struct LocalRunner {
    program: Program,
    control_rx: Receiver<ControlResp>,
}

impl LocalRunner {
    pub fn new(program: Program, control_rx: Receiver<ControlResp>) -> Self;
    pub async fn run(mut self) -> anyhow::Result<()>;
}
Import
use arroyo_worker::{WorkerServer, LocalRunner};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| StartExecutionReq | gRPC Request | Yes | Contains the serialized LogicalProgram, task assignments, and restore epoch |
| CheckpointReq | gRPC Request | Yes | Triggers checkpoint or commit operations with epoch and timing data |
| CommitReq | gRPC Request | Yes | Contains per-operator commit data to be dispatched to operator subtasks |
| StopExecutionReq | gRPC Request | No | Signals sources to stop with a specified stop mode |
| JobFinishedReq | gRPC Request | No | Signals job completion, triggering transition to Idle and shutdown |
Outputs
| Name | Type | Description |
|---|---|---|
| RegisterWorkerReq | gRPC to Controller | Registration message sent on startup with worker info and addresses |
| TaskCheckpointEventReq | gRPC to Controller | Relayed checkpoint events from operator subtasks |
| TaskCheckpointCompletedReq | gRPC to Controller | Relayed checkpoint completion with metadata |
| TaskFinishedReq | gRPC to Controller | Notification that an operator subtask has finished |
| TaskFailedReq | gRPC to Controller | Notification of an operator subtask failure with error details |
| HeartbeatReq | gRPC to Controller | Periodic heartbeat sent every 5 seconds |
| MetricsResp | gRPC Response | Prometheus metrics encoded as protobuf MetricFamily messages |
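The relationship between ControlResp messages and the outbound requests in the table above can be sketched as a relay loop. This is a simplified, synchronous illustration: the real run_control_loop is asynchronous and sends gRPC requests, whereas here `Outbound`, `translate`, the payload fields, and the `send` callback are hypothetical stand-ins. The heartbeat is modeled as a timer inside the loop, which is an assumption about how the 5-second interval is driven.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Simplified stand-in for the engine's ControlResp (payloads are assumed).
#[derive(Debug)]
pub enum ControlResp {
    CheckpointEvent { epoch: u32 },
    CheckpointCompleted { epoch: u32 },
    TaskFinished { task: String },
    TaskFailed { task: String, error: String },
}

/// Hypothetical model of the requests the worker sends to the controller.
#[derive(Debug, PartialEq)]
pub enum Outbound {
    TaskCheckpointEventReq { epoch: u32 },
    TaskCheckpointCompletedReq { epoch: u32 },
    TaskFinishedReq { task: String },
    TaskFailedReq { task: String, error: String },
    HeartbeatReq,
}

fn translate(resp: ControlResp) -> Outbound {
    match resp {
        ControlResp::CheckpointEvent { epoch } => Outbound::TaskCheckpointEventReq { epoch },
        ControlResp::CheckpointCompleted { epoch } => {
            Outbound::TaskCheckpointCompletedReq { epoch }
        }
        ControlResp::TaskFinished { task } => Outbound::TaskFinishedReq { task },
        ControlResp::TaskFailed { task, error } => Outbound::TaskFailedReq { task, error },
    }
}

/// Relays every ControlResp and emits a heartbeat whenever `heartbeat_every`
/// has elapsed. Returns once all senders have been dropped.
pub fn run_control_loop(
    rx: Receiver<ControlResp>,
    heartbeat_every: Duration,
    mut send: impl FnMut(Outbound),
) {
    let mut next_beat = Instant::now() + heartbeat_every;
    loop {
        // Wait for a message, but no longer than the time to the next heartbeat.
        let wait = next_beat.saturating_duration_since(Instant::now());
        match rx.recv_timeout(wait) {
            Ok(resp) => send(translate(resp)),
            Err(RecvTimeoutError::Timeout) => {
                send(Outbound::HeartbeatReq);
                next_beat = Instant::now() + heartbeat_every;
            }
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
}
```

Calling run_control_loop(rx, Duration::from_secs(5), ...) would match the heartbeat interval listed in the table.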
Usage Examples
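use arroyo_server_common::shutdown::ShutdownGuard;
use arroyo_worker::{LocalRunner, WorkerServer};

// The statements below use `?` and `.await`, so they belong inside an async
// function that returns a Result.

// Create from environment configuration (reads JOB_ID and RUN_ID)
let shutdown_guard = ShutdownGuard::new();
let worker = WorkerServer::from_config(shutdown_guard)?;
// Start the worker (registers with controller, begins serving gRPC)
worker.start_async().await?;

// Or for local/embedded execution, given an already constructed `program`
// and the `control_rx` receiver for its ControlResp messages:
let runner = LocalRunner::new(program, control_rx);
runner.run().await?;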