Implementation:ArroyoSystems Arroyo Checkpoint State
Checkpoint State Implementation
Implementation of the CheckpointState lifecycle.
CheckpointState is the controller-side data structure that tracks the progress of a distributed checkpoint. It is created when a checkpoint is initiated and consumed when all operators have completed.
Code Reference
- File: crates/arroyo-state/src/checkpoint_state.rs (L76-L182, L420-L422)
Core Interface
impl CheckpointState {
    pub fn new(
        job_id: Arc<String>,
        checkpoint_id: i64,
        epoch: u32,
        min_epoch: u32,
        start_time: SystemTime,
        operators: HashSet<String>,
        subtask_counts: HashMap<String, usize>,
        commit_data: Option<HashMap<String, HashMap<u32, Vec<u8>>>>,
    ) -> Self

    pub fn finish_subtask(
        &mut self,
        operator_id: String,
        subtask_index: u32,
        metadata: OperatorCheckpointMetadata,
    ) -> anyhow::Result<()>

    pub fn done(&self) -> bool
}
Detailed Behavior
CheckpointState::new
Creates a new tracking structure for a checkpoint epoch.
| Parameter | Type | Description |
|---|---|---|
| job_id | Arc<String> | Unique identifier for the job |
| checkpoint_id | i64 | Database-assigned checkpoint identifier |
| epoch | u32 | The checkpoint epoch number |
| min_epoch | u32 | Minimum epoch to retain (for garbage collection) |
| start_time | SystemTime | Wall-clock time when the checkpoint was initiated |
| operators | HashSet<String> | Set of all operator IDs that must complete |
| subtask_counts | HashMap<String, usize> | Expected number of subtasks per operator |
| commit_data | Option<HashMap<String, HashMap<u32, Vec<u8>>>> | Optional two-phase commit data from the previous epoch |
Internal state initialized:
- A remaining-subtask counter for each operator, initialized to the expected subtask count.
- An empty collection for aggregating per-subtask metadata.
- An empty collection for tracking operator completion status.
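The initialization described above can be illustrated with a minimal sketch. This is not the Arroyo source: `TrackerSketch` and its field names are hypothetical, raw bytes stand in for the real metadata type, and parameters such as job_id, epoch, and commit_data are omitted for brevity.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical, simplified stand-in for the state that
/// CheckpointState::new sets up. Not the real Arroyo struct.
#[derive(Debug)]
pub struct TrackerSketch {
    /// Subtasks still outstanding for each operator.
    pub remaining: HashMap<String, usize>,
    /// Per-operator, per-subtask metadata (bytes stand in for real metadata).
    pub subtask_metadata: HashMap<String, HashMap<u32, Vec<u8>>>,
    /// Operators whose subtasks have all reported.
    pub completed: HashSet<String>,
}

impl TrackerSketch {
    pub fn new(operators: HashSet<String>, subtask_counts: HashMap<String, usize>) -> Self {
        // One counter per expected operator, seeded with its subtask count.
        // Assumption of this sketch: an operator without a count entry
        // is treated as having zero subtasks.
        let remaining = operators
            .iter()
            .map(|op| (op.clone(), subtask_counts.get(op).copied().unwrap_or(0)))
            .collect();
        Self {
            remaining,
            subtask_metadata: HashMap::new(), // nothing reported yet
            completed: HashSet::new(),        // no operator finished yet
        }
    }
}
```

Seeding the counters from the topology up front means later completion reports only need a lookup and a decrement.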
CheckpointState::finish_subtask
Called when a single subtask reports that it has completed its local checkpoint.
| Parameter | Type | Description |
|---|---|---|
| operator_id | String | The operator this subtask belongs to |
| subtask_index | u32 | The subtask's index within the operator |
| metadata | OperatorCheckpointMetadata | Metadata about the subtask's checkpointed state |
Execution steps:
- Validates that the operator ID is expected and the subtask index is within range.
- Stores the subtask's OperatorCheckpointMetadata.
- Decrements the remaining-subtask counter for the operator.
- If the operator's counter reaches zero, marks the operator as complete.
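The four steps above might look roughly like the following sketch. It is an illustration under stated assumptions, not the Arroyo implementation: `ProgressSketch` is a hypothetical type, `Vec<u8>` stands in for OperatorCheckpointMetadata, `Result<(), String>` replaces anyhow::Result so the block needs no external crates, and rejecting duplicate reports is a choice made for this sketch.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for the progress-tracking part of CheckpointState.
pub struct ProgressSketch {
    expected: HashMap<String, usize>,
    remaining: HashMap<String, usize>,
    metadata: HashMap<(String, u32), Vec<u8>>,
    completed: HashSet<String>,
}

impl ProgressSketch {
    pub fn new(subtask_counts: HashMap<String, usize>) -> Self {
        Self {
            remaining: subtask_counts.clone(),
            expected: subtask_counts,
            metadata: HashMap::new(),
            completed: HashSet::new(),
        }
    }

    pub fn finish_subtask(
        &mut self,
        operator_id: String,
        subtask_index: u32,
        metadata: Vec<u8>,
    ) -> Result<(), String> {
        // Step 1: the operator must be expected and the index in range.
        let expected = *self
            .expected
            .get(&operator_id)
            .ok_or_else(|| format!("unexpected operator {}", operator_id))?;
        if subtask_index as usize >= expected {
            return Err(format!("subtask index {} out of range", subtask_index));
        }
        // Step 2: store the metadata. Duplicates are rejected (an assumption
        // of this sketch) so the counter below can never underflow.
        let key = (operator_id.clone(), subtask_index);
        if self.metadata.contains_key(&key) {
            return Err(format!("subtask {} already reported", subtask_index));
        }
        self.metadata.insert(key, metadata);
        // Step 3: decrement the operator's remaining-subtask counter.
        let remaining = self
            .remaining
            .get_mut(&operator_id)
            .expect("remaining has the same keys as expected");
        *remaining -= 1;
        // Step 4: mark the operator complete once its counter hits zero.
        if *remaining == 0 {
            self.completed.insert(operator_id);
        }
        Ok(())
    }

    pub fn is_operator_complete(&self, operator_id: &str) -> bool {
        self.completed.contains(operator_id)
    }
}
```

Validating before mutating keeps the tracker unchanged when a report is rejected.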
CheckpointState::done
Returns true once every subtask of every operator has completed.
pub fn done(&self) -> bool
// Returns true when all operator subtask counters have reached zero
This is the global completion check: when done() returns true, the controller can finalize the checkpoint by writing the global checkpoint record and advancing the epoch.
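As a rough illustration of this check, the sketch below tests whether every per-operator counter has reached zero. `DoneSketch` and `mark_one_finished` are hypothetical names introduced here; the actual method body in Arroyo may be structured differently.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in holding only the remaining-subtask counters.
pub struct DoneSketch {
    remaining: HashMap<String, usize>,
}

impl DoneSketch {
    pub fn new(subtask_counts: HashMap<String, usize>) -> Self {
        Self { remaining: subtask_counts }
    }

    /// Simplified progress step: one subtask of `operator_id` finished.
    /// Unknown operators are ignored in this sketch.
    pub fn mark_one_finished(&mut self, operator_id: &str) {
        if let Some(n) = self.remaining.get_mut(operator_id) {
            *n = n.saturating_sub(1);
        }
    }

    /// Global completion check: true when no operator has outstanding subtasks.
    pub fn done(&self) -> bool {
        self.remaining.values().all(|&n| n == 0)
    }
}
```

In this sketch a tracker with no operators reports done immediately, because `all` over an empty iterator is true.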
Lifecycle Summary
| Phase | Method | Description |
|---|---|---|
| Creation | new() | Initialize tracking state with expected operators and subtask counts |
| Progress | finish_subtask() | Record individual subtask completion, decrement counters |
| Completion | done() | Check if all operators have fully completed |
| Finalization | (external) | Controller writes global checkpoint record and cleans up |
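To show how the four phases fit together from the controller's side, here is a hedged sketch of a driver loop. `LifecycleSketch`, `SubtaskFinished`, and `run_checkpoint` are hypothetical names, and the finalization step is only a comment because writing the global checkpoint record happens outside CheckpointState.

```rust
use std::collections::HashMap;

/// Hypothetical completion event, as a controller might receive it.
pub struct SubtaskFinished {
    pub operator_id: String,
    pub subtask_index: u32,
}

/// Hypothetical, counter-only stand-in for CheckpointState.
pub struct LifecycleSketch {
    remaining: HashMap<String, usize>,
}

impl LifecycleSketch {
    pub fn new(subtask_counts: HashMap<String, usize>) -> Self {
        Self { remaining: subtask_counts }
    }

    pub fn finish_subtask(&mut self, event: &SubtaskFinished) -> Result<(), String> {
        let n = self
            .remaining
            .get_mut(&event.operator_id)
            .ok_or_else(|| format!("unexpected operator {}", event.operator_id))?;
        if *n == 0 {
            return Err(format!("operator {} already complete", event.operator_id));
        }
        *n -= 1;
        Ok(())
    }

    pub fn done(&self) -> bool {
        self.remaining.values().all(|&n| n == 0)
    }
}

/// Drives one checkpoint. Returns Ok(true) if the finalization point was
/// reached, Ok(false) if the events ran out first.
pub fn run_checkpoint(
    subtask_counts: HashMap<String, usize>,
    events: Vec<SubtaskFinished>,
) -> Result<bool, String> {
    let mut state = LifecycleSketch::new(subtask_counts); // Creation
    for event in &events {
        state.finish_subtask(event)?; // Progress
        if state.done() {
            // Completion. Finalization (external) would happen here:
            // the controller writes the global record and cleans up.
            return Ok(true);
        }
    }
    Ok(false)
}
```

Checking done() after each report lets the controller finalize as soon as the last subtask arrives.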
I/O
- new(): Creates tracking state from job topology information
- finish_subtask(): Decrements operator counters and stores per-subtask metadata
- done(): Returns true when all operators and subtasks have completed
Implements
- Principle:ArroyoSystems_Arroyo_Checkpoint_Coordination
- Environment:ArroyoSystems_Arroyo_Object_Storage
- Heuristic:ArroyoSystems_Arroyo_Checkpoint_Interval_Tuning