Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Principle:ArroyoSystems Arroyo Graceful Shutdown

From Leeroopedia


Template:Principle

Summary

Graceful Shutdown is the principle of cleanly terminating the local streaming cluster when a termination signal (SIGTERM or Ctrl+C/SIGINT) is received. The shutdown process involves requesting a checkpoint-then-stop for the active pipeline, waiting for the pipeline to reach the stopped state, performing a final SQLite backup to remote storage, and cleaning up all resources. This ensures no data loss on termination and enables the pipeline to be cleanly resumed later.

Theoretical Basis

In stream processing systems, abrupt termination can lead to data loss, duplicate processing on restart, or corrupted state. Graceful shutdown ensures that all in-flight data is processed and a consistent checkpoint is written before the process exits. This is especially critical for exactly-once processing guarantees, where the checkpoint represents the precise boundary between processed and unprocessed data.

Handler Pattern

Arroyo's graceful shutdown uses a handler pattern for coordinating the shutdown sequence:

  1. Registration: Before the main processing loop begins, shutdown handlers are registered with the Shutdown framework.
  2. Signal Detection: OS signals (SIGTERM from process managers, SIGINT from Ctrl+C) are intercepted by a dedicated signal-listening task.
  3. Handler Execution: When a signal is received, the registered handler's shutdown() method is invoked asynchronously, performing application-specific cleanup (in this case, pipeline stop with checkpoint).
  4. Token Cancellation: After the handler completes, the cancellation token is triggered, signaling all guarded tasks to terminate.
  5. Guard Counting: The framework uses reference-counted guards to track active tasks. When all guards are dropped, the shutdown is considered complete.
  6. Timeout: A configurable timeout (60 seconds in the pipeline cluster) ensures the process exits even if some tasks fail to terminate.

Checkpoint-on-Shutdown

The checkpoint-on-shutdown pattern is central to Arroyo's graceful shutdown. Rather than simply killing the pipeline, the shutdown handler requests a StopType::Checkpoint stop. This tells the controller to:

  1. Initiate a checkpoint across all workers.
  2. Wait for all workers to complete the checkpoint.
  3. Transition the pipeline to the "Stopped" state.

The resulting checkpoint contains the complete pipeline state, including operator state, in-flight messages, and source offsets. When the pipeline is resumed later (via arroyo run with the same state directory), it restores from this checkpoint and continues processing without data loss or duplication.

Multi-Signal Behavior

The shutdown framework supports a two-signal protocol:

  • First signal: Triggers the graceful shutdown sequence (handler execution, checkpoint, cleanup).
  • Second signal: Forces immediate termination if the graceful shutdown is taking too long. This is a safety valve for cases where the checkpoint hangs or a component fails to terminate.

The process exit code reflects the shutdown mode:

Exit Code Meaning
0 Clean shutdown completed successfully
1 Shutdown completed but with errors (e.g., pipeline failed)
128 Forced shutdown via second signal
129 Shutdown timed out

Shutdown Sequence

The complete shutdown sequence for the local pipeline cluster proceeds as follows:

  1. Signal Reception -- SIGTERM or SIGINT is received by the signal-listening task.
  2. Handler Invocation -- The PipelineShutdownHandler::shutdown() method is called.
  3. Checkpoint Request -- A patch_pipeline request with StopType::Checkpoint is sent to the API.
  4. State Wait -- The handler polls for the pipeline to reach "Stopped" or "Failed" state, with a 120-second timeout.
  5. Database Backup -- A final SQLite backup is triggered via notify_db(), uploading the latest state to remote storage.
  6. Token Cancellation -- The cancellation token is triggered, signaling all guarded tasks (controller, API, backup loop) to terminate.
  7. Guard Cleanup -- Each guarded task drops its ShutdownGuard, decrementing the reference count.
  8. Completion Signal -- When all guards are dropped (reference count reaches zero), the broadcast channel signals completion.
  9. Process Exit -- Shutdown::handle_shutdown calls exit() with the appropriate code.

Guard-Based Resource Management

The ShutdownGuard mechanism provides structured concurrency for the shutdown process:

  • Each long-running task (controller, API, backup loop) holds a ShutdownGuard.
  • Guards carry a CancellationToken that signals when the task should terminate.
  • Guards are reference-counted: the shutdown framework knows when all tasks have completed.
  • Temporary tasks (like the pipeline submission) hold temporary guards that do not block shutdown when cancelled.

This ensures that:

  • All tasks are notified of shutdown simultaneously via the cancellation token.
  • The framework waits for all non-temporary tasks to finish their cleanup.
  • No task is orphaned or forgotten during shutdown.

Relationship to State Persistence

Graceful shutdown is tightly coupled with state persistence. The final database backup ensures that even if the state directory is remote (e.g., S3), the latest metadata (including the checkpoint reference) is persisted before the process exits. Without this final backup, a restart might use a stale checkpoint, leading to reprocessing of data.

Related

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment