
Principle:ArroyoSystems Arroyo State Restoration

From Leeroopedia



State Restoration

Principle: Recovering a streaming pipeline from a checkpoint after failure. Recovery involves identifying the last successful checkpoint, preparing checkpoint data for loading, re-scheduling workers, and restoring operator state from Parquet files in object storage.

Theoretical Basis

Checkpoint-based recovery restores the system to the last consistent distributed snapshot. This is the counterpart to the checkpoint creation process: where checkpointing captures state, recovery restores it. Together, they provide exactly-once processing semantics in the presence of failures.

Recovery Process

The recovery sequence consists of five phases:

Phase 1: Identify Last Completed Checkpoint

After a failure is detected, the controller queries the checkpoint metadata store to find the most recent successfully completed checkpoint epoch. Only globally-complete checkpoints (where all operators and subtasks reported success) are candidates.
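This selection step can be sketched as follows. The `CheckpointRecord` struct and its field names are illustrative assumptions, not Arroyo's actual metadata schema; the point is that only epochs where every subtask reported success are eligible.

```rust
// Hypothetical sketch of Phase 1: pick the newest globally-complete checkpoint.
// `CheckpointRecord` and its fields are illustrative, not Arroyo's real schema.

#[derive(Debug, Clone)]
struct CheckpointRecord {
    epoch: u64,
    subtasks_expected: usize,
    subtasks_committed: usize,
}

/// Return the highest epoch whose checkpoint was reported complete by every
/// operator subtask; `None` if no checkpoint ever completed.
fn last_complete_epoch(records: &[CheckpointRecord]) -> Option<u64> {
    records
        .iter()
        .filter(|r| r.subtasks_committed == r.subtasks_expected)
        .map(|r| r.epoch)
        .max()
}

fn main() {
    let records = vec![
        CheckpointRecord { epoch: 7, subtasks_expected: 4, subtasks_committed: 4 },
        CheckpointRecord { epoch: 8, subtasks_expected: 4, subtasks_committed: 4 },
        // The job failed mid-checkpoint at epoch 9, so it is not a candidate.
        CheckpointRecord { epoch: 9, subtasks_expected: 4, subtasks_committed: 2 },
    ];
    println!("{:?}", last_complete_epoch(&records)); // recovery targets epoch 8
}
```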

Phase 2: Prepare Checkpoint Metadata

The controller calls StateBackend::prepare_checkpoint_load() to:

  • Retrieve per-operator checkpoint metadata from object storage.
  • Validate that all required metadata files exist and are consistent.
  • Construct a loading plan that maps each operator subtask to its state files.
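The shape of such a loading plan can be sketched as a map from each `(operator, subtask)` pair to its state files. The `LoadingPlan` type, `StateFile` struct, and object-storage path layout below are assumptions for illustration, not Arroyo's actual `prepare_checkpoint_load()` output.

```rust
use std::collections::HashMap;

// Hypothetical sketch of Phase 2: building a loading plan. The type names and
// the checkpoint path layout are illustrative, not Arroyo's real API.

#[derive(Debug, Clone, PartialEq)]
struct StateFile {
    path: String,
}

// (operator_id, subtask_index) -> state files that subtask must load
type LoadingPlan = HashMap<(String, u32), Vec<StateFile>>;

fn build_loading_plan(epoch: u64, operators: &[(&str, u32)]) -> LoadingPlan {
    let mut plan = LoadingPlan::new();
    for &(op, parallelism) in operators {
        for idx in 0..parallelism {
            // Each subtask maps to its Parquet state file under the epoch prefix.
            let path = format!("checkpoints/epoch-{epoch}/{op}/subtask-{idx}.parquet");
            plan.insert((op.to_string(), idx), vec![StateFile { path }]);
        }
    }
    plan
}

fn main() {
    let plan = build_loading_plan(8, &[("window-agg", 2), ("join", 1)]);
    println!("{} subtask entries in the plan", plan.len());
}
```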

Phase 3: Restart Workers

The controller re-enters its scheduling phase to:

  • Allocate worker slots (potentially on new nodes if the original workers are unavailable).
  • Deploy operator tasks to the workers.
  • Distribute the checkpoint loading plan to each worker.
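The key property of this phase is that tasks need not return to their original nodes. A minimal round-robin assignment illustrates this; Arroyo's actual scheduler considers slot counts and resource constraints, which this sketch omits.

```rust
// Hypothetical sketch of Phase 3: assigning operator tasks to whatever worker
// slots are currently available, round-robin. This only illustrates that tasks
// can land on different nodes after a failure; it is not Arroyo's scheduler.
fn assign_tasks(tasks: &[&str], workers: &[&str]) -> Vec<(String, String)> {
    tasks
        .iter()
        .enumerate()
        .map(|(i, task)| (task.to_string(), workers[i % workers.len()].to_string()))
        .collect()
}

fn main() {
    let assignments = assign_tasks(
        &["source-0", "source-1", "sink-0"],
        &["worker-a", "worker-b"], // the failed run's third worker is gone
    );
    for (task, worker) in &assignments {
        println!("{task} -> {worker}");
    }
}
```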

Phase 4: Restore Operator State

Each worker, upon startup with a checkpoint loading plan:

  • Downloads its assigned Parquet state files from object storage.
  • Deserializes the columnar data back into in-memory state tables.
  • Restores watermarks, timers, and other operator metadata.
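Conceptually, restoring a state table means zipping decoded columns back into keyed in-memory entries. In the real system the columns come from Parquet files read via Arrow; that I/O is elided here to keep the sketch dependency-free, and the two-column table shape is an assumption.

```rust
use std::collections::BTreeMap;

// Illustrative sketch of Phase 4: rebuilding an in-memory key/value state
// table from two decoded columns. Reading the columns out of Parquet is
// elided; the table shape is an assumption for illustration.
fn restore_table(keys: &[&str], values: &[i64]) -> BTreeMap<String, i64> {
    keys.iter()
        .map(|k| k.to_string())
        .zip(values.iter().copied())
        .collect()
}

fn main() {
    // Two rows decoded from a checkpointed Parquet file for this subtask.
    let table = restore_table(&["user-1", "user-2"], &[42, 7]);
    println!("{table:?}");
}
```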

Phase 5: Resume Processing

After state restoration is complete:

  • Source operators resume reading from the checkpointed position (e.g., Kafka offsets stored in the checkpoint).
  • Processing continues from the exact point captured in the snapshot.
  • All records processed between the checkpoint and the failure are re-processed, but the restored state ensures exactly-once semantics -- the re-processed records produce the same state transitions as the original processing.
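The resume position can be sketched per partition. This assumes the checkpoint stores the last offset fully processed for each Kafka partition, so the source seeks to the next offset; whether Arroyo stores last-processed or next-to-read offsets is an assumption of this sketch.

```rust
use std::collections::HashMap;

// Hypothetical sketch of Phase 5: computing per-partition resume positions.
// Assumes the checkpoint records the last offset fully processed, so the
// source resumes at offset + 1. This convention is an assumption.
fn resume_positions(checkpointed: &HashMap<u32, i64>) -> HashMap<u32, i64> {
    checkpointed.iter().map(|(&part, &off)| (part, off + 1)).collect()
}

fn main() {
    let mut ckpt = HashMap::new();
    ckpt.insert(0u32, 1041i64);
    ckpt.insert(1u32, 987i64);
    println!("{:?}", resume_positions(&ckpt));
}
```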

Consistency Guarantees

The recovery process preserves the consistency properties established by the Chandy-Lamport-based checkpoint protocol:

  • State consistency: All operators are restored to the same epoch, ensuring a consistent distributed snapshot.
  • Input position consistency: Source offsets are restored from the checkpoint, ensuring no data is lost or skipped.
  • Output consistency: Two-phase commit metadata is restored, ensuring that committed output is not duplicated and uncommitted staged output is discarded.
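The output-consistency rule can be sketched as a partition of staged output: files recorded as committed in the checkpoint survive, everything else staged after the checkpoint is discarded. The function and file names below are illustrative, not Arroyo's sink implementation.

```rust
use std::collections::HashSet;

// Sketch of output consistency on restore: staged output files recorded as
// committed in the checkpoint are kept; uncommitted staged files are
// discarded so no post-checkpoint output survives. Names are illustrative.
fn split_staged(staged: Vec<String>, committed: &HashSet<String>) -> (Vec<String>, Vec<String>) {
    staged.into_iter().partition(|f| committed.contains(f))
}

fn main() {
    let committed: HashSet<String> = ["part-0001".to_string()].into_iter().collect();
    let (keep, discard) = split_staged(
        vec!["part-0001".to_string(), "part-0002".to_string()],
        &committed,
    );
    println!("keep={keep:?} discard={discard:?}");
}
```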

Failure During Recovery

If a failure occurs during recovery itself, the process restarts from Phase 1 using the same or an earlier checkpoint. The recovery process is designed to be idempotent -- it can be safely retried without side effects.
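This retry behavior can be sketched as a loop over candidate epochs, newest first. The `try_load` callback stands in for Phases 2 through 5; treating a repeatedly failing epoch as a signal to fall back to an older checkpoint is an assumption of this sketch.

```rust
// Sketch of idempotent recovery retry: each attempt starts cleanly from
// Phase 1, and if loading one epoch fails, recovery falls back to the
// next-older complete checkpoint. `try_load` stands in for Phases 2-5.
fn recover(epochs_newest_first: &[u64], mut try_load: impl FnMut(u64) -> bool) -> Option<u64> {
    for &epoch in epochs_newest_first {
        if try_load(epoch) {
            return Some(epoch);
        }
    }
    None // no checkpoint could be restored
}

fn main() {
    // Epoch 8's files are unreadable in this run, so recovery falls back to 7.
    let restored = recover(&[8, 7, 6], |epoch| epoch != 8);
    println!("{restored:?}");
}
```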

Domains

  • Stream_Processing: Recovery is essential for maintaining continuous stream processing.
  • Fault_Tolerance: Checkpoint-based recovery provides strong fault tolerance guarantees.
  • Recovery: This principle defines the complete recovery lifecycle.

Related Implementation

Implementation:ArroyoSystems_Arroyo_Recovering_State
