Implementation:ArroyoSystems Arroyo Checkpoint Trigger


Checkpoint Trigger Implementation

Implementation of checkpoint triggering within JobController::progress.

This implementation realizes the periodic checkpoint triggering principle by embedding the trigger logic directly into the controller's main progress loop. The controller evaluates three conditions on every tick to decide whether to initiate a new checkpoint.

Code Reference

  • File: crates/arroyo-controller/src/job_controller/mod.rs (L843-L850)

Core Logic

The checkpoint trigger logic within progress():

// check on checkpointing
if self.model.checkpoint_state.is_some() {
    self.model.finish_checkpoint_if_done(&self.db).await?;
} else if self.model.last_checkpoint.elapsed() > self.config.checkpoint_interval
    && self.cleanup_task.is_none()
{
    self.checkpoint(false).await?;
}

Detailed Behavior

The trigger follows a three-branch decision tree:

Condition | Action | Rationale
checkpoint_state.is_some() | Call finish_checkpoint_if_done() | A checkpoint is already in flight; check whether it has completed
Elapsed time > interval AND no cleanup task | Call self.checkpoint(false) | Time to start a new checkpoint; no concurrent checkpoint or cleanup
Otherwise | No action | Either the interval has not elapsed or a cleanup task is in progress
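
The decision tree above can be sketched as a pure function, which makes the branch priority easy to check in isolation. This is a minimal illustration, not the controller's code: the TickAction enum and the decide function are hypothetical names, and the three inputs stand in for checkpoint_state.is_some(), last_checkpoint.elapsed(), and cleanup_task.is_some().

```rust
use std::time::Duration;

/// Hypothetical enum naming the three outcomes of one controller tick.
#[derive(Debug, PartialEq, Eq)]
enum TickAction {
    /// A checkpoint is in flight: poll it for completion.
    FinishIfDone,
    /// No checkpoint in flight, interval exceeded, no cleanup running.
    StartCheckpoint,
    /// Nothing to do on this tick.
    Nothing,
}

/// Mirrors the if / else-if structure of the trigger logic.
/// The in-flight check comes first, so the timer and the cleanup
/// flag are only consulted when no checkpoint is in progress.
fn decide(
    checkpoint_in_flight: bool,
    elapsed: Duration,
    interval: Duration,
    cleanup_running: bool,
) -> TickAction {
    if checkpoint_in_flight {
        TickAction::FinishIfDone
    } else if elapsed > interval && !cleanup_running {
        TickAction::StartCheckpoint
    } else {
        TickAction::Nothing
    }
}
```

Because the comparison is a strict `>`, an elapsed time exactly equal to the interval falls through to the no-action branch, and a running cleanup task postpones the next checkpoint until a later tick rather than cancelling it.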

Condition Breakdown

  • self.model.checkpoint_state.is_some(): Guards against concurrent checkpoints. If a CheckpointState exists, a checkpoint is already in progress. The controller calls finish_checkpoint_if_done() to check whether all operators have completed their checkpoint for this epoch.
  • self.model.last_checkpoint.elapsed() > self.config.checkpoint_interval: Timer-based trigger. last_checkpoint is an Instant recorded when the previous checkpoint completed; checkpoint_interval is the configured Duration.
  • self.cleanup_task.is_none(): Ensures no old checkpoint cleanup is running. Cleanup removes expired checkpoint data from object storage, and initiating a new checkpoint during cleanup could cause conflicts.

The checkpoint(false) Call

The boolean parameter false indicates this is a regular checkpoint (not a "then_stop" checkpoint used during graceful shutdown). The checkpoint() method:

  1. Creates a new CheckpointState to track operator completion
  2. Assigns a new epoch number
  3. Sends CheckpointBarrier messages to all source operators
  4. Records the checkpoint start time
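
The four steps can be modeled with a small, self-contained sketch. Everything here is an assumption made for illustration: the Model, InFlight, and Barrier types, their fields, and the use of std::sync::mpsc channels to reach sources are hypothetical stand-ins, not Arroyo's actual CheckpointState, CheckpointBarrier, or transport. The order of the steps inside the function is also illustrative.

```rust
use std::sync::mpsc::Sender;
use std::time::Instant;

/// Hypothetical stand-in for a checkpoint barrier message.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Barrier {
    epoch: u32,
    then_stop: bool,
}

/// Hypothetical stand-in for the in-flight checkpoint tracker.
struct InFlight {
    epoch: u32,
    started: Instant,
    pending_operators: usize,
}

/// Hypothetical, simplified model of the controller's job state.
struct Model {
    epoch: u32,
    checkpoint_state: Option<InFlight>,
    sources: Vec<Sender<Barrier>>,
    operator_count: usize,
}

impl Model {
    /// Starts a checkpoint and returns its epoch.
    /// `then_stop = false` corresponds to a regular periodic checkpoint.
    fn checkpoint(&mut self, then_stop: bool) -> Result<u32, String> {
        if self.checkpoint_state.is_some() {
            return Err("a checkpoint is already in flight".to_string());
        }
        // Assign a new epoch number.
        self.epoch += 1;
        // Create the tracking state and record the start time.
        self.checkpoint_state = Some(InFlight {
            epoch: self.epoch,
            started: Instant::now(),
            pending_operators: self.operator_count,
        });
        // Send a barrier to every source operator.
        for source in &self.sources {
            source
                .send(Barrier { epoch: self.epoch, then_stop })
                .map_err(|e| e.to_string())?;
        }
        Ok(self.epoch)
    }
}
```

In this model, the presence of checkpoint_state is what the trigger's first branch inspects on later ticks: once it is set, a second call is rejected until the in-flight checkpoint is cleared.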

I/O

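  • Input: Time elapsed since the last checkpoint (self.model.last_checkpoint.elapsed()), compared against the configured interval (self.config.checkpoint_interval) and gated by self.model.checkpoint_state and self.cleanup_task
  • Output: Either polls the in-flight checkpoint for completion, or initiates a new checkpoint by creating a CheckpointState and sending a CheckpointBarrier to all source operators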

Implements

  • Principle:ArroyoSystems_Arroyo_Checkpoint_Triggering
  • Heuristic:ArroyoSystems_Arroyo_Checkpoint_Interval_Tuning
