Implementation:ArroyoSystems Arroyo Shutdown States

From Leeroopedia


Knowledge Sources
Domains: Stream_Processing, Distributed_Systems, Fault_Tolerance
Last Updated: 2026-02-08 12:00 GMT

Overview

Implements the CheckpointStopping and Stopping controller states, which together handle all pipeline shutdown scenarios. CheckpointStopping manages the final checkpoint before shutdown, while Stopping handles direct worker termination with escalation from graceful to forced shutdown.

Description

The shutdown implementation spans two files:

CheckpointStopping::next (checkpoint_stopping.rs, lines 20-99) manages the checkpoint-then-stop shutdown mode:

  1. Enters a loop that first checks if the current checkpoint has finished via job_controller.checkpoint_finished()
  2. If the checkpoint is done, all tasks have finished (job_controller.finished()), and the final checkpoint was already started, transitions to Stopped
  3. If the final checkpoint has not been started, calls job_controller.checkpoint(true) to initiate a checkpoint with then_stop=true. The true parameter causes workers to stop after completing the checkpoint.
  4. Waits for messages from the job message channel:
    • RunningMessage -- delegated to job_controller.handle_message(), which processes heartbeats, checkpoint completions, and task events. If handling fails, the state returns a retryable error.
    • ConfigUpdate with StopMode::immediate -- immediately transitions to Stopping { stop_mode: StopBehavior::StopJob(StopMode::Immediate) }, abandoning the final checkpoint
    • ConfigUpdate with StopMode::force -- reserved for future implementation (currently panics with todo!)
    • All other messages are ignored
  5. If checkpoint monitoring or initiation fails, returns a retryable error with a retry count of 10
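
The loop above can be sketched as a small synchronous model. This is an illustration only, not the code in checkpoint_stopping.rs: the real method is async and works against JobController, while the Msg, Outcome, and Controller types here are hypothetical stand-ins. The model also assumes that any RunningMessage completes the in-flight checkpoint and that no checkpoint is in flight on entry.

```rust
/// Hypothetical stand-ins for the job messages described above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Msg {
    Running,       // heartbeat, checkpoint completion, or task event
    StopImmediate, // ConfigUpdate asking for an immediate stop
    Other,         // any other message; ignored by the loop
}

/// Where the state goes next in this model.
#[derive(Debug, PartialEq)]
enum Outcome {
    Stopped,
    StoppingImmediate,
}

/// Hypothetical view of the controller state that the loop inspects.
struct Controller {
    checkpoint_in_flight: bool,
    tasks_finished: bool,
    final_checkpoint_started: bool,
    checkpoints_started: u32,
}

impl Controller {
    fn new() -> Self {
        Controller {
            checkpoint_in_flight: false,
            tasks_finished: false,
            final_checkpoint_started: false,
            checkpoints_started: 0,
        }
    }

    /// Stand-in for checkpoint(true): begins the final checkpoint.
    fn start_final_checkpoint(&mut self) {
        self.final_checkpoint_started = true;
        self.checkpoint_in_flight = true;
        self.checkpoints_started += 1;
    }

    /// Stand-in for handle_message(). Assumption of this model: the message
    /// completes the in-flight checkpoint and, because that checkpoint was
    /// started with then_stop=true, all tasks finish with it.
    fn handle_running(&mut self) {
        if self.checkpoint_in_flight {
            self.checkpoint_in_flight = false;
            self.tasks_finished = true;
        }
    }
}

/// Runs the loop over a fixed list of messages. Returns None if the
/// messages run out before the state decides on a transition.
fn run_checkpoint_stopping(ctrl: &mut Controller, msgs: &[Msg]) -> Option<Outcome> {
    let mut inbox = msgs.iter().copied();
    loop {
        // Steps 1-2: checkpoint done, tasks finished, final checkpoint started.
        if !ctrl.checkpoint_in_flight && ctrl.tasks_finished && ctrl.final_checkpoint_started {
            return Some(Outcome::Stopped);
        }
        // Step 3: start the final checkpoint exactly once.
        if !ctrl.final_checkpoint_started {
            ctrl.start_final_checkpoint();
        }
        // Step 4: react to the next message.
        match inbox.next()? {
            Msg::Running => ctrl.handle_running(),
            Msg::StopImmediate => return Some(Outcome::StoppingImmediate),
            Msg::Other => {}
        }
    }
}
```

Feeding the model `[Msg::Other, Msg::Running]` starts one final checkpoint and ends in `Outcome::Stopped`, while `[Msg::StopImmediate]` abandons the checkpoint and yields `Outcome::StoppingImmediate`. Error handling and the retry count are left out of the sketch.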

Stopping::next (stopping.rs, lines 29-74) handles direct worker termination with automatic escalation:

  1. Matches on the combination of (job_controller, stop_mode):
    • (Some(controller), StopJob(stop_mode)) -- graceful shutdown path:
      • Calls job_controller.stop_job(stop_mode) which sends StopExecutionReq gRPC messages to all workers
      • Waits for all workers to finish via job_controller.wait_for_finish(ctx.rx) with a 60-second timeout (FINISH_TIMEOUT)
      • If the wait completes successfully, transitions to Stopped
      • If the wait returns an error, logs the error and self-transitions to StopBehavior::StopWorkers (force stop)
      • If the timeout expires, logs the timeout and self-transitions to StopBehavior::StopWorkers (force stop)
    • (_, StopWorkers) or (None, _) -- force shutdown path:
      • Calls ctx.scheduler.stop_workers(job_id, Some(run_id), true) to directly terminate worker processes through the scheduler backend
      • If the scheduler call fails, returns a retryable error with a retry count of 20
      • On success, transitions to Stopped
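
The three outcomes of the graceful wait (success, error, timeout) can be illustrated with a standard-library sketch. It is a simplified model under stated assumptions, not the controller's code: a blocking std::sync::mpsc channel stands in for the async wait on ctx.rx, and WaitOutcome, Next, wait_for_finish, and after_graceful_stop are hypothetical names chosen for this example.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Result of waiting for workers to report that they finished.
#[derive(Debug, PartialEq)]
enum WaitOutcome {
    Finished,
    Failed(String),
    TimedOut,
}

/// What the Stopping state does after the graceful attempt.
#[derive(Debug, PartialEq)]
enum Next {
    Stopped,   // transition to Stopped
    ForceStop, // escalate to the StopWorkers path
}

/// Blocks until a finish report arrives or `timeout` elapses.
/// In the text above the timeout is FINISH_TIMEOUT (60 seconds).
fn wait_for_finish(rx: &mpsc::Receiver<Result<(), String>>, timeout: Duration) -> WaitOutcome {
    match rx.recv_timeout(timeout) {
        Ok(Ok(())) => WaitOutcome::Finished,
        Ok(Err(e)) => WaitOutcome::Failed(e),
        Err(RecvTimeoutError::Timeout) => WaitOutcome::TimedOut,
        // The sender went away without reporting; treated as a failure here.
        Err(RecvTimeoutError::Disconnected) => WaitOutcome::Failed("channel closed".to_string()),
    }
}

/// Only a clean finish ends the shutdown; errors and timeouts both escalate.
fn after_graceful_stop(outcome: WaitOutcome) -> Next {
    match outcome {
        WaitOutcome::Finished => Next::Stopped,
        WaitOutcome::Failed(_) | WaitOutcome::TimedOut => Next::ForceStop,
    }
}
```

Treating errors and timeouts identically keeps the escalation rule simple: anything other than a confirmed finish hands control to the scheduler-level force stop.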

StopBehavior (stopping.rs, lines 13-16) is an enum that distinguishes between the two shutdown strategies:

  • StopJob(StopMode) -- graceful stop via gRPC to workers, with the StopMode (Immediate, Graceful, etc.) passed through to workers
  • StopWorkers -- force stop via the scheduler backend, bypassing workers entirely

The escalation from StopJob to StopWorkers is achieved through self.stop_mode = StopBehavior::StopWorkers; return Self::next(self, ctx).await;, which re-enters the state's next method with the updated stop behavior. The call is in tail position and moves the boxed self into the callee, but Rust does not guarantee tail-call elimination, so it is best read as ordinary recursion. The depth is bounded at one extra call because the StopWorkers path never escalates further.
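
A minimal synchronous sketch of this self-escalation pattern follows. It is illustrative only: StopBehavior::StopJob carries no StopMode payload here, the graceful_ok flag is a hypothetical stand-in for the result of the graceful attempt, and the calls counter exists purely to make the recursion depth observable.

```rust
#[derive(Copy, Clone, Debug, PartialEq)]
enum StopBehavior {
    StopJob,     // graceful path (payload omitted in this sketch)
    StopWorkers, // force path
}

struct Stopping {
    stop_mode: StopBehavior,
    calls: u32, // number of times next() has run; for illustration only
}

impl Stopping {
    /// Consumes the boxed state. Returns how many times next() ran before
    /// the state settled, which is 1 without escalation and 2 with it.
    fn next(mut self: Box<Self>, graceful_ok: bool) -> u32 {
        self.calls += 1;
        match self.stop_mode {
            // Graceful stop worked: done after a single call.
            StopBehavior::StopJob if graceful_ok => self.calls,
            // Graceful stop failed or timed out: switch mode and re-enter.
            StopBehavior::StopJob => {
                self.stop_mode = StopBehavior::StopWorkers;
                Self::next(self, graceful_ok)
            }
            // Force path never escalates, so the recursion ends here.
            StopBehavior::StopWorkers => self.calls,
        }
    }
}
```

Because the force branch returns without calling next again, the method runs at most twice per shutdown. In the async original the recursive call is awaited; the boxed futures produced by async_trait are presumably what allow an async method to call itself this way.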

Usage

CheckpointStopping is entered when the Running state detects a "checkpoint" stop request from the user. Stopping is entered from multiple predecessor states: from Running (on immediate stop request or TTL expiration), from CheckpointStopping (when user escalates to immediate), from Finishing (after all sources complete), and from Recovering (when cleanup is needed before restart). Both states always transition to Stopped on success or return retryable errors on failure.
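
The entry conditions listed above can be summarized as a lookup from trigger to shutdown state. The Trigger and ShutdownState enums below are hypothetical names invented for this summary and do not appear in the Arroyo source; the mapping itself only restates the paragraph above.

```rust
/// Hypothetical labels for the events that lead into a shutdown state.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Trigger {
    CheckpointStopRequested, // Running: user asked for a "checkpoint" stop
    ImmediateStopRequested,  // Running: user asked for an immediate stop
    TtlExpired,              // Running: TTL expiration
    EscalatedToImmediate,    // CheckpointStopping: user escalated to immediate
    SourcesCompleted,        // Finishing: all sources completed
    CleanupBeforeRestart,    // Recovering: cleanup needed before restart
}

#[derive(Debug, PartialEq)]
enum ShutdownState {
    CheckpointStopping,
    Stopping,
}

/// Maps each trigger to the shutdown state it enters.
fn shutdown_state_for(trigger: Trigger) -> ShutdownState {
    match trigger {
        Trigger::CheckpointStopRequested => ShutdownState::CheckpointStopping,
        Trigger::ImmediateStopRequested
        | Trigger::TtlExpired
        | Trigger::EscalatedToImmediate
        | Trigger::SourcesCompleted
        | Trigger::CleanupBeforeRestart => ShutdownState::Stopping,
    }
}
```

Only one trigger leads to CheckpointStopping; every other path goes straight to Stopping.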

Code Reference

Source Location

  • Repository: ArroyoSystems_Arroyo
  • File: crates/arroyo-controller/src/states/checkpoint_stopping.rs (lines 20-99)
  • File: crates/arroyo-controller/src/states/stopping.rs (lines 29-74)

Signature

// checkpoint_stopping.rs
#[derive(Debug)]
pub struct CheckpointStopping {}

#[async_trait::async_trait]
impl State for CheckpointStopping {
    fn name(&self) -> &'static str {
        "CheckpointStopping"
    }

    async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError>
}

// stopping.rs
const FINISH_TIMEOUT: Duration = Duration::from_secs(60);

#[derive(Copy, Clone, Debug)]
pub enum StopBehavior {
    StopJob(StopMode),
    StopWorkers,
}

#[derive(Debug)]
pub struct Stopping {
    pub stop_mode: StopBehavior,
}

#[async_trait::async_trait]
impl State for Stopping {
    fn name(&self) -> &'static str {
        "Stopping"
    }

    async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError>
}

Import

// checkpoint_stopping.rs
use arroyo_rpc::grpc;
use crate::states::{
    JobContext, State, Stopped, Transition,
    stopping::{StopBehavior, Stopping},
};

// stopping.rs
use arroyo_rpc::grpc::rpc::StopMode;
use crate::states::{JobContext, State, Stopped, Transition, StateError};

I/O Contract

CheckpointStopping

Inputs

Name | Type | Required | Description
ctx | &mut JobContext | Yes | Job context with active JobController managing workers and checkpoint state
ctx.job_controller | Option<JobController> | Yes | Must be Some; used to initiate final checkpoint, monitor checkpoint progress, check task completion, and handle messages
ctx.rx | Receiver<JobMessage> | Yes | Channel receiving RunningMessage (checkpoint completions, heartbeats) and ConfigUpdate (escalation requests)

Outputs

Name | Type | Description
(return) | Result<Transition, StateError> | Transitions to Stopped (checkpoint complete and tasks finished) or Stopping { StopJob(Immediate) } (user escalation). Returns StateError on retryable checkpoint failures.
Side effect | Final checkpoint | A checkpoint with then_stop=true is initiated, causing workers to stop after snapshotting

Stopping

Inputs

Name | Type | Required | Description
ctx | &mut JobContext | Yes | Job context with optional JobController and scheduler access
self.stop_mode | StopBehavior | Yes | StopJob(StopMode) for graceful shutdown or StopWorkers for forced termination
ctx.job_controller | Option<JobController> | No | If Some and stop_mode is StopJob, used to send stop signals and wait for worker termination. If None, falls through to StopWorkers path.
ctx.scheduler | Arc<dyn Scheduler> | Yes | Used by the StopWorkers path to directly terminate worker processes

Outputs

Name | Type | Description
(return) | Result<Transition, StateError> | Transitions to Stopped on success. Returns StateError on retryable scheduler or gRPC failures.
Side effect | Worker termination | Workers are stopped via gRPC (StopJob) or killed via scheduler (StopWorkers)
