Principle:ArroyoSystems Arroyo Graceful Shutdown
Summary
Graceful Shutdown is the principle of cleanly terminating the local streaming cluster when a termination signal (SIGTERM, or SIGINT from Ctrl+C) is received. The shutdown process requests a checkpoint-then-stop for the active pipeline, waits for the pipeline to reach the stopped state, performs a final SQLite backup to remote storage, and cleans up all resources. This prevents data loss on termination and allows the pipeline to be resumed cleanly later.
Theoretical Basis
In stream processing systems, abrupt termination can lead to data loss, duplicate processing on restart, or corrupted state. Graceful shutdown ensures that all in-flight data is processed and a consistent checkpoint is written before the process exits. This is especially critical for exactly-once processing guarantees, where the checkpoint represents the precise boundary between processed and unprocessed data.
Handler Pattern
Arroyo's graceful shutdown uses a handler pattern for coordinating the shutdown sequence:
- Registration: Before the main processing loop begins, shutdown handlers are registered with the Shutdown framework.
- Signal Detection: OS signals (SIGTERM from process managers, SIGINT from Ctrl+C) are intercepted by a dedicated signal-listening task.
- Handler Execution: When a signal is received, the registered handler's shutdown() method is invoked asynchronously, performing application-specific cleanup (in this case, stopping the pipeline with a checkpoint).
- Token Cancellation: After the handler completes, the cancellation token is triggered, signaling all guarded tasks to terminate.
- Guard Counting: The framework uses reference-counted guards to track active tasks. When all guards are dropped, the shutdown is considered complete.
- Timeout: A configurable timeout (60 seconds in the pipeline cluster) ensures the process exits even if some tasks fail to terminate.
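The registration, signal detection, handler execution, and token cancellation steps above can be sketched as follows. This is a minimal, synchronous sketch using only the Rust standard library, not Arroyo's actual code: the ShutdownHandler trait, the Shutdown struct's methods (set_handler, token, wait_for_signal), and the mpsc channel that stands in for the signal-listening task are all hypothetical, and an Arc<AtomicBool> stands in for the cancellation token. The real handler's shutdown() runs asynchronously; guard counting and the timeout are left out here.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;
use std::sync::Arc;

/// Hypothetical trait for application-specific cleanup
/// (for the pipeline cluster: stop the pipeline with a checkpoint).
pub trait ShutdownHandler: Send {
    fn shutdown(&self);
}

pub struct Shutdown {
    handler: Option<Box<dyn ShutdownHandler>>,
    // Stand-in for a cancellation token shared with guarded tasks.
    token: Arc<AtomicBool>,
}

impl Shutdown {
    pub fn new() -> Self {
        Shutdown {
            handler: None,
            token: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Registration: done before the main processing loop starts.
    pub fn set_handler(&mut self, handler: Box<dyn ShutdownHandler>) {
        self.handler = Some(handler);
    }

    /// A clone of the cancellation flag that long-running tasks poll.
    pub fn token(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.token)
    }

    /// Blocks until a signal arrives, runs the handler, then cancels the token.
    /// `signals` stands in for the dedicated signal-listening task.
    pub fn wait_for_signal(self, signals: Receiver<()>) {
        // A closed channel is treated like a signal so the caller never hangs.
        let _ = signals.recv();
        if let Some(handler) = &self.handler {
            handler.shutdown();
        }
        // Guarded tasks are told to stop only after the handler has returned.
        self.token.store(true, Ordering::SeqCst);
    }
}
```

The ordering is the important part: the token is cancelled only after handler.shutdown() returns, which matters here because the pipeline handler sends a request to the API and waits on the controller, so both must still be running while it works.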
Checkpoint-on-Shutdown
The checkpoint-on-shutdown pattern is central to Arroyo's graceful shutdown. Rather than simply killing the pipeline, the shutdown handler requests a StopType::Checkpoint stop. This tells the controller to:
- Initiate a checkpoint across all workers.
- Wait for all workers to complete the checkpoint.
- Transition the pipeline to the "Stopped" state.
The resulting checkpoint contains the complete pipeline state, including operator state, in-flight messages, and source offsets. When the pipeline is resumed later (via arroyo run with the same state directory), it restores from this checkpoint and continues processing without data loss or duplication.
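The controller's side of a checkpoint-then-stop can be sketched as "fan out, wait for everyone, then transition". In this minimal sketch, under stated assumptions, each worker is a closure run on its own thread that returns a hypothetical WorkerSnapshot (a worker ID plus a single source offset standing in for the full operator state and offsets). The names stop_with_checkpoint, WorkerTask, and Checkpoint are illustrative, not Arroyo's API. A worker error, or a worker that dies without reporting, yields Failed instead of Stopped.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical per-worker checkpoint result.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkerSnapshot {
    pub worker_id: usize,
    pub source_offset: u64, // stand-in for operator state and source offsets
}

#[derive(Debug, PartialEq)]
pub enum PipelineState {
    Stopped,
    Failed,
}

#[derive(Debug)]
pub struct Checkpoint {
    pub epoch: u64,
    pub snapshots: Vec<WorkerSnapshot>,
}

/// A worker's contribution to checkpoint `epoch`; Err means the worker failed.
pub type WorkerTask = Box<dyn FnOnce(u64) -> Result<WorkerSnapshot, String> + Send>;

pub fn stop_with_checkpoint(epoch: u64, workers: Vec<WorkerTask>) -> (PipelineState, Option<Checkpoint>) {
    let expected = workers.len();
    let (tx, rx) = mpsc::channel();

    // 1. Initiate the checkpoint on every worker concurrently.
    for worker in workers {
        let tx = tx.clone();
        thread::spawn(move || {
            let _ = tx.send(worker(epoch));
        });
    }
    drop(tx); // the receiver ends once every worker's sender is gone

    // 2. Wait for all workers to complete the checkpoint.
    let mut snapshots = Vec::with_capacity(expected);
    for result in rx {
        match result {
            Ok(snapshot) => snapshots.push(snapshot),
            Err(_) => return (PipelineState::Failed, None),
        }
    }
    if snapshots.len() != expected {
        // A worker thread exited without reporting (for example, it panicked).
        return (PipelineState::Failed, None);
    }

    // 3. Only a complete checkpoint lets the pipeline transition to Stopped.
    snapshots.sort_by_key(|s| s.worker_id);
    (PipelineState::Stopped, Some(Checkpoint { epoch, snapshots }))
}
```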
Multi-Signal Behavior
The shutdown framework supports a two-signal protocol:
- First signal: Triggers the graceful shutdown sequence (handler execution, checkpoint, cleanup).
- Second signal: Forces immediate termination if the graceful shutdown is taking too long. This is a safety valve for cases where the checkpoint hangs or a component fails to terminate.
The process exit code reflects the shutdown mode:
| Exit Code | Meaning |
|---|---|
| 0 | Clean shutdown completed successfully |
| 1 | Shutdown completed but with errors (e.g., pipeline failed) |
| 128 | Forced shutdown via second signal |
| 129 | Shutdown timed out |
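The two-signal protocol and the exit-code table can be expressed as a shared counter plus a mapping. In this sketch, SignalTracker, SignalAction, ShutdownOutcome, and exit_code are hypothetical names; only the numeric codes come from the table above. An atomic counter lets the tracker be shared with the signal-listening task without a lock.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SignalAction {
    BeginGraceful, // first signal: run handler, checkpoint, cleanup
    ForceExit,     // any later signal: give up on the graceful path
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShutdownOutcome {
    Clean,
    CompletedWithErrors,
    Forced,
    TimedOut,
}

pub struct SignalTracker {
    received: AtomicUsize,
}

impl SignalTracker {
    pub const fn new() -> Self {
        SignalTracker { received: AtomicUsize::new(0) }
    }

    /// Called for every SIGTERM/SIGINT the listener sees.
    pub fn on_signal(&self) -> SignalAction {
        // fetch_add returns the previous count, so only the first call sees 0.
        if self.received.fetch_add(1, Ordering::SeqCst) == 0 {
            SignalAction::BeginGraceful
        } else {
            SignalAction::ForceExit
        }
    }
}

/// Maps a shutdown outcome to the process exit code from the table.
pub fn exit_code(outcome: ShutdownOutcome) -> i32 {
    match outcome {
        ShutdownOutcome::Clean => 0,
        ShutdownOutcome::CompletedWithErrors => 1,
        ShutdownOutcome::Forced => 128,
        ShutdownOutcome::TimedOut => 129,
    }
}
```

A process would pass the result of exit_code to std::process::exit. Because new is a const fn, the tracker can also be placed in a static.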
Shutdown Sequence
The complete shutdown sequence for the local pipeline cluster proceeds as follows:
- Signal Reception -- SIGTERM or SIGINT is received by the signal-listening task.
- Handler Invocation -- The PipelineShutdownHandler::shutdown() method is called.
- Checkpoint Request -- A patch_pipeline request with StopType::Checkpoint is sent to the API.
- State Wait -- The handler polls for the pipeline to reach the "Stopped" or "Failed" state, with a 120-second timeout.
- Database Backup -- A final SQLite backup is triggered via notify_db(), uploading the latest state to remote storage.
- Token Cancellation -- The cancellation token is triggered, signaling all guarded tasks (controller, API, backup loop) to terminate.
- Guard Cleanup -- Each guarded task drops its ShutdownGuard, decrementing the reference count.
- Completion Signal -- When all guards are dropped (reference count reaches zero), the broadcast channel signals completion.
- Process Exit -- Shutdown::handle_shutdown calls exit() with the appropriate code.
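The State Wait step is a bounded poll loop. This is a minimal sketch, assuming a caller-supplied get_state closure that stands in for querying the pipeline through the API (hypothetical; the real polling call is not described here), a PipelineState enum whose Running and Stopping variants are illustrative, and a caller-chosen poll interval. In the pipeline cluster the timeout would be Duration::from_secs(120).

```rust
use std::thread;
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PipelineState {
    Running,
    Stopping,
    Stopped,
    Failed,
}

#[derive(Debug, PartialEq)]
pub struct WaitTimedOut {
    pub last_seen: PipelineState,
}

/// Polls `get_state` until it reports Stopped or Failed, or `timeout` elapses.
pub fn wait_for_stop<F>(
    mut get_state: F,
    timeout: Duration,
    interval: Duration,
) -> Result<PipelineState, WaitTimedOut>
where
    F: FnMut() -> PipelineState,
{
    let deadline = Instant::now() + timeout;
    loop {
        let state = get_state();
        // Both terminal states end the wait; the caller decides what Failed means.
        if matches!(state, PipelineState::Stopped | PipelineState::Failed) {
            return Ok(state);
        }
        let now = Instant::now();
        if now >= deadline {
            return Err(WaitTimedOut { last_seen: state });
        }
        // Never sleep past the deadline.
        thread::sleep(interval.min(deadline - now));
    }
}
```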
Guard-Based Resource Management
The ShutdownGuard mechanism provides structured concurrency for the shutdown process:
- Each long-running task (controller, API, backup loop) holds a ShutdownGuard.
- Guards carry a CancellationToken that signals when the task should terminate.
- Guards are reference-counted: the shutdown framework knows when all tasks have completed.
- Temporary tasks (like the pipeline submission) hold temporary guards that do not block shutdown when cancelled.
This ensures that:
- All tasks are notified of shutdown simultaneously via the cancellation token.
- The framework waits for all non-temporary tasks to finish their cleanup.
- No task is orphaned or forgotten during shutdown.
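The guard mechanism can be sketched with a mutex-protected counter and a condition variable. This is an illustration under stated assumptions, not Arroyo's ShutdownGuard: Coordinator, guard(), temporary_guard(), is_cancelled(), and cancel_and_wait() are hypothetical, an AtomicBool stands in for the CancellationToken, and a Condvar replaces the broadcast channel the framework uses to signal completion. Counted guards decrement the count when dropped; temporary guards share the cancellation flag but are never counted, so they cannot hold up shutdown. The timeout argument plays the role of the 60-second limit.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

struct Inner {
    active: Mutex<usize>,  // live non-temporary guards
    all_dropped: Condvar,  // notified when `active` reaches zero
    cancelled: AtomicBool, // stand-in for the CancellationToken
}

pub struct Coordinator {
    inner: Arc<Inner>,
}

pub struct ShutdownGuard {
    inner: Arc<Inner>,
    counted: bool,
}

impl Coordinator {
    pub fn new() -> Self {
        Coordinator {
            inner: Arc::new(Inner {
                active: Mutex::new(0),
                all_dropped: Condvar::new(),
                cancelled: AtomicBool::new(false),
            }),
        }
    }

    /// Guard for a long-running task; shutdown waits for it to be dropped.
    pub fn guard(&self) -> ShutdownGuard {
        *self.inner.active.lock().unwrap() += 1;
        ShutdownGuard { inner: Arc::clone(&self.inner), counted: true }
    }

    /// Guard that sees cancellation but never blocks shutdown.
    pub fn temporary_guard(&self) -> ShutdownGuard {
        ShutdownGuard { inner: Arc::clone(&self.inner), counted: false }
    }

    /// Cancels all guards at once, then waits for counted guards to drop.
    /// Returns false if `timeout` elapses with guards still alive.
    pub fn cancel_and_wait(&self, timeout: Duration) -> bool {
        self.inner.cancelled.store(true, Ordering::SeqCst);
        let active = self.inner.active.lock().unwrap();
        let (_active, result) = self
            .inner
            .all_dropped
            .wait_timeout_while(active, timeout, |n| *n > 0)
            .unwrap();
        !result.timed_out()
    }
}

impl ShutdownGuard {
    pub fn is_cancelled(&self) -> bool {
        self.inner.cancelled.load(Ordering::SeqCst)
    }
}

impl Drop for ShutdownGuard {
    fn drop(&mut self) {
        if self.counted {
            // Recover from poisoning so a drop during unwinding cannot panic again.
            let mut active = self.inner.active.lock().unwrap_or_else(|e| e.into_inner());
            *active -= 1;
            if *active == 0 {
                self.inner.all_dropped.notify_all();
            }
        }
    }
}
```

Each long-running task would loop until is_cancelled() returns true, finish its cleanup, and then let its guard go out of scope, which is what releases the count.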
Relationship to State Persistence
Graceful shutdown is tightly coupled with state persistence. The final database backup ensures that even if the state directory is remote (e.g., S3), the latest metadata (including the checkpoint reference) is persisted before the process exits. Without this final backup, a restart might use a stale checkpoint, leading to reprocessing of data.