
Workflow:ArroyoSystems Arroyo Checkpoint Recovery

From Leeroopedia


Knowledge Sources
Domains Stream_Processing, Fault_Tolerance, State_Management
Last Updated 2026-02-08 09:00 GMT

Overview

End-to-end process for periodically checkpointing distributed operator state in Arroyo streaming pipelines and recovering from failures by restoring consistent snapshots from object storage.

Description

This workflow describes Arroyo's checkpoint-based fault tolerance mechanism, which provides exactly-once processing guarantees for stateful streaming pipelines. The system periodically takes distributed snapshots of all operator state by injecting checkpoint barrier messages into the data stream. Each operator serializes its state tables to Parquet files and uploads them to object storage (S3, GCS, or local filesystem). The controller coordinates the checkpoint process, tracking completion across all operators and subtasks. When a failure occurs, the system recovers by loading the most recent completed checkpoint and restarting processing from that point.

Key aspects:

  • Checkpoint barriers flow through the dataflow graph, ensuring consistent snapshots across operators
  • State is serialized to Parquet format on object storage for durability and efficient restoration
  • Two-phase commit protocol ensures exactly-once delivery for file sinks
  • Recovery restores state from the last completed checkpoint and replays data from source offsets

Usage

This workflow operates automatically as part of any running pipeline with stateful operators (windows, joins, aggregations, or connectors with offsets). Checkpointing is configured via the checkpoint interval parameter when creating a pipeline. Recovery is triggered automatically when the controller detects worker failures, or when a pipeline is restarted from a specific checkpoint.

Execution Steps

Step 1: Checkpoint Trigger

The controller's Running state monitor fires a periodic checkpoint based on the configured interval. It creates a new CheckpointState object that tracks which operator/subtask pairs must report completion. The controller sends a Checkpoint command to the job controller, which injects checkpoint barrier messages into all source operators. Each barrier carries a monotonically increasing checkpoint epoch number.

Key considerations:

  • Checkpoint interval is configurable per-pipeline (default is typically 10 seconds)
  • A new checkpoint is not initiated until the previous one completes
  • The CheckpointState tracks the expected set of all operator/subtask pairs
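The trigger logic above can be sketched as follows. `CheckpointState`, the monotonically increasing epoch, and the operator/subtask completion tracking come from the workflow description; the class and method names in this sketch are illustrative, not Arroyo's actual API.

```python
# Minimal sketch of the controller's checkpoint trigger bookkeeping.
# At most one checkpoint is in flight at a time, and each one tracks the
# full set of (operator, subtask) pairs that must report completion.

class CheckpointState:
    def __init__(self, epoch, expected_subtasks):
        self.epoch = epoch                      # monotonically increasing
        self.expected = set(expected_subtasks)  # (operator_id, subtask_idx)
        self.reported = set()

    def is_complete(self):
        return self.reported == self.expected


class Controller:
    def __init__(self, operators):
        # operators: {operator_id: parallelism}
        self.operators = operators
        self.epoch = 0
        self.in_flight = None

    def trigger_checkpoint(self):
        if self.in_flight is not None:   # previous checkpoint must finish first
            return None
        self.epoch += 1
        expected = {(op, i) for op, par in self.operators.items()
                    for i in range(par)}
        self.in_flight = CheckpointState(self.epoch, expected)
        return self.epoch  # barriers carrying this epoch are injected at sources
```

A periodic timer would call `trigger_checkpoint` at the configured interval; the `None` return models the rule that a new checkpoint is not initiated while one is outstanding.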

Step 2: Barrier Propagation

Checkpoint barriers flow through the dataflow graph following the data stream. When an operator with multiple inputs receives a barrier from one input, it pauses that input and waits for barriers from all other inputs before processing (barrier alignment). This ensures that the state snapshot represents a consistent cut across the entire dataflow graph. Operators buffer incoming data from aligned inputs until all barriers arrive.

Key considerations:

  • Barrier alignment ensures exactly-once semantics by preventing records from being counted in two different checkpoints
  • Single-input operators can immediately process the barrier without alignment
  • The barrier alignment protocol is similar to the Chandy-Lamport distributed snapshot algorithm
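Barrier alignment for a multi-input operator can be sketched as below, assuming messages are either `("barrier", epoch)` or `("record", payload)`; the class and message shapes are illustrative assumptions, not Arroyo's internal types.

```python
# Sketch of barrier alignment: once a barrier arrives on one input, records
# from that input are buffered until barriers have arrived on all inputs,
# at which point the operator snapshots its state and replays the buffer.

class AlignedOperator:
    def __init__(self, num_inputs):
        self.num_inputs = num_inputs
        self.blocked = set()    # inputs that already delivered the barrier
        self.buffered = []      # records held back from blocked inputs

    def on_message(self, input_idx, msg):
        kind, payload = msg
        if kind == "barrier":
            self.blocked.add(input_idx)
            if len(self.blocked) == self.num_inputs:
                held = self.buffered
                self.blocked, self.buffered = set(), []
                return ("checkpoint", held)   # snapshot, then replay held records
            return ("aligning", None)
        if input_idx in self.blocked:
            self.buffered.append(payload)     # this input is paused
            return ("buffered", None)
        return ("process", payload)
```

The buffered records belong to the next checkpoint epoch, which is exactly how alignment prevents a record from being counted in two checkpoints.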

Step 3: State Serialization

When an operator receives the checkpoint barrier (after alignment if multi-input), it serializes all its state tables to Parquet files. State tables include ExpiringTimeKeyMap (for windowed state with time-based expiration), GlobalKeyedMap (for simple key-value state), and connector-specific state (source offsets, sink commit metadata). Each state table writes its data as Arrow record batches to Parquet files and uploads them to the configured storage backend.

Key considerations:

  • State is stored as Parquet files for efficient columnar access and compression
  • Each operator subtask writes its own state files independently
  • The ParquetBackend handles the serialization and upload to object storage
  • State includes both the data tables and operational metadata (watermarks, offsets)
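The per-subtask write path can be sketched as follows. Arroyo serializes Arrow record batches to Parquet via the ParquetBackend; this stand-in pickles the rows into an in-memory dict playing the role of object storage, and the key layout and function name are illustrative assumptions.

```python
import pickle

# Stand-in for a subtask serializing one state table during a checkpoint.
# Real Arroyo: Arrow record batches encoded as Parquet, uploaded to S3/GCS/
# local filesystem. Here: pickled rows into a dict keyed by a storage path.

def checkpoint_state_table(storage, epoch, operator_id, subtask_idx,
                           table_name, rows):
    key = f"checkpoints/{epoch}/{operator_id}/{subtask_idx}/{table_name}.parquet"
    data = pickle.dumps(rows)   # real backend: Parquet-encoded record batches
    storage[key] = data
    # Metadata of this shape is what gets reported back to the controller.
    return {"file": key, "bytes": len(data)}
```

Because each subtask writes under its own `(operator, subtask)` prefix, the writes are fully independent, which matches the second bullet above.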

Step 4: Checkpoint Completion

Each operator subtask reports its checkpoint metadata (file locations, byte counts, watermarks) back to the controller via the control channel. The controller's CheckpointState aggregates these reports, merging table-level metadata across subtasks within each operator. When all expected subtasks have reported, the checkpoint is marked as complete. The final CheckpointMetadata protobuf is persisted to the database, recording the epoch number, operator states, and timing information.

Key considerations:

  • The controller tracks completion at the subtask level for fine-grained progress monitoring
  • Checkpoint metadata includes per-operator table metadata merged across all subtasks
  • Checkpoint timing events are logged for observability (start time, per-operator completion, total duration)
  • If a subtask fails during checkpointing, the entire checkpoint is abandoned
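The controller-side aggregation can be sketched as below; the merging of table metadata across subtasks and the completion condition come from the text, while the class and field names are illustrative.

```python
# Sketch of the controller aggregating subtask checkpoint reports. Table
# metadata is merged per operator, and the checkpoint is complete only when
# every expected (operator, subtask) pair has reported.

class CheckpointAggregator:
    def __init__(self, expected):
        self.expected = set(expected)   # (operator_id, subtask_idx) pairs
        self.reported = set()
        self.operator_tables = {}       # operator_id -> merged metadata

    def report(self, operator_id, subtask_idx, files, byte_count):
        self.reported.add((operator_id, subtask_idx))
        merged = self.operator_tables.setdefault(
            operator_id, {"files": [], "bytes": 0})
        merged["files"].extend(files)
        merged["bytes"] += byte_count
        return self.reported == self.expected   # True once complete
```

Once `report` returns `True`, the controller would persist the final checkpoint metadata record (epoch, per-operator state, timing) to the database.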

Step 5: Two-Phase Commit (Sinks)

For file-based sinks that support exactly-once semantics (filesystem, Delta Lake, Iceberg), the checkpoint completion triggers a two-phase commit protocol. In the first phase (during checkpointing), the sink writes data to temporary staging files. In the second phase (after checkpoint completion is confirmed), the staging files are atomically committed to their final locations. This ensures that output files are only visible after the checkpoint that produced them is confirmed durable.

Key considerations:

  • The two-phase commit prevents partial or duplicate output files
  • Staging files are cleaned up if the checkpoint fails
  • Delta Lake and Iceberg sinks use their respective commit protocols for atomic table updates
  • Non-transactional sinks (Kafka, Redis) use at-least-once semantics with idempotent writes
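The two phases can be sketched for a local filesystem sink as follows; `os.replace` provides an atomic rename on a local filesystem, whereas Delta Lake and Iceberg use their own atomic commit protocols. The staging-name convention and function names here are illustrative assumptions.

```python
import os

# Sketch of a file sink's two-phase commit. Phase 1 writes a hidden staging
# file during checkpointing; phase 2 atomically renames it to its final path
# only after the controller confirms the checkpoint is durable.

def pre_commit(directory, name, data):
    staging = os.path.join(directory, f".staging-{name}")
    with open(staging, "wb") as f:
        f.write(data)               # phase 1: durable, but not yet visible
    return staging

def commit(staging_path):
    final = staging_path.replace(".staging-", "")
    os.replace(staging_path, final)  # phase 2: atomically publish
    return final

def abort(staging_path):
    os.remove(staging_path)          # checkpoint failed: clean up staging file
```

Readers only ever see fully committed files, which is what rules out partial or duplicate output.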

Step 6: Failure Detection

The controller monitors worker health through heartbeat messages. When a worker fails to send heartbeats within the configured timeout, or when a worker reports a task failure, the controller transitions the job to the Recovering state. The recovery process identifies the most recent completed checkpoint to use as the restore point.

Key considerations:

  • Heartbeat timeouts trigger recovery even if the worker process crashed without reporting
  • Task-level failures (panics, OOM) are detected and reported to the controller
  • The controller distinguishes between recoverable failures (worker crash) and fatal failures (invalid program)
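Heartbeat-based detection can be sketched as below; the timeout mechanism is from the text, while the class shape and explicit clock parameter (which keeps the sketch deterministic) are illustrative.

```python
# Sketch of heartbeat monitoring. The controller records the last heartbeat
# time per worker; any worker silent longer than the timeout is flagged,
# which would transition the job to the Recovering state.

class HeartbeatMonitor:
    def __init__(self, timeout_secs):
        self.timeout = timeout_secs
        self.last_seen = {}     # worker_id -> timestamp of last heartbeat

    def heartbeat(self, worker_id, now):
        self.last_seen[worker_id] = now

    def failed_workers(self, now):
        return [w for w, t in self.last_seen.items()
                if now - t > self.timeout]
```

This catches workers that crashed without reporting anything; explicit task-failure reports arrive through the control channel and short-circuit the timeout.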

Step 7: State Restoration

During recovery, the controller transitions to Scheduling, which relaunches workers and computes new task assignments. The most recent checkpoint metadata is loaded, providing the file locations for each operator's state. When workers start execution, each operator subtask downloads its state files from object storage and deserializes the Parquet data back into in-memory state tables. Source operators restore their offsets and resume reading from the exact position recorded in the checkpoint. Processing resumes from the consistent snapshot point.

Key considerations:

  • State restoration happens during the Scheduling state, before execution begins
  • Each subtask restores only its own partition of the state
  • Source offsets ensure that no data is lost or duplicated during recovery
  • The recovery process is transparent to the pipeline SQL logic
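Restoration can be sketched as follows, reusing an in-memory dict as a stand-in for object storage; selecting the latest completed epoch and resuming sources from recorded offsets come from the text, while the metadata layout and function names are illustrative.

```python
import pickle

# Sketch of recovery: pick the most recent *completed* checkpoint, load each
# subtask's state files from storage, and hand sources back their recorded
# offsets. Real state lives as Parquet files on object storage.

def latest_completed(checkpoints):
    # checkpoints: {epoch: {"complete": bool, "files": {...}, "offsets": {...}}}
    done = [e for e, c in checkpoints.items() if c["complete"]]
    return max(done) if done else None

def restore(storage, checkpoints):
    epoch = latest_completed(checkpoints)
    meta = checkpoints[epoch]
    state = {key: pickle.loads(storage[path])
             for key, path in meta["files"].items()}
    return state, meta["offsets"]   # sources resume from these offsets
```

Note that an in-flight (incomplete) checkpoint is skipped entirely, so recovery always lands on a consistent snapshot.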
