

Implementation:ArroyoSystems Arroyo Worker Server

From Leeroopedia


Knowledge Sources
Domains: Streaming, Distributed_Systems, gRPC
Last Updated: 2026-02-08 08:00 GMT

Overview

WorkerServer is the main gRPC server process for an Arroyo worker node. It registers with the controller, initializes and executes dataflow programs, handles checkpoints and commits, and manages the worker's lifecycle.

Description

The WorkerServer struct represents a single worker process in the Arroyo distributed streaming engine. It implements the WorkerGrpc tonic service trait, exposing RPC endpoints through which the controller manages the worker's lifecycle. The worker moves through distinct phases tracked by WorkerExecutionPhase: Idle, Initializing, Running, and Failed.
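The phase transitions can be pictured as a small state machine. This is a minimal sketch under stated assumptions: Phase, Event, and transition are hypothetical names, the real WorkerExecutionPhase may carry extra state (such as an engine handle or an error message), and only the transitions described on this page (start_execution, initialization success or failure, job_finished) are modeled.

```rust
/// Hypothetical model of the phases named above; the real
/// WorkerExecutionPhase may carry additional state.
#[derive(Debug, Clone, PartialEq)]
enum Phase {
    Idle,
    Initializing,
    Running,
    Failed(String),
}

/// Illustrative lifecycle events, loosely mirroring the RPCs on this page.
#[derive(Debug)]
enum Event {
    StartExecution,     // start_execution RPC received
    InitSucceeded,      // asynchronous initialization completed
    InitFailed(String), // asynchronous initialization returned an error
    JobFinished,        // job_finished RPC received
}

/// Returns the next phase, or an error if the event is not accepted
/// in the current phase (assumption: only these edges are legal).
fn transition(phase: &Phase, event: Event) -> Result<Phase, String> {
    match (phase, event) {
        (Phase::Idle, Event::StartExecution) => Ok(Phase::Initializing),
        (Phase::Initializing, Event::InitSucceeded) => Ok(Phase::Running),
        (Phase::Initializing, Event::InitFailed(e)) => Ok(Phase::Failed(e)),
        (Phase::Running, Event::JobFinished) => Ok(Phase::Idle),
        (p, e) => Err(format!("event {:?} not allowed in phase {:?}", e, p)),
    }
}
```

Rejecting out-of-order events (for example a second start_execution while Running) is one plausible reason to track the phase behind a Mutex, as the struct's `phase` field does.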

Key responsibilities:

  • Registration: On startup (start_async), the worker binds a TCP listener for gRPC, opens a data port via NetworkManager, and registers itself with the controller providing its RPC address, data address, and available resource slots.
  • Execution Initialization: The start_execution RPC triggers asynchronous initialization via initialize_inner, which deserializes the LogicalProgram from the request, loads UDF dylibs and Python UDFs into the Registry, constructs a Program from the logical graph, creates an Engine, and starts it. A control loop (run_control_loop) is spawned to relay ControlResp messages (checkpoint events, task completions, failures, heartbeats) back to the controller.
  • Checkpoint & Commit: The checkpoint RPC sends ControlMessage::Checkpoint to all source operators and ControlMessage::Commit to sinks. The commit RPC dispatches commit data to the appropriate operators based on the operator_to_node mapping.
  • Lifecycle: stop_execution sends stop signals to sources; job_finished transitions the worker back to Idle and triggers graceful shutdown.
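The checkpoint fan-out and commit routing described above can be sketched with std channels standing in for Arroyo's per-operator control queues. Everything here is illustrative: ControlMessage is a simplified stand-in, and the fan_out_checkpoint and dispatch_commit helpers, the string operator IDs, the u32 node IDs and epochs, and the Vec<u8> commit payloads are assumptions rather than the crate's actual API.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Simplified stand-in for Arroyo's ControlMessage (fields are illustrative).
#[derive(Debug, Clone, PartialEq)]
enum ControlMessage {
    Checkpoint { epoch: u32 },
    Commit { epoch: u32, data: Vec<u8> },
}

/// Send a checkpoint barrier to every source operator's control channel.
/// Returns how many sources accepted it (a closed channel is skipped).
fn fan_out_checkpoint(sources: &[Sender<ControlMessage>], epoch: u32) -> usize {
    sources
        .iter()
        .filter(|tx| tx.send(ControlMessage::Checkpoint { epoch }).is_ok())
        .count()
}

/// Route per-operator commit data to the node hosting each operator via an
/// operator_to_node lookup. Stops at the first unknown operator or node,
/// so commits already sent before the error are not rolled back.
fn dispatch_commit(
    operator_to_node: &HashMap<String, u32>,
    node_senders: &HashMap<u32, Sender<ControlMessage>>,
    epoch: u32,
    commits: HashMap<String, Vec<u8>>,
) -> Result<(), String> {
    for (operator_id, data) in commits {
        let node = operator_to_node
            .get(&operator_id)
            .ok_or_else(|| format!("no node for operator {operator_id}"))?;
        let tx = node_senders
            .get(node)
            .ok_or_else(|| format!("no channel for node {node}"))?;
        tx.send(ControlMessage::Commit { epoch, data })
            .map_err(|e| e.to_string())?;
    }
    Ok(())
}
```

Keeping the operator-to-node lookup separate from the channel table mirrors the two-step resolution the commit RPC performs: first find which node owns an operator, then deliver to that node's queue.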

The LocalRunner struct provides a simplified execution path for single-node local execution, bypassing gRPC and controller registration.
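LocalRunner's completion handling might look roughly like the following, assuming it waits until every subtask reports TaskFinished and fails fast on TaskFailed. The simplified ControlResp variants, the wait_for_completion helper, the (node_id, task_index) subtask key, and the use of a blocking std channel instead of an async receiver are all assumptions made for illustration.

```rust
use std::collections::HashSet;
use std::sync::mpsc::Receiver;

/// Simplified control responses (hypothetical shapes).
#[derive(Debug)]
enum ControlResp {
    CheckpointEvent { epoch: u32 },
    TaskFinished { node_id: u32, task_index: usize },
    TaskFailed { node_id: u32, task_index: usize, error: String },
}

/// Block until `total_subtasks` distinct subtasks have finished,
/// returning early with an error if any subtask fails.
fn wait_for_completion(control_rx: Receiver<ControlResp>, total_subtasks: usize) -> Result<(), String> {
    let mut finished = HashSet::new();
    // recv() returns Err once every sender has been dropped.
    while let Ok(msg) = control_rx.recv() {
        match msg {
            ControlResp::TaskFinished { node_id, task_index } => {
                // A set makes duplicate finish reports harmless.
                finished.insert((node_id, task_index));
                if finished.len() == total_subtasks {
                    return Ok(());
                }
            }
            ControlResp::TaskFailed { node_id, task_index, error } => {
                return Err(format!("subtask {node_id}-{task_index} failed: {error}"));
            }
            // Progress messages carry no completion information here.
            ControlResp::CheckpointEvent { .. } => {}
        }
    }
    Err(format!(
        "control channel closed after {}/{} subtasks finished",
        finished.len(),
        total_subtasks
    ))
}
```

Because there is no controller in local mode, the runner itself has to decide when the job is done; counting distinct finished subtasks is one straightforward way to do that.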

Usage

WorkerServer is the entry point for each Arroyo worker process. It is instantiated via from_config (which reads the JOB_ID and RUN_ID environment variables) or via new (with explicit parameters), then started with start_async.
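A minimal sketch of the environment parsing a from_config-style constructor performs, assuming both variables are required and RUN_ID must parse as a u64 (matching the `run_id: u64` field). WorkerEnv, parse_worker_env, and worker_env_from_process are hypothetical names; the real from_config also has to produce a WorkerId and may read other settings or apply defaults.

```rust
use std::env;

/// Hypothetical subset of the configuration from_config reads.
#[derive(Debug, PartialEq)]
struct WorkerEnv {
    job_id: String,
    run_id: u64,
}

/// Parse JOB_ID and RUN_ID through an injectable lookup, so the logic can be
/// exercised without mutating the process environment.
fn parse_worker_env(lookup: impl Fn(&str) -> Option<String>) -> Result<WorkerEnv, String> {
    let job_id = lookup("JOB_ID").ok_or("JOB_ID is not set")?;
    let raw_run_id = lookup("RUN_ID").ok_or("RUN_ID is not set")?;
    let run_id = raw_run_id
        .parse::<u64>()
        .map_err(|e| format!("RUN_ID {raw_run_id:?} is not a u64: {e}"))?;
    Ok(WorkerEnv { job_id, run_id })
}

/// Convenience wrapper that reads the real process environment.
fn worker_env_from_process() -> Result<WorkerEnv, String> {
    parse_worker_env(|key| env::var(key).ok())
}
```

Injecting the lookup function is a testing convenience of this sketch; new covers the case where the caller already has these values and wants to skip the environment entirely.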

Code Reference

Source Location

Signature

```rust
pub struct WorkerServer {
    id: WorkerId,
    job_id: String,
    run_id: u64,
    name: &'static str,
    phase: Arc<Mutex<WorkerExecutionPhase>>,
    network: Arc<Mutex<Option<NetworkManager>>>,
    shutdown_guard: ShutdownGuard,
}

impl WorkerServer {
    pub fn from_config(shutdown_guard: ShutdownGuard) -> Result<Self>;
    pub fn new(
        name: &'static str,
        worker_id: WorkerId,
        job_id: String,
        run_id: u64,
        shutdown_guard: ShutdownGuard,
    ) -> Self;
    pub fn id(&self) -> WorkerId;
    pub fn job_id(&self) -> &str;
    pub async fn start_async(self) -> Result<()>;
    pub async fn start(self) -> Result<()>;
}

pub struct LocalRunner {
    program: Program,
    control_rx: Receiver<ControlResp>,
}

impl LocalRunner {
    pub fn new(program: Program, control_rx: Receiver<ControlResp>) -> Self;
    pub async fn run(mut self) -> anyhow::Result<()>;
}
```

Import

```rust
use arroyo_worker::{WorkerServer, LocalRunner};
```

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| StartExecutionReq | gRPC Request | Yes | Contains the serialized LogicalProgram, task assignments, and restore epoch |
| CheckpointReq | gRPC Request | Yes | Triggers checkpoint or commit operations with epoch and timing data |
| CommitReq | gRPC Request | Yes | Contains per-operator commit data to be dispatched to operator subtasks |
| StopExecutionReq | gRPC Request | No | Signals sources to stop with a specified stop mode |
| JobFinishedReq | gRPC Request | No | Signals job completion, triggering transition to Idle and shutdown |

Outputs

| Name | Type | Description |
|---|---|---|
| RegisterWorkerReq | gRPC to Controller | Registration message sent on startup with worker info and addresses |
| TaskCheckpointEventReq | gRPC to Controller | Relayed checkpoint events from operator subtasks |
| TaskCheckpointCompletedReq | gRPC to Controller | Relayed checkpoint completion with metadata |
| TaskFinishedReq | gRPC to Controller | Notification that an operator subtask has finished |
| TaskFailedReq | gRPC to Controller | Notification of an operator subtask failure with error details |
| HeartbeatReq | gRPC to Controller | Periodic heartbeat sent every 5 seconds |
| MetricsResp | gRPC Response | Prometheus metrics encoded as protobuf MetricFamily messages |
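The relay-plus-heartbeat behavior of run_control_loop can be approximated with a blocking loop. In this sketch, ControlResp and ToController are simplified hypothetical enums named after the table rows, `send` stands in for the gRPC client call to the controller, and a std channel with recv_timeout replaces the async runtime the real worker uses.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Simplified responses from operator subtasks (hypothetical shapes).
#[derive(Debug)]
enum ControlResp {
    CheckpointEvent { node_id: u32, epoch: u32 },
    CheckpointCompleted { node_id: u32, epoch: u32 },
    TaskFinished { node_id: u32 },
    TaskFailed { node_id: u32, error: String },
}

/// Messages forwarded to the controller, named after the outputs table.
#[derive(Debug, PartialEq)]
enum ToController {
    TaskCheckpointEvent { node_id: u32, epoch: u32 },
    TaskCheckpointCompleted { node_id: u32, epoch: u32 },
    TaskFinished { node_id: u32 },
    TaskFailed { node_id: u32, error: String },
    Heartbeat,
}

/// One-to-one translation from a subtask response to a controller request.
fn translate(resp: ControlResp) -> ToController {
    match resp {
        ControlResp::CheckpointEvent { node_id, epoch } => ToController::TaskCheckpointEvent { node_id, epoch },
        ControlResp::CheckpointCompleted { node_id, epoch } => ToController::TaskCheckpointCompleted { node_id, epoch },
        ControlResp::TaskFinished { node_id } => ToController::TaskFinished { node_id },
        ControlResp::TaskFailed { node_id, error } => ToController::TaskFailed { node_id, error },
    }
}

/// Relay responses and emit a heartbeat roughly every `interval`,
/// returning once all senders of `rx` have been dropped.
fn run_control_loop(rx: Receiver<ControlResp>, interval: Duration, mut send: impl FnMut(ToController)) {
    let mut next_heartbeat = Instant::now() + interval;
    loop {
        // Wait for a message, but never past the next heartbeat deadline.
        let wait = next_heartbeat.saturating_duration_since(Instant::now());
        match rx.recv_timeout(wait) {
            Ok(resp) => send(translate(resp)),
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => return,
        }
        // Checked after every wakeup, so a busy channel cannot starve heartbeats.
        if Instant::now() >= next_heartbeat {
            send(ToController::Heartbeat);
            next_heartbeat = Instant::now() + interval;
        }
    }
}
```

Passing `Duration::from_secs(5)` as `interval` matches the heartbeat cadence listed in the table. Tracking the heartbeat deadline separately from message arrival keeps heartbeats flowing even while checkpoint events are being relayed.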

Usage Examples

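```rust
// Crate paths for Program and ControlResp are assumed; adjust to your Arroyo version.
use arroyo_rpc::ControlResp;
use arroyo_server_common::shutdown::ShutdownGuard;
use arroyo_worker::engine::Program;
use arroyo_worker::{LocalRunner, WorkerServer};
use tokio::sync::mpsc::Receiver;

// Distributed mode: read JOB_ID and RUN_ID from the environment,
// register with the controller, then serve gRPC until shutdown.
async fn run_worker() -> anyhow::Result<()> {
    let shutdown_guard = ShutdownGuard::new();
    let worker = WorkerServer::from_config(shutdown_guard)?;
    worker.start_async().await
}

// Local/embedded mode: run an already-built Program on this node,
// with no gRPC server and no controller registration.
async fn run_local(program: Program, control_rx: Receiver<ControlResp>) -> anyhow::Result<()> {
    let runner = LocalRunner::new(program, control_rx);
    runner.run().await
}
```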
