Implementation:ArroyoSystems Arroyo Shutdown States
| Knowledge Sources | |
|---|---|
| Domains | Stream_Processing, Distributed_Systems, Fault_Tolerance |
| Last Updated | 2026-02-08 12:00 GMT |
Overview
Implements the CheckpointStopping and Stopping controller states, which together handle all pipeline shutdown scenarios. CheckpointStopping manages the final checkpoint before shutdown, while Stopping handles direct worker termination with escalation from graceful to forced shutdown.
Description
The shutdown implementation spans two files:
CheckpointStopping::next (checkpoint_stopping.rs, lines 20-99) manages the checkpoint-then-stop shutdown mode:
- Enters a loop that first checks whether the current checkpoint has finished via job_controller.checkpoint_finished()
- If the checkpoint is done, all tasks have finished (job_controller.finished()), and the final checkpoint was already started, transitions to Stopped
- If the final checkpoint has not been started, calls job_controller.checkpoint(true) to initiate a checkpoint with then_stop=true. The true parameter causes workers to stop after completing the checkpoint.
- Waits for messages from the job message channel:
  - RunningMessage -- delegated to job_controller.handle_message() for processing heartbeats, checkpoint completions, and task events. Errors trigger a retryable error.
  - ConfigUpdate with StopMode::immediate -- immediately transitions to Stopping { stop_mode: StopBehavior::StopJob(StopMode::Immediate) }, abandoning the final checkpoint
  - ConfigUpdate with StopMode::force -- reserved for future implementation (currently panics with todo!)
  - All other messages are ignored
- If checkpoint monitoring or initiation fails, returns a retryable error with a retry count of 10
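The per-iteration decisions described above can be sketched as two pure functions. This is a simplified model, not the code in checkpoint_stopping.rs: LoopFacts, Step, Message, Reaction, decide, and react are hypothetical names introduced for illustration, and the sketch assumes the "final checkpoint not started" check applies whether or not an earlier checkpoint is still in flight.

```rust
/// Hypothetical snapshot of the facts the loop inspects on each pass.
#[derive(Debug, Clone, Copy)]
pub struct LoopFacts {
    /// Stand-in for job_controller.checkpoint_finished() reporting completion.
    pub checkpoint_finished: bool,
    /// Stand-in for job_controller.finished().
    pub all_tasks_finished: bool,
    /// Whether the then_stop=true checkpoint has already been requested.
    pub final_checkpoint_started: bool,
}

/// What the loop does next (illustrative names, not the real Transition type).
#[derive(Debug, PartialEq, Eq)]
pub enum Step {
    TransitionToStopped,
    StartFinalCheckpoint,
    WaitForMessage,
}

/// Decide the next step from the current facts.
pub fn decide(facts: LoopFacts) -> Step {
    if facts.checkpoint_finished && facts.all_tasks_finished && facts.final_checkpoint_started {
        Step::TransitionToStopped
    } else if !facts.final_checkpoint_started {
        // Assumption: requesting the final checkpoint does not depend on
        // whether a previous checkpoint has finished.
        Step::StartFinalCheckpoint
    } else {
        Step::WaitForMessage
    }
}

/// Simplified view of the messages the state reacts to.
#[derive(Debug, Clone, Copy)]
pub enum Message {
    Running,
    ConfigUpdateImmediate,
    ConfigUpdateForce,
    Other,
}

/// Reaction to each message kind, following the list above.
#[derive(Debug, PartialEq, Eq)]
pub enum Reaction {
    HandleInController,
    TransitionToStoppingImmediate,
    /// The described code panics with todo! here; the sketch only reports it.
    NotImplemented,
    Ignore,
}

pub fn react(msg: Message) -> Reaction {
    match msg {
        Message::Running => Reaction::HandleInController,
        Message::ConfigUpdateImmediate => Reaction::TransitionToStoppingImmediate,
        Message::ConfigUpdateForce => Reaction::NotImplemented,
        Message::Other => Reaction::Ignore,
    }
}
```

Keeping the decision separate from the async I/O, as in this sketch, makes each branch checkable without a running job controller.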
Stopping::next (stopping.rs, lines 29-74) handles direct worker termination with automatic escalation:
- Matches on the combination of (job_controller, stop_mode):
  - (Some(controller), StopJob(stop_mode)) -- graceful shutdown path:
    - Calls job_controller.stop_job(stop_mode), which sends StopExecutionReq gRPC messages to all workers
    - Waits for all workers to finish via job_controller.wait_for_finish(ctx.rx) with a 60-second timeout (FINISH_TIMEOUT)
    - If the wait completes successfully, transitions to Stopped
    - If the wait returns an error, logs the error and self-transitions to StopBehavior::StopWorkers (force stop)
    - If the timeout expires, logs the timeout and self-transitions to StopBehavior::StopWorkers (force stop)
  - (_, StopWorkers) or (None, _) -- force shutdown path:
    - Calls ctx.scheduler.stop_workers(job_id, Some(run_id), true) to directly terminate worker processes through the scheduler backend
    - If the scheduler call fails, returns a retryable error with a retry count of 20
    - On success, transitions to Stopped
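The path selection and the three wait outcomes can be modelled with the standard library alone. The sketch below is a synchronous stand-in under stated assumptions: the real code is async and uses the job controller, whereas here a std::sync::mpsc channel with recv_timeout plays the role of wait_for_finish with FINISH_TIMEOUT. Path, WaitOutcome, Next, select_path, and after_wait are hypothetical names, and StopMode is reduced to the two variants named in this page.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopMode {
    Immediate,
    Graceful,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopBehavior {
    StopJob(StopMode),
    StopWorkers,
}

/// Which shutdown path is taken (illustrative type).
#[derive(Debug, PartialEq, Eq)]
pub enum Path {
    Graceful(StopMode),
    Force,
}

/// Mirrors the match on (job_controller, stop_mode): only a present
/// controller combined with StopJob takes the graceful path.
pub fn select_path(has_controller: bool, behavior: StopBehavior) -> Path {
    match (has_controller, behavior) {
        (true, StopBehavior::StopJob(mode)) => Path::Graceful(mode),
        _ => Path::Force,
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum WaitOutcome {
    Finished,
    Failed(String),
    TimedOut,
}

/// Synchronous stand-in for waiting on workers with a deadline.
pub fn wait_for_finish(rx: &Receiver<Result<(), String>>, timeout: Duration) -> WaitOutcome {
    match rx.recv_timeout(timeout) {
        Ok(Ok(())) => WaitOutcome::Finished,
        Ok(Err(e)) => WaitOutcome::Failed(e),
        Err(RecvTimeoutError::Timeout) => WaitOutcome::TimedOut,
        Err(RecvTimeoutError::Disconnected) => WaitOutcome::Failed("channel closed".to_string()),
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum Next {
    Stopped,
    EscalateToStopWorkers,
}

/// Both an error and a timeout escalate to the force path.
pub fn after_wait(outcome: WaitOutcome) -> Next {
    match outcome {
        WaitOutcome::Finished => Next::Stopped,
        WaitOutcome::Failed(_) | WaitOutcome::TimedOut => Next::EscalateToStopWorkers,
    }
}
```

Treating an error and a timeout identically keeps the escalation rule simple: anything other than a clean finish falls back to the scheduler.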
StopBehavior (stopping.rs, lines 13-16) is an enum that distinguishes between the two shutdown strategies:
- StopJob(StopMode) -- graceful stop via gRPC to workers, with the StopMode (Immediate, Graceful, etc.) passed through to workers
- StopWorkers -- force stop via the scheduler backend, bypassing workers entirely
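The escalation from StopJob to StopWorkers is achieved through self.stop_mode = StopBehavior::StopWorkers; return Self::next(self, ctx).await;, which calls the state's own next method again with the updated stop behavior. Because next consumes self (as Box<Self>), the state is moved into the recursive call rather than borrowed. Rust does not guarantee tail-call elimination, but the recursion is at most one level deep: the StopWorkers path either transitions to Stopped or returns an error, and never escalates again.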
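A synchronous analogue of this self-recursion shows why it terminates. It is a sketch only: the real method is async and takes a JobContext, StopJob carries no StopMode here, and the graceful_succeeds and calls fields plus the Stopped payload are hypothetical additions used to observe the behaviour.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopBehavior {
    StopJob,
    StopWorkers,
}

#[derive(Debug)]
pub struct Stopping {
    pub stop_mode: StopBehavior,
    /// Hypothetical knob: whether the graceful attempt succeeds.
    pub graceful_succeeds: bool,
    /// Hypothetical counter of how many times next has run.
    pub calls: u32,
}

/// Terminal result; records which path finished the shutdown.
#[derive(Debug, PartialEq, Eq)]
pub struct Stopped {
    pub via: StopBehavior,
    pub calls: u32,
}

impl Stopping {
    /// Consumes the boxed state, as the real next does.
    pub fn next(mut self: Box<Self>) -> Stopped {
        self.calls += 1;
        let mode = self.stop_mode;
        match mode {
            StopBehavior::StopJob if self.graceful_succeeds => Stopped {
                via: StopBehavior::StopJob,
                calls: self.calls,
            },
            StopBehavior::StopJob => {
                // Escalate: update the mode, then move self into the same method.
                self.stop_mode = StopBehavior::StopWorkers;
                Self::next(self)
            }
            // The force path never escalates again, so recursion stops here.
            StopBehavior::StopWorkers => Stopped {
                via: StopBehavior::StopWorkers,
                calls: self.calls,
            },
        }
    }
}
```

In this model a failed graceful attempt runs next exactly twice, once per stop behavior, which is the bound the prose describes.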
Usage
CheckpointStopping is entered when the Running state detects a "checkpoint" stop request from the user. Stopping is entered from multiple predecessor states: from Running (on an immediate stop request or TTL expiration), from CheckpointStopping (when the user escalates to immediate), from Finishing (after all sources complete), and from Recovering (when cleanup is needed before restart). On success, Stopping always transitions to Stopped, while CheckpointStopping transitions either to Stopped or, on user escalation, to Stopping. Both states return retryable errors on failure.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-controller/src/states/checkpoint_stopping.rs (lines 20-99)
- File: crates/arroyo-controller/src/states/stopping.rs (lines 29-74)
Signature

    // checkpoint_stopping.rs
    #[derive(Debug)]
    pub struct CheckpointStopping {}

    #[async_trait::async_trait]
    impl State for CheckpointStopping {
        fn name(&self) -> &'static str {
            "CheckpointStopping"
        }

        async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError>
    }

    // stopping.rs
    const FINISH_TIMEOUT: Duration = Duration::from_secs(60);

    #[derive(Copy, Clone, Debug)]
    pub enum StopBehavior {
        StopJob(StopMode),
        StopWorkers,
    }

    #[derive(Debug)]
    pub struct Stopping {
        pub stop_mode: StopBehavior,
    }

    #[async_trait::async_trait]
    impl State for Stopping {
        fn name(&self) -> &'static str {
            "Stopping"
        }

        async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError>
    }

Import

    // checkpoint_stopping.rs
    use arroyo_rpc::grpc;
    use crate::states::{
        JobContext, State, Stopped, Transition,
        stopping::{StopBehavior, Stopping},
    };

    // stopping.rs
    use arroyo_rpc::grpc::rpc::StopMode;
    use crate::states::{JobContext, State, Stopped, Transition, StateError};

I/O Contract
CheckpointStopping
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| ctx | &mut JobContext | Yes | Job context with active JobController managing workers and checkpoint state |
| ctx.job_controller | Option<JobController> | Yes | Must be Some; used to initiate final checkpoint, monitor checkpoint progress, check task completion, and handle messages |
| ctx.rx | Receiver<JobMessage> | Yes | Channel receiving RunningMessage (checkpoint completions, heartbeats) and ConfigUpdate (escalation requests) |
Outputs
| Name | Type | Description |
|---|---|---|
| (return) | Result<Transition, StateError> | Transitions to Stopped (checkpoint complete and tasks finished) or Stopping { StopJob(Immediate) } (user escalation). Returns StateError on retryable checkpoint failures. |
| Side effect | Final checkpoint | A checkpoint with then_stop=true is initiated, causing workers to stop after snapshotting |
Stopping
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| ctx | &mut JobContext | Yes | Job context with optional JobController and scheduler access |
| self.stop_mode | StopBehavior | Yes | StopJob(StopMode) for graceful shutdown or StopWorkers for forced termination |
| ctx.job_controller | Option<JobController> | No | If Some and stop_mode is StopJob, used to send stop signals and wait for worker termination. If None, falls through to StopWorkers path. |
| ctx.scheduler | Arc<dyn Scheduler> | Yes | Used by the StopWorkers path to directly terminate worker processes |
Outputs
| Name | Type | Description |
|---|---|---|
| (return) | Result<Transition, StateError> | Transitions to Stopped on success. Returns StateError on retryable scheduler or gRPC failures. |
| Side effect | Worker termination | Workers are stopped via gRPC (StopJob) or killed via scheduler (StopWorkers) |