

Implementation:ArroyoSystems Arroyo Checkpoint State

From Leeroopedia



Checkpoint State Implementation

Implementation of the CheckpointState lifecycle.

CheckpointState is the controller-side data structure that tracks the progress of a distributed checkpoint. It is created when a checkpoint is initiated and consumed when all operators have completed.

Code Reference

  • File: crates/arroyo-state/src/checkpoint_state.rs (L76-L182, L420-L422)

Core Interface

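impl CheckpointState {
    // Signatures only; method bodies are omitted here.

    /// Creates the tracking state for one checkpoint epoch.
    pub fn new(
        job_id: Arc<String>,
        checkpoint_id: i64,
        epoch: u32,
        min_epoch: u32,
        start_time: SystemTime,
        operators: HashSet<String>,
        subtask_counts: HashMap<String, usize>,
        commit_data: Option<HashMap<String, HashMap<u32, Vec<u8>>>>,
    ) -> Self

    /// Records that one subtask of `operator_id` has finished its local checkpoint.
    pub fn finish_subtask(
        &mut self,
        operator_id: String,
        subtask_index: u32,
        metadata: OperatorCheckpointMetadata,
    ) -> anyhow::Result<()>

    /// Returns true once every operator has completed all of its subtasks.
    pub fn done(&self) -> bool
}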

Detailed Behavior

CheckpointState::new

Creates a new tracking structure for a checkpoint epoch.

Parameters:

  • job_id (Arc<String>): Unique identifier for the job.
  • checkpoint_id (i64): Database-assigned checkpoint identifier.
  • epoch (u32): The checkpoint epoch number.
  • min_epoch (u32): Minimum epoch to retain (for garbage collection).
  • start_time (SystemTime): Wall-clock time when the checkpoint was initiated.
  • operators (HashSet<String>): Set of all operator IDs that must complete.
  • subtask_counts (HashMap<String, usize>): Expected number of subtasks per operator.
  • commit_data (Option<HashMap<String, HashMap<u32, Vec<u8>>>>): Optional two-phase commit data from the previous epoch.

Internal state initialized:

  • A remaining-subtask counter for each operator, initialized to the expected subtask count.
  • An empty collection for aggregating per-subtask metadata.
  • An empty collection for tracking operator completion status.

CheckpointState::finish_subtask

Called when a single subtask reports that it has completed its local checkpoint.

Parameters:

  • operator_id (String): The operator this subtask belongs to.
  • subtask_index (u32): The subtask's index within the operator.
  • metadata (OperatorCheckpointMetadata): Metadata about the subtask's checkpointed state.

Execution steps:

  1. Validates that the operator ID is expected and the subtask index is within range.
  2. Stores the subtask's OperatorCheckpointMetadata.
  3. Decrements the remaining-subtask counter for the operator.
  4. If the operator's counter reaches zero, marks the operator as complete.
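The four steps can be sketched as follows, assuming a simplified hypothetical `Tracker` type and a `Metadata` placeholder in place of OperatorCheckpointMetadata. Errors are plain `String`s rather than anyhow::Error so the example needs only the standard library. Rejecting a second report for the same subtask is an extra assumption of this sketch, not one of the steps listed above; it keeps a repeated report from decrementing the counter twice.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical placeholder for OperatorCheckpointMetadata.
#[derive(Debug, Clone)]
pub struct Metadata {
    pub bytes: u64,
}

/// Hypothetical, simplified tracker; not Arroyo's actual struct.
pub struct Tracker {
    expected: HashMap<String, usize>,  // subtask count per operator
    remaining: HashMap<String, usize>, // subtasks still outstanding
    metadata: HashMap<String, HashMap<u32, Metadata>>,
    completed: HashSet<String>,
}

impl Tracker {
    pub fn new(subtask_counts: HashMap<String, usize>) -> Self {
        Tracker {
            remaining: subtask_counts.clone(),
            expected: subtask_counts,
            metadata: HashMap::new(),
            completed: HashSet::new(),
        }
    }

    pub fn finish_subtask(
        &mut self,
        operator_id: String,
        subtask_index: u32,
        metadata: Metadata,
    ) -> Result<(), String> {
        // Step 1: the operator must be expected and the index in range.
        let expected = *self
            .expected
            .get(&operator_id)
            .ok_or_else(|| format!("unexpected operator {operator_id}"))?;
        if subtask_index as usize >= expected {
            return Err(format!("subtask {subtask_index} out of range for {operator_id}"));
        }
        // Step 2: store the metadata (duplicates rejected, a sketch assumption).
        let per_op = self.metadata.entry(operator_id.clone()).or_default();
        if per_op.contains_key(&subtask_index) {
            return Err(format!("duplicate report for {operator_id}[{subtask_index}]"));
        }
        per_op.insert(subtask_index, metadata);
        // Step 3: decrement the operator's outstanding counter.
        let remaining = self.remaining.get_mut(&operator_id).expect("counter exists");
        *remaining -= 1;
        // Step 4: at zero, mark the operator as complete.
        if *remaining == 0 {
            self.completed.insert(operator_id);
        }
        Ok(())
    }

    pub fn is_operator_complete(&self, operator_id: &str) -> bool {
        self.completed.contains(operator_id)
    }
}
```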

CheckpointState::done

Returns true once every subtask of every operator has completed.

pub fn done(&self) -> bool
// Returns true when all operator subtask counters have reached zero

This is the global completion check: when done() returns true, the controller can finalize the checkpoint by writing the global checkpoint record and advancing the epoch.
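Given per-operator counters like those described above, the done() check reduces to a single pass over the counters. A minimal sketch, assuming a hypothetical `all_done` helper over a map from operator ID to outstanding-subtask count:

```rust
use std::collections::HashMap;

/// Hypothetical helper: true once every operator's outstanding-subtask
/// counter has reached zero.
pub fn all_done(remaining: &HashMap<String, usize>) -> bool {
    remaining.values().all(|&left| left == 0)
}
```

With no operators at all, `all()` returns true vacuously; whether CheckpointState can be created with an empty operator set is not covered by the description above.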

Lifecycle Summary

  • Creation, new(): Initialize tracking state with the expected operators and subtask counts.
  • Progress, finish_subtask(): Record an individual subtask's completion and decrement its operator's counter.
  • Completion, done(): Check whether all operators have fully completed.
  • Finalization (external to CheckpointState): The controller writes the global checkpoint record and cleans up.
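The phases above can be illustrated end to end with a hypothetical controller loop. This sketch assumes completion reports arrive over a std::sync::mpsc channel from simulated worker threads, which stand in for however reports actually reach the controller; `SubtaskDone` and `run_checkpoint` are illustrative names, plain counters stand in for the full CheckpointState, and finalization is only marked by a comment.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical completion report sent by one worker subtask.
pub struct SubtaskDone {
    pub operator_id: String,
    pub subtask_index: u32,
}

/// Drives one simulated checkpoint through creation, progress, completion,
/// and finalization. Returns how many subtask reports were processed.
pub fn run_checkpoint(subtask_counts: HashMap<String, usize>) -> usize {
    // Creation: every operator starts with its full subtask count outstanding.
    let mut remaining = subtask_counts.clone();
    let (tx, rx) = mpsc::channel::<SubtaskDone>();

    // Simulated workers: one thread per subtask sends a single report.
    for (op, &n) in &subtask_counts {
        for i in 0..n as u32 {
            let tx = tx.clone();
            let operator_id = op.clone();
            thread::spawn(move || {
                tx.send(SubtaskDone { operator_id, subtask_index: i }).unwrap();
            });
        }
    }
    // Drop the original sender so recv() errors out if every worker is gone.
    drop(tx);

    let mut processed = 0;
    // Completion check: keep going while any operator has outstanding subtasks.
    while remaining.values().any(|&left| left > 0) {
        // Progress: each report decrements its operator's counter.
        let report = match rx.recv() {
            Ok(report) => report,
            Err(_) => break, // all senders dropped before completion
        };
        debug_assert!((report.subtask_index as usize) < subtask_counts[&report.operator_id]);
        if let Some(left) = remaining.get_mut(&report.operator_id) {
            *left -= 1;
        }
        processed += 1;
    }

    // Finalization (external): a real controller would write the global
    // checkpoint record here; this sketch just returns the report count.
    processed
}
```

The loop condition is the negation of done(): the controller keeps receiving reports while any counter is still above zero.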

I/O

  • new(): Creates tracking state from job topology information
  • finish_subtask(): Decrements operator counters and stores per-subtask metadata
  • done(): Returns true when all operators and subtasks have completed

Implements

  • Principle:ArroyoSystems_Arroyo_Checkpoint_Coordination
  • Environment:ArroyoSystems_Arroyo_Object_Storage
  • Heuristic:ArroyoSystems_Arroyo_Checkpoint_Interval_Tuning
