Implementation:ArroyoSystems Arroyo Node Server

From Leeroopedia


Knowledge Sources
Domains Streaming, Distributed, ProcessManagement
Last Updated 2026-02-08 08:00 GMT

Overview

Implements the NodeServer, a gRPC service that manages the lifecycle of Arroyo worker processes on a single machine, including process spawning, heartbeating, graceful/forced shutdown, and worker status tracking.

Description

The NodeServer runs on each machine in an Arroyo cluster and is responsible for:

  • Worker lifecycle -- Starts worker processes via start_worker (which spawns the current binary with the "worker" subcommand and the appropriate environment variables), stops them via stop_worker (which sends SIGTERM, waits, then sends SIGKILL), and tracks worker status.
  • Controller registration -- On startup, the start method registers the node with the controller through the register_node gRPC call, providing its machine_id, task_slots, and gRPC address. It then runs a periodic heartbeat loop.
  • Worker monitoring -- A background task monitors each spawned child process. When a worker exits, it reports to the controller via worker_finished and updates the WORKERS Prometheus gauge.
  • gRPC interface -- Implements NodeGrpc with StartWorker, StopWorker, GetWorkers, and HeartbeatNode endpoints. StopWorker supports a "force" mode with escalating signals (SIGTERM, then SIGKILL after 5 seconds) and a "finished" query mode.
  • WorkerStatus -- Tracks each worker's name, job_id, allocated slots, running state, and PID.
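
The SIGTERM-then-SIGKILL escalation described in the list above can be sketched with the standard library alone. This is a minimal illustration, not the NodeServer code: the names stop_with_escalation and StopOutcome are hypothetical, the sketch blocks a thread instead of using async tasks, and it shells out to the kill utility because std has no direct API for sending SIGTERM. It assumes a Unix-like system.

```rust
use std::io;
use std::process::{Child, Command};
use std::thread;
use std::time::{Duration, Instant};

/// How the process ended up being stopped (hypothetical type for this sketch).
#[derive(Debug, PartialEq, Eq)]
pub enum StopOutcome {
    /// The process exited within the grace period.
    Graceful,
    /// The grace period ran out and the process was killed with SIGKILL.
    Forced,
}

/// Sends SIGTERM, polls for exit until `grace` elapses, then falls back to SIGKILL.
pub fn stop_with_escalation(child: &mut Child, grace: Duration) -> io::Result<StopOutcome> {
    // Assumption: a `kill` utility is on PATH; std cannot send SIGTERM directly.
    Command::new("kill")
        .arg("-TERM")
        .arg(child.id().to_string())
        .status()?;

    let deadline = Instant::now() + grace;
    loop {
        // try_wait returns Ok(Some(_)) once the child has exited, and reaps it.
        if child.try_wait()?.is_some() {
            return Ok(StopOutcome::Graceful);
        }
        if Instant::now() >= deadline {
            break;
        }
        thread::sleep(Duration::from_millis(50));
    }

    // Child::kill sends SIGKILL on Unix; wait() then reaps the process.
    child.kill()?;
    child.wait()?;
    Ok(StopOutcome::Forced)
}
```

Calling stop_with_escalation(&mut child, Duration::from_secs(5)) would mirror the 5-second window mentioned for the force mode. Polling keeps the sketch short; an async implementation would more likely race the child's exit against a timer.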

The node manages workers as a HashMap<WorkerId, WorkerStatus> behind an Arc<Mutex<...>>. Worker IDs are randomly generated u64 values.
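
As a rough sketch of that shared map, the following uses std::sync::Mutex and hypothetical type definitions. The field types of WorkerStatus, the WorkerRegistry wrapper, and the random_u64 helper are assumptions made for illustration; the actual crate may use an async mutex and a proper random number generator.

```rust
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::hash::{BuildHasher, Hasher};
use std::sync::{Arc, Mutex};

/// Stand-in for the real WorkerId type; assumed here to wrap a u64.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WorkerId(pub u64);

/// Field names follow the description above; the concrete types are assumptions.
#[derive(Debug, Clone)]
pub struct WorkerStatus {
    pub name: String,
    pub job_id: String,
    pub slots: usize,
    pub running: bool,
    pub pid: u32,
}

/// Hypothetical wrapper around the shared worker map.
#[derive(Clone, Default)]
pub struct WorkerRegistry {
    workers: Arc<Mutex<HashMap<WorkerId, WorkerStatus>>>,
}

/// Std-only stand-in for a real RNG: RandomState is seeded randomly per instance.
fn random_u64() -> u64 {
    RandomState::new().build_hasher().finish()
}

impl WorkerRegistry {
    /// Stores a status under a fresh random ID and returns that ID.
    pub fn register(&self, status: WorkerStatus) -> WorkerId {
        let mut workers = self.workers.lock().unwrap();
        let mut id = WorkerId(random_u64());
        // Retry in the unlikely case that the random ID is already taken.
        while workers.contains_key(&id) {
            id = WorkerId(random_u64());
        }
        workers.insert(id, status);
        id
    }

    /// Marks a worker as no longer running; returns false for an unknown ID.
    pub fn mark_stopped(&self, id: WorkerId) -> bool {
        match self.workers.lock().unwrap().get_mut(&id) {
            Some(status) => {
                status.running = false;
                true
            }
            None => false,
        }
    }

    /// Counts workers whose running flag is still set.
    pub fn running_count(&self) -> usize {
        self.workers.lock().unwrap().values().filter(|w| w.running).count()
    }
}
```

Cloning the registry only clones the Arc, so a gRPC handler and a background monitor can each hold a handle to the same map.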

Usage

NodeServer is the main entry point for the "arroyo node" binary. The controller communicates with it to orchestrate worker placement across the cluster.

Code Reference

Source Location

Signature

pub struct NodeServer {
    id: MachineId,
    worker_finished_tx: Sender<WorkerFinishedReq>,
    workers: Arc<Mutex<HashMap<WorkerId, WorkerStatus>>>,
}

impl NodeServer {
    pub async fn start(self, shutdown_guard: ShutdownGuard) -> anyhow::Result<()>;
}

#[tonic::async_trait]
impl NodeGrpc for NodeServer {
    async fn start_worker(&self, request: Request<StartWorkerReq>) -> Result<Response<StartWorkerResp>, Status>;
    async fn stop_worker(&self, request: Request<StopWorkerReq>) -> Result<Response<StopWorkerResp>, Status>;
    async fn get_workers(&self, request: Request<GetWorkersReq>) -> Result<Response<GetWorkersResp>, Status>;
    async fn heartbeat_node(&self, request: Request<HeartbeatNodeReq>) -> Result<Response<()>, Status>;
}
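
The worker_finished_tx field in the signature above suggests that worker exits are reported over a channel. One way this pattern could look, using a plain thread and std::sync::mpsc in place of async tasks, is sketched below. WorkerFinished and monitor_worker are hypothetical names, and the message fields are assumptions rather than the real WorkerFinishedReq layout.

```rust
use std::process::Child;
use std::sync::mpsc::Sender;
use std::thread;

/// Hypothetical stand-in for the WorkerFinishedReq message.
#[derive(Debug, PartialEq, Eq)]
pub struct WorkerFinished {
    pub worker_id: u64,
    /// None if the process was terminated by a signal or waiting on it failed.
    pub exit_code: Option<i32>,
}

/// Spawns a background thread that waits for the child and reports its exit.
pub fn monitor_worker(
    worker_id: u64,
    mut child: Child,
    finished_tx: Sender<WorkerFinished>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // Blocks this thread until the child process exits.
        let exit_code = child.wait().ok().and_then(|status| status.code());
        // The receiver may already be gone during shutdown, so a send error is ignored.
        let _ = finished_tx.send(WorkerFinished { worker_id, exit_code });
    })
}
```

The receiving side would forward each message to the controller and update any metrics, which keeps the per-worker monitor free of networking concerns.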

Import

use arroyo_node::NodeServer;

I/O Contract

Inputs

Name            Type      Required  Description
StartWorkerReq  protobuf  Yes       Job ID, machine ID, task slots, run ID, environment variables
StopWorkerReq   protobuf  Yes       Worker ID, job ID, force flag, and finish-check flag
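
To illustrate how a start request might be turned into a child process that re-runs the current binary with the "worker" subcommand, here is a hedged sketch. The StartWorker struct is a hypothetical subset of StartWorkerReq, and the EXAMPLE_ variable names are placeholders, not the environment variables Arroyo actually sets.

```rust
use std::collections::HashMap;
use std::io;
use std::path::Path;
use std::process::Command;

/// Hypothetical subset of the StartWorkerReq fields; types are assumptions.
pub struct StartWorker {
    pub job_id: String,
    pub run_id: u64,
    pub env_vars: HashMap<String, String>,
}

/// Builds (but does not spawn) the command for a worker process.
pub fn worker_command(binary: &Path, req: &StartWorker) -> Command {
    let mut cmd = Command::new(binary);
    cmd.arg("worker")
        // Placeholder variable names chosen for this example only.
        .env("EXAMPLE_JOB_ID", &req.job_id)
        .env("EXAMPLE_RUN_ID", req.run_id.to_string())
        // Pass through any extra variables carried by the request.
        .envs(&req.env_vars);
    cmd
}

/// Same as worker_command, but targets the currently running executable.
pub fn current_binary_worker_command(req: &StartWorker) -> io::Result<Command> {
    Ok(worker_command(&std::env::current_exe()?, req))
}
```

Separating command construction from spawning makes the argument and environment handling easy to check without starting a process; calling spawn() on the returned Command would launch the worker.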

Outputs

Name             Type      Description
StartWorkerResp  protobuf  Worker ID of the spawned worker process
StopWorkerResp   protobuf  Stop status: NotFound, Stopping, Stopped
GetWorkersResp   protobuf  List of WorkerInfo for all managed workers

Usage Examples

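use arroyo_node::NodeServer;

// Assumes machine_id, worker_finished_tx, and shutdown_guard are already in scope,
// and that this runs inside an async function returning anyhow::Result<()>.
let node = NodeServer::new(machine_id, worker_finished_tx);
// start() registers the node with the controller and then runs the heartbeat loop.
node.start(shutdown_guard).await?;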
