Implementation:ArroyoSystems Arroyo Node Server
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Distributed, ProcessManagement |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Implements the NodeServer, a gRPC service that manages the lifecycle of Arroyo worker processes on a single machine, including process spawning, heartbeating, graceful/forced shutdown, and worker status tracking.
Description
The NodeServer runs on each machine in an Arroyo cluster and is responsible for:
- Worker lifecycle -- Starting worker processes via start_worker (which spawns the current binary with the "worker" subcommand and the appropriate environment variables), stopping workers via stop_worker (which sends SIGTERM, waits, then sends SIGKILL), and tracking worker status.
- Controller registration -- On startup, the start method registers the node with the controller via the register_node gRPC call, providing the node's machine_id, task_slots, and gRPC address. It then runs a periodic heartbeat loop.
- Worker monitoring -- A background task monitors each spawned child process. When a worker exits, the task reports the exit to the controller via worker_finished and updates the WORKERS Prometheus gauge.
- gRPC interface -- Implements NodeGrpc with StartWorker, StopWorker, GetWorkers, and HeartbeatNode endpoints. StopWorker supports a "force" mode with escalating signals (SIGTERM, then SIGKILL after 5 seconds) and a "finished" query mode.
- WorkerStatus -- Tracks each worker's name, job_id, allocated slots, running state, and PID.
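The force-stop escalation described above (SIGTERM, then SIGKILL after 5 seconds) can be modeled as a small decision function. This is a minimal sketch under stated assumptions, not the crate's implementation: `StopAction`, `next_action`, and `KILL_GRACE` are hypothetical names, and signal delivery itself (for example via `kill(2)`) is left out so the escalation logic can be exercised with the standard library alone.

```rust
use std::time::Duration;

/// Hypothetical next step while force-stopping a worker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopAction {
    SendTerm, // deliver SIGTERM and start the grace timer
    Wait,     // SIGTERM already sent; give the worker time to exit cleanly
    SendKill, // grace period expired; deliver SIGKILL
    Done,     // the process has exited
}

/// Grace period between SIGTERM and SIGKILL (5 seconds, per the description).
pub const KILL_GRACE: Duration = Duration::from_secs(5);

/// `term_sent_for` is how long ago SIGTERM was sent (`None` if not yet sent);
/// `alive` is whether the worker process is still running.
pub fn next_action(term_sent_for: Option<Duration>, alive: bool) -> StopAction {
    if !alive {
        return StopAction::Done;
    }
    match term_sent_for {
        None => StopAction::SendTerm,
        Some(elapsed) if elapsed >= KILL_GRACE => StopAction::SendKill,
        Some(_) => StopAction::Wait,
    }
}
```

A caller would poll this function on a short timer, acting on `SendTerm` and `SendKill` and stopping at `Done`. Keeping the timing decision pure makes the 5-second grace period easy to test without real processes.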
The node manages workers as a HashMap<WorkerId, WorkerStatus> behind an Arc<Mutex<...>>. Worker IDs are randomly generated u64 values.
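The shared worker table can be sketched as follows, assuming simple field types for WorkerStatus (`String` job IDs, `usize` slots, `u32` PIDs) that may not match Arroyo's actual definitions. The hypothetical `random_u64` helper draws from the standard library's randomly seeded `RandomState` hasher as a dependency-free stand-in for a proper RNG, and `register_worker` (also hypothetical) retries in the unlikely event of an ID collision.

```rust
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::hash::{BuildHasher, Hasher};
use std::sync::{Arc, Mutex};

/// Worker identifier wrapping a random u64.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WorkerId(pub u64);

/// Per-worker bookkeeping (field types are assumptions).
#[derive(Debug, Clone)]
pub struct WorkerStatus {
    pub name: String,
    pub job_id: String,
    pub slots: usize,
    pub running: bool,
    pub pid: u32,
}

/// Shared worker table, as held by the node server.
pub type Workers = Arc<Mutex<HashMap<WorkerId, WorkerStatus>>>;

/// Hypothetical helper: each `RandomState::new()` gets distinct keys derived
/// from a per-thread random seed, so finishing an empty hasher yields an
/// unpredictable u64.
fn random_u64() -> u64 {
    RandomState::new().build_hasher().finish()
}

/// Insert a worker under a fresh random ID, retrying on collision.
pub fn register_worker(workers: &Workers, status: WorkerStatus) -> WorkerId {
    let mut map = workers.lock().unwrap();
    loop {
        let id = WorkerId(random_u64());
        if !map.contains_key(&id) {
            map.insert(id, status);
            return id;
        }
    }
}
```

Holding the lock across the check and the insert keeps ID allocation atomic with respect to other request handlers sharing the same `Arc<Mutex<...>>`.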
Usage
NodeServer is the main entry point for the "arroyo node" binary. The controller communicates with it to orchestrate worker placement across the cluster.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-node/src/lib.rs
Signature
```rust
pub struct NodeServer {
    id: MachineId,
    worker_finished_tx: Sender<WorkerFinishedReq>,
    workers: Arc<Mutex<HashMap<WorkerId, WorkerStatus>>>,
}

// Signatures only; method bodies omitted.
impl NodeServer {
    pub async fn start(self, shutdown_guard: ShutdownGuard) -> anyhow::Result<()>;
}

#[tonic::async_trait]
impl NodeGrpc for NodeServer {
    async fn start_worker(&self, request: Request<StartWorkerReq>) -> Result<Response<StartWorkerResp>, Status>;
    async fn stop_worker(&self, request: Request<StopWorkerReq>) -> Result<Response<StopWorkerResp>, Status>;
    async fn get_workers(&self, request: Request<GetWorkersReq>) -> Result<Response<GetWorkersResp>, Status>;
    async fn heartbeat_node(&self, request: Request<HeartbeatNodeReq>) -> Result<Response<()>, Status>;
}
```
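The NodeServer fields above (a `worker_finished_tx` sender and the shared `workers` map) suggest how the monitoring task fits together. The following is a minimal sketch using std threads and channels in place of async tasks: `monitor_worker`, `WorkerFinished`, and the `Running` map are hypothetical, the `AtomicI64` stands in for the WORKERS Prometheus gauge, and incrementing the gauge when monitoring begins is an assumption. The `wait` closure represents blocking on the child process (for example `std::process::Child::wait`).

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for the WorkerFinishedReq sent to the controller.
#[derive(Debug, PartialEq, Eq)]
pub struct WorkerFinished {
    pub worker_id: u64,
    pub exit_code: i32,
}

/// Minimal view of the worker table: worker ID -> running flag.
pub type Running = Arc<Mutex<HashMap<u64, bool>>>;

/// Watch one worker in the background. `wait` blocks until the process
/// exits and returns its exit code.
pub fn monitor_worker<F>(
    worker_id: u64,
    wait: F,
    running: Running,
    gauge: Arc<AtomicI64>,
    finished_tx: Sender<WorkerFinished>,
) -> JoinHandle<()>
where
    F: FnOnce() -> i32 + Send + 'static,
{
    // Assumption: the worker counts toward the gauge while it is monitored.
    gauge.fetch_add(1, Ordering::SeqCst);
    thread::spawn(move || {
        let exit_code = wait();
        // Mark the worker as no longer running in the shared table.
        if let Some(flag) = running.lock().unwrap().get_mut(&worker_id) {
            *flag = false;
        }
        gauge.fetch_sub(1, Ordering::SeqCst);
        // Report the exit; ignore the error if the receiver was dropped.
        let _ = finished_tx.send(WorkerFinished { worker_id, exit_code });
    })
}
```

Routing exits through a channel keeps the monitoring threads decoupled from the controller client: a single consumer can forward each `WorkerFinished` message as a worker_finished call.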
Import
```rust
use arroyo_node::NodeServer;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| StartWorkerReq | protobuf | Yes | Job ID, machine ID, task slots, run ID, and environment variables |
| StopWorkerReq | protobuf | Yes | Worker ID, job ID, force flag, and finish-check flag |
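Spawning a worker amounts to re-executing the current binary with the `worker` subcommand and the request's environment. The sketch below only builds the `std::process::Command` so it can be inspected without launching anything. `StartWorker` is a hypothetical subset of StartWorkerReq, and the `JOB_ID`, `RUN_ID`, and `TASK_SLOTS` variable names are placeholders; the names Arroyo actually passes are not given on this page.

```rust
use std::collections::HashMap;
use std::io;
use std::process::Command;

/// Hypothetical subset of StartWorkerReq relevant to spawning.
pub struct StartWorker {
    pub job_id: String,
    pub run_id: u64,
    pub slots: usize,
    pub env_vars: HashMap<String, String>,
}

/// Build (but do not spawn) the command that re-executes the current
/// binary with the `worker` subcommand.
pub fn worker_command(req: &StartWorker) -> io::Result<Command> {
    let mut cmd = Command::new(std::env::current_exe()?);
    cmd.arg("worker");
    // Pass through caller-supplied environment variables first...
    cmd.envs(&req.env_vars);
    // ...then set the placeholder job variables, which take precedence
    // over any pass-through variable with the same name.
    cmd.env("JOB_ID", &req.job_id)
        .env("RUN_ID", req.run_id.to_string())
        .env("TASK_SLOTS", req.slots.to_string());
    Ok(cmd)
}
```

Calling `.spawn()` on the returned command would launch the worker, after which its PID would be recorded in the worker table.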
Outputs
| Name | Type | Description |
|---|---|---|
| StartWorkerResp | protobuf | Worker ID of the spawned worker process |
| StopWorkerResp | protobuf | Stop status: NotFound, Stopping, Stopped |
| GetWorkersResp | protobuf | List of WorkerInfo for all managed workers |
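One plausible way to derive the three StopWorkerResp statuses from the worker table is sketched below. The rules are assumptions rather than the crate's documented behavior: an unknown worker, or one registered under a different job, yields NotFound; a worker whose process is still alive yields Stopping; and a worker that has exited yields Stopped, which is also what a "finished" query would report once shutdown completes. `StopStatus`, `Tracked`, and `stop_status` are hypothetical names.

```rust
use std::collections::HashMap;

/// Mirrors the three statuses listed for StopWorkerResp.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopStatus {
    NotFound,
    Stopping,
    Stopped,
}

/// Hypothetical per-worker record: owning job and liveness.
pub struct Tracked {
    pub job_id: String,
    pub running: bool,
}

/// Map a stop request onto a status. Bookkeeping only; signal delivery
/// is out of scope for this sketch.
pub fn stop_status(workers: &HashMap<u64, Tracked>, worker_id: u64, job_id: &str) -> StopStatus {
    match workers.get(&worker_id) {
        // Unknown worker, or a worker that belongs to a different job.
        None => StopStatus::NotFound,
        Some(w) if w.job_id != job_id => StopStatus::NotFound,
        // Process still alive: a signal may be in flight.
        Some(w) if w.running => StopStatus::Stopping,
        // Process has exited.
        Some(_) => StopStatus::Stopped,
    }
}
```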
Usage Examples
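```rust
use arroyo_node::NodeServer;

// Create and start the node server. `machine_id`, `worker_finished_tx`, and
// `shutdown_guard` are assumed to be in scope; `new` is not among the
// signatures listed above, so the actual constructor may differ.
let node = NodeServer::new(machine_id, worker_finished_tx);
// `start` consumes the server; `?` needs an enclosing fn returning anyhow::Result<()>.
node.start(shutdown_guard).await?;
```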
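The start call registers the node and then heartbeats until shutdown. A synchronous sketch of that control flow follows, with an `AtomicBool` standing in for the ShutdownGuard and closures standing in for the register_node and heartbeat gRPC calls. `run_node` is a hypothetical name, and treating a failed heartbeat as non-fatal (skipped rather than aborting the loop) is an assumption about the intended behavior.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

/// Register once, then heartbeat every `interval` until `shutdown` is set.
/// Returns the number of successful heartbeats, or the registration error.
pub fn run_node<R, H>(
    register: R,
    mut heartbeat: H,
    interval: Duration,
    shutdown: &AtomicBool,
) -> Result<u32, String>
where
    R: FnOnce() -> Result<(), String>,
    H: FnMut() -> Result<(), String>,
{
    // Stand-in for register_node (machine_id, task_slots, gRPC address).
    register()?;
    let mut beats = 0;
    while !shutdown.load(Ordering::SeqCst) {
        // Assumption: a failed heartbeat is skipped, not fatal.
        if heartbeat().is_ok() {
            beats += 1;
        }
        thread::sleep(interval);
    }
    Ok(beats)
}
```

Passing closures keeps the loop independent of any gRPC client, so the register-then-heartbeat sequence can be tested with a short interval.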