Principle: Arroyo State Restoration
Principle: Recovering a streaming pipeline from a checkpoint after failure. Recovery involves identifying the last successful checkpoint, preparing checkpoint data for loading, re-scheduling workers, and restoring operator state from Parquet files in object storage.
Theoretical Basis
Checkpoint-based recovery restores the system to the last consistent distributed snapshot. This is the counterpart to the checkpoint creation process: where checkpointing captures state, recovery restores it. Together, they provide exactly-once processing semantics in the presence of failures.
Recovery Process
The recovery sequence consists of five phases:
Phase 1: Identify Last Completed Checkpoint
After a failure is detected, the controller queries the checkpoint metadata store to find the most recent successfully completed checkpoint epoch. Only globally-complete checkpoints (where all operators and subtasks reported success) are candidates.
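A minimal sketch of this selection step, assuming a simplified metadata record (the types and names here are illustrative, not Arroyo's actual metadata schema): only checkpoints marked globally complete are eligible, and the highest such epoch wins.

```rust
// Hypothetical checkpoint metadata record; real Arroyo metadata is richer.
#[derive(Debug, Clone, PartialEq)]
enum CheckpointState {
    InProgress,
    Complete,
    Failed,
}

struct CheckpointRecord {
    epoch: u32,
    state: CheckpointState,
}

/// Return the highest epoch whose checkpoint completed globally,
/// i.e. every operator and subtask reported success.
fn last_complete_epoch(records: &[CheckpointRecord]) -> Option<u32> {
    records
        .iter()
        .filter(|r| r.state == CheckpointState::Complete)
        .map(|r| r.epoch)
        .max()
}
```

An in-progress or failed epoch is never chosen, even if it is newer than the last complete one.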
Phase 2: Prepare Checkpoint Metadata
The controller calls StateBackend::prepare_checkpoint_load() to:
- Retrieve per-operator checkpoint metadata from object storage.
- Validate that all required metadata files exist and are consistent.
- Construct a loading plan that maps each operator subtask to its state files.
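The loading-plan construction can be sketched as follows, under assumed metadata types (the struct names and error handling are hypothetical, not Arroyo's `prepare_checkpoint_load()` signature): validate that each operator has metadata, then map every (operator, subtask) pair to its state files.

```rust
use std::collections::HashMap;

// Hypothetical per-operator checkpoint metadata.
struct OperatorCheckpointMetadata {
    operator_id: String,
    // State file paths per subtask index, recorded at checkpoint time.
    files_by_subtask: Vec<Vec<String>>,
}

// (operator_id, subtask_index) -> state files that subtask must load.
type LoadingPlan = HashMap<(String, usize), Vec<String>>;

/// Validate metadata and build the subtask -> files mapping.
fn prepare_loading_plan(
    metadata: &[OperatorCheckpointMetadata],
) -> Result<LoadingPlan, String> {
    let mut plan = LoadingPlan::new();
    for op in metadata {
        if op.files_by_subtask.is_empty() {
            return Err(format!("operator {} has no subtask metadata", op.operator_id));
        }
        for (subtask, files) in op.files_by_subtask.iter().enumerate() {
            plan.insert((op.operator_id.clone(), subtask), files.clone());
        }
    }
    Ok(plan)
}
```

Failing fast on missing metadata here is what keeps Phase 4 simple: workers never start restoring from an inconsistent plan.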
Phase 3: Restart Workers
The controller re-enters its scheduling state to:
- Allocate worker slots (potentially on new nodes if the original workers are unavailable).
- Deploy operator tasks to the workers.
- Distribute the checkpoint loading plan to each worker.
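The slot-allocation step can be sketched greedily, assuming a simplified node model (this is an illustration of the idea, not Arroyo's scheduler): fill free slots node by node, and fail if total capacity is insufficient.

```rust
// Hypothetical worker node with a count of free task slots.
struct Node {
    id: String,
    free_slots: usize,
}

/// Greedily assign `tasks` task slots across available nodes,
/// returning (node_id, slots_taken) pairs, or None if capacity
/// is insufficient.
fn allocate_slots(nodes: &mut [Node], tasks: usize) -> Option<Vec<(String, usize)>> {
    let mut remaining = tasks;
    let mut assignment = Vec::new();
    for node in nodes.iter_mut() {
        if remaining == 0 {
            break;
        }
        let take = node.free_slots.min(remaining);
        if take > 0 {
            node.free_slots -= take;
            assignment.push((node.id.clone(), take));
            remaining -= take;
        }
    }
    if remaining == 0 { Some(assignment) } else { None }
}
```

Because allocation may land on different nodes than before the failure, the loading plan from Phase 2 travels with the task deployment rather than being tied to specific machines.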
Phase 4: Restore Operator State
Each worker, upon startup with a checkpoint loading plan:
- Downloads its assigned Parquet state files from object storage.
- Deserializes the columnar data back into in-memory state tables.
- Restores watermarks, timers, and other operator metadata.
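A simplified sketch of the columnar-to-keyed reconstruction: Arroyo reads the Parquet files back as columnar batches, but the core step of zipping parallel columns into an in-memory keyed table can be shown with plain vectors (the column types here are stand-ins, not Arroyo's state schema).

```rust
use std::collections::HashMap;

/// Zip parallel key/value columns from a checkpoint file back into a
/// keyed in-memory state table, rejecting corrupt (mismatched) columns.
fn restore_table(keys: Vec<String>, values: Vec<i64>) -> Result<HashMap<String, i64>, String> {
    if keys.len() != values.len() {
        return Err("column length mismatch in checkpoint file".to_string());
    }
    Ok(keys.into_iter().zip(values).collect())
}
```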
Phase 5: Resume Processing
After state restoration is complete:
- Source operators resume reading from the checkpointed position (e.g., Kafka offsets stored in the checkpoint).
- Processing continues from the exact point captured in the snapshot.
- All records processed between the checkpoint and the failure are replayed. Because the restored state matches the snapshot exactly, replay produces the same state transitions as the original processing, preserving exactly-once semantics.
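The resume-position logic can be sketched for a Kafka-style source (the types here are illustrative, not Arroyo's connector API): partitions with a checkpointed offset resume exactly there, so everything after the snapshot is replayed; partitions absent from the checkpoint start from the earliest available offset.

```rust
use std::collections::HashMap;

// Hypothetical source state: partition -> next offset to read,
// as captured in the checkpoint.
struct SourceState {
    offsets: HashMap<u32, u64>,
}

/// Starting offset for a partition: the checkpointed position if one
/// exists, otherwise the earliest available offset (fresh start).
fn resume_offset(state: &SourceState, partition: u32, earliest: u64) -> u64 {
    *state.offsets.get(&partition).unwrap_or(&earliest)
}
```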
Consistency Guarantees
The recovery process preserves the consistency properties established by the Chandy-Lamport-based checkpoint protocol:
- State consistency: All operators are restored to the same epoch, ensuring a consistent distributed snapshot.
- Input position consistency: Source offsets are restored from the checkpoint, ensuring no data is lost or skipped.
- Output consistency: Two-phase commit metadata is restored, ensuring that committed output is not duplicated and uncommitted staged output is discarded.
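The output-consistency rule above can be sketched as a reconciliation pass over staged output (the `StagedFile` type is a stand-in for the two-phase-commit metadata, not Arroyo's actual sink types): output recorded as committed in the checkpoint is kept, uncommitted staged output is discarded so it never becomes visible downstream.

```rust
// Hypothetical staged output entry from two-phase-commit metadata.
struct StagedFile {
    path: String,
    committed: bool,
}

/// Partition staged output into (keep, discard) on recovery.
fn reconcile_outputs(staged: Vec<StagedFile>) -> (Vec<String>, Vec<String>) {
    let mut keep = Vec::new();
    let mut discard = Vec::new();
    for f in staged {
        if f.committed {
            keep.push(f.path);
        } else {
            discard.push(f.path);
        }
    }
    (keep, discard)
}
```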
Failure During Recovery
If a failure occurs during recovery itself, the process restarts from Phase 1 using the same or an earlier checkpoint. The recovery process is designed to be idempotent -- it can be safely retried without side effects.
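The retry-with-fallback behavior can be sketched as a loop over complete epochs, newest first (here `attempt_recovery` is a hypothetical stand-in for running the full five-phase sequence; the function names are illustrative):

```rust
/// Try to recover from the newest complete checkpoint, falling back
/// to earlier epochs on failure. Safe to retry because recovery has
/// no side effects until processing resumes.
fn recover_with_fallback(
    mut epochs: Vec<u32>, // complete checkpoint epochs, ascending
    attempt_recovery: impl Fn(u32) -> bool,
) -> Option<u32> {
    while let Some(epoch) = epochs.pop() {
        if attempt_recovery(epoch) {
            return Some(epoch);
        }
    }
    None
}
```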
Domains
- Stream_Processing: Recovery is essential for maintaining continuous stream processing.
- Fault_Tolerance: Checkpoint-based recovery provides strong fault tolerance guarantees.
- Recovery: This principle defines the complete recovery lifecycle.