Principle:ArroyoSystems Arroyo Pipeline Shutdown

From Leeroopedia


Knowledge Sources
Domains Stream_Processing, Distributed_Systems, Fault_Tolerance
Last Updated 2026-02-08 12:00 GMT

Overview

Pipeline shutdown is the graceful, multi-phase process of stopping a running streaming pipeline. Arroyo supports three shutdown modes: checkpoint-then-stop (take a final checkpoint before stopping so the pipeline can restart cleanly), immediate stop (send stop signals directly to workers), and force stop (terminate workers without a checkpoint when graceful shutdown fails).

Description

Pipeline shutdown in a distributed stream processing system must balance several concerns: ensuring state consistency for future restarts, draining in-flight data through operators, flushing sink outputs, and releasing cluster resources. Arroyo implements shutdown through two cooperating controller states, CheckpointStopping and Stopping. The Stopping state has two variants (StopJob and StopWorkers), which gives three shutdown modes in total:

Checkpoint-then-stop (CheckpointStopping): This is the preferred shutdown mode for pipelines that must be restartable from a consistent state. The controller initiates a final checkpoint with the then_stop=true flag, which causes workers to stop processing after completing the checkpoint. The controller monitors checkpoint progress, and once the checkpoint is complete and all tasks have finished, the pipeline transitions to the Stopped state. If the checkpoint fails or times out, the controller falls back to an immediate stop.

Immediate stop (Stopping with StopJob): The controller sends a StopExecutionReq gRPC message to all workers with the specified stop mode. Workers respond by stopping their operator tasks. The controller then waits for all workers to confirm termination, subject to a 60-second timeout. If workers fail to terminate gracefully within the timeout, the controller falls back to force stopping.

Force stop (Stopping with StopWorkers): The controller bypasses the workers' graceful shutdown protocol and directly asks the scheduler backend to terminate worker processes. This is the fallback when graceful shutdown fails or when no job controller is available (e.g., during early scheduling failures). It ensures cluster resources are released even when workers are unresponsive.

Escalation chain: The shutdown modes form a natural escalation chain:

  1. CheckpointStopping attempts a clean final checkpoint
  2. If the checkpoint fails or times out, or a user requests an immediate stop during CheckpointStopping, the controller transitions to Stopping with StopJob(Immediate)
  3. If Stopping with StopJob fails or times out, the Stopping state transitions to itself with StopWorkers
  4. StopWorkers directly terminates worker processes through the scheduler
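
The escalation chain can be read as a small transition table. The following is a minimal sketch in Python, not Arroyo's implementation: the enum names, the event names, and the `run` helper are all hypothetical and only mirror the states described above.

```python
from enum import Enum, auto


class State(Enum):
    # Labels mirror the controller states named in the text (hypothetical enum).
    CHECKPOINT_STOPPING = auto()
    STOPPING_STOP_JOB = auto()
    STOPPING_STOP_WORKERS = auto()
    STOPPED = auto()


class Event(Enum):
    # Hypothetical event names, introduced only for this illustration.
    SUCCESS = auto()
    FAILURE_OR_TIMEOUT = auto()
    USER_IMMEDIATE_STOP = auto()


# Each entry is one arrow of the escalation chain.
TRANSITIONS = {
    (State.CHECKPOINT_STOPPING, Event.SUCCESS): State.STOPPED,
    (State.CHECKPOINT_STOPPING, Event.FAILURE_OR_TIMEOUT): State.STOPPING_STOP_JOB,
    (State.CHECKPOINT_STOPPING, Event.USER_IMMEDIATE_STOP): State.STOPPING_STOP_JOB,
    (State.STOPPING_STOP_JOB, Event.SUCCESS): State.STOPPED,
    (State.STOPPING_STOP_JOB, Event.FAILURE_OR_TIMEOUT): State.STOPPING_STOP_WORKERS,
    (State.STOPPING_STOP_WORKERS, Event.SUCCESS): State.STOPPED,
}


def run(start, events):
    """Apply events in order and return the list of states visited."""
    path = [start]
    for event in events:
        path.append(TRANSITIONS[(path[-1], event)])
    return path


# Worst case: the checkpoint fails, the graceful stop times out, force stop succeeds.
worst_case = run(
    State.CHECKPOINT_STOPPING,
    [Event.FAILURE_OR_TIMEOUT, Event.FAILURE_OR_TIMEOUT, Event.SUCCESS],
)
```

Every path through the table ends in `State.STOPPED`, which is the property the escalation chain is meant to guarantee.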

Usage

Shutdown is triggered by several conditions: an explicit user stop request (via API or configuration update), TTL expiration, or as part of the rescaling/restarting workflow. The stop mode is determined by the user's request: a "checkpoint" stop mode triggers CheckpointStopping, an "immediate" stop mode triggers Stopping with StopJob(Immediate), and a "force" stop mode triggers direct force termination.
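
As an illustration of how the requested stop mode selects the first controller state, here is a minimal sketch. The function name, the dictionary, and the string labels are assumptions made for this example; they follow the wording of this page rather than any actual Arroyo API.

```python
# Hypothetical mapping from the user-facing stop mode to the first controller
# state. Keys and labels mirror the prose on this page, not Arroyo's API.
INITIAL_STATE = {
    "checkpoint": "CheckpointStopping",
    "immediate": "Stopping(StopJob(Immediate))",
    "force": "Stopping(StopWorkers)",
}


def initial_shutdown_state(stop_mode: str) -> str:
    """Return the controller state a stop request starts in."""
    try:
        return INITIAL_STATE[stop_mode]
    except KeyError:
        # Reject anything other than the three modes described in the text.
        raise ValueError(f"unknown stop mode: {stop_mode!r}") from None
```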

Theoretical Basis

Graceful shutdown in distributed stream processing requires careful coordination to preserve system invariants:

Final state persistence: Taking a checkpoint before stopping ensures the pipeline can restart from a consistent state without data loss or duplication. The final checkpoint follows the same barrier-based protocol as regular checkpoints, but with an additional flag that causes operators to stop after completing their snapshot:

CheckpointStopping:
    initiate checkpoint with then_stop=true
    loop:
        wait for checkpoint completion messages
        if checkpoint complete AND all tasks finished:
            transition to Stopped
        if timeout or failure:
            fall back to Stopping(StopJob(Immediate))
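
The loop above can be sketched with asyncio as follows. This is an illustration under stated assumptions, not the controller's code: the message strings, the `checkpoint_then_stop` and `demo` functions, and the timeout value are hypothetical (the text does not say how long the checkpoint timeout is).

```python
import asyncio


async def checkpoint_then_stop(messages: asyncio.Queue, timeout_s: float) -> str:
    """Wait for the final checkpoint and task completion, or fall back."""
    fallback = "Stopping(StopJob(Immediate))"
    checkpoint_done = False
    tasks_finished = False
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_s  # one deadline for the whole phase

    while not (checkpoint_done and tasks_finished):
        remaining = deadline - loop.time()
        if remaining <= 0:
            return fallback
        try:
            msg = await asyncio.wait_for(messages.get(), timeout=remaining)
        except asyncio.TimeoutError:
            return fallback
        if msg == "checkpoint_complete":
            checkpoint_done = True
        elif msg == "all_tasks_finished":
            tasks_finished = True
        elif msg == "checkpoint_failed":
            return fallback
    return "Stopped"


async def demo(incoming, timeout_s):
    """Feed a fixed list of hypothetical messages to the loop."""
    queue = asyncio.Queue()
    for msg in incoming:
        queue.put_nowait(msg)
    return await checkpoint_then_stop(queue, timeout_s)
```

For example, `asyncio.run(demo(["checkpoint_complete", "all_tasks_finished"], 1.0))` reaches `"Stopped"`, while a run that never sees the second message falls back once the deadline passes.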

Ordered termination: When workers receive a stop signal, the dataflow graph shuts down in topological order. Sources stop producing first, which causes downstream operators to drain their input queues. Each operator flushes any buffered state to sinks before terminating. This ensures no data is silently dropped in intermediate buffers.
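
A toy model of ordered termination is sketched below. It assumes a hypothetical dataflow graph given as a mapping from each operator to its upstream operators, and in-memory lists standing in for operator buffers. It shows only the ordering idea, not how Arroyo operators actually exchange data.

```python
from graphlib import TopologicalSorter


def drain_and_stop(upstream, buffers):
    """Stop operators sources-first, pushing buffered records downstream.

    upstream: operator -> set of operators it reads from (hypothetical graph)
    buffers:  operator -> records still buffered in that operator
    Returns the stop order and the records that reached the sinks.
    """
    buffers = {op: list(records) for op, records in buffers.items()}  # work on a copy
    downstream = {op: [] for op in buffers}
    for op, ups in upstream.items():
        for up in ups:
            downstream[up].append(op)

    emitted = []
    order = list(TopologicalSorter(upstream).static_order())
    for op in order:
        pending, buffers[op] = buffers[op], []
        if downstream[op]:
            # Hand buffered records to every downstream operator before stopping.
            for target in downstream[op]:
                buffers[target].extend(pending)
        else:
            # A sink flushes what it holds to its output.
            emitted.extend(pending)
    return order, emitted


graph = {"source": set(), "map": {"source"}, "sink": {"map"}}
order, emitted = drain_and_stop(graph, {"source": ["a"], "map": ["b"], "sink": ["c"]})
```

Because every operator stops only after all of its upstream operators have handed off their records, each buffered record ends up in `emitted` rather than being lost in an intermediate buffer.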

Two-phase shutdown: The Stopping state implements a two-phase protocol: first it sends the stop signal to workers (phase 1), then waits for workers to confirm they have terminated (phase 2). This separation allows the controller to detect workers that acknowledge the stop but fail to actually terminate:

Stopping(StopJob):
    send stop_execution to all workers
    wait_for_finish with 60-second timeout
    if success: transition to Stopped
    if error or timeout: self-transition to Stopping(StopWorkers)

Stopping(StopWorkers):
    scheduler.stop_workers(job_id, run_id, force=true)
    transition to Stopped
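
The two-phase protocol and its fallback can be sketched with asyncio. Everything here is a stand-in: `FakeWorker`, its methods, and the `killed` flag are hypothetical and replace the real gRPC calls and the scheduler's `stop_workers`. Only the 60-second default comes from the text.

```python
import asyncio

STOP_TIMEOUT_S = 60.0  # graceful-termination timeout stated in the text


class FakeWorker:
    """Hypothetical worker; exits_after_s=None models a worker that never exits."""

    def __init__(self, exits_after_s):
        self.exits_after_s = exits_after_s
        self.stop_requested = False
        self.killed = False

    async def stop_execution(self):
        self.stop_requested = True  # phase 1: the worker acknowledges the stop

    async def wait_for_finish(self):
        if self.exits_after_s is None:
            await asyncio.Event().wait()  # never set, so this waits forever
        await asyncio.sleep(self.exits_after_s)


async def stop_job(workers, timeout_s=STOP_TIMEOUT_S):
    """Return the controller states visited while stopping the job."""
    path = ["Stopping(StopJob)"]
    try:
        # Phase 1: send the stop signal to every worker.
        await asyncio.gather(*(w.stop_execution() for w in workers))
        # Phase 2: wait until all workers confirm termination, with a bound.
        await asyncio.wait_for(
            asyncio.gather(*(w.wait_for_finish() for w in workers)),
            timeout=timeout_s,
        )
    except (asyncio.TimeoutError, ConnectionError):
        # Escalate: stand-in for asking the scheduler to force-stop workers.
        path.append("Stopping(StopWorkers)")
        for w in workers:
            w.killed = True
    path.append("Stopped")
    return path
```

A worker that acknowledges the stop but never exits passes phase 1 and only shows up as a timeout in phase 2, which is the case the separation into two phases is designed to catch.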

Timeout handling: Shutdown phases are bounded by timeouts (60 seconds for graceful worker termination). When a timeout expires, the controller escalates to the next, more forceful mode. This prevents a single unresponsive worker from blocking pipeline shutdown indefinitely, and it ensures that cluster resources are eventually released even when workers fail.
