Implementation:ArroyoSystems Arroyo Checkpoint Trigger
Checkpoint Trigger Implementation
Implementation of checkpoint triggering within JobController::progress.
This implementation realizes the periodic checkpoint triggering principle by embedding the trigger logic directly into the controller's main progress loop. The controller evaluates three conditions on every tick to decide whether to initiate a new checkpoint.
Code Reference
- File: crates/arroyo-controller/src/job_controller/mod.rs (L843-L850)
Core Logic
The checkpoint trigger logic within progress():
// check on checkpointing
if self.model.checkpoint_state.is_some() {
    self.model.finish_checkpoint_if_done(&self.db).await?;
} else if self.model.last_checkpoint.elapsed() > self.config.checkpoint_interval
    && self.cleanup_task.is_none()
{
    self.checkpoint(false).await?;
}
Detailed Behavior
The trigger follows a three-branch decision tree:
| Condition | Action | Rationale |
|---|---|---|
| checkpoint_state.is_some() | Call finish_checkpoint_if_done() | A checkpoint is already in-flight; check if it has completed |
| Elapsed time > interval AND no cleanup task | Call self.checkpoint(false) | Time to start a new checkpoint; no concurrent checkpoint or cleanup |
| Otherwise | No action | Either the interval has not elapsed or a cleanup task is in progress |
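The decision tree can be restated as a pure function, which makes each branch easy to check in isolation. This is a minimal sketch, not the controller's code: the `TriggerAction` enum, the `decide` function, and its parameters are hypothetical names that stand in for the controller fields.

```rust
use std::time::Duration;

/// Hypothetical enum naming the three outcomes of the decision tree.
#[derive(Debug, PartialEq, Eq)]
enum TriggerAction {
    /// A checkpoint is in flight: poll it for completion.
    FinishIfDone,
    /// Interval exceeded and no cleanup running: start a new checkpoint.
    StartCheckpoint,
    /// Nothing to do on this tick.
    Wait,
}

/// Pure restatement of the trigger check (illustrative only).
/// `checkpoint_in_flight` stands in for `checkpoint_state.is_some()`,
/// `cleanup_running` for `cleanup_task.is_some()`.
fn decide(
    checkpoint_in_flight: bool,
    since_last_checkpoint: Duration,
    interval: Duration,
    cleanup_running: bool,
) -> TriggerAction {
    if checkpoint_in_flight {
        // The in-flight check comes first, so the timer is never consulted
        // while a checkpoint is outstanding.
        TriggerAction::FinishIfDone
    } else if since_last_checkpoint > interval && !cleanup_running {
        TriggerAction::StartCheckpoint
    } else {
        TriggerAction::Wait
    }
}
```

Because the in-flight branch is evaluated first, an overdue timer never starts a second checkpoint while one is still outstanding.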
Condition Breakdown
- self.model.checkpoint_state.is_some(): Guards against concurrent checkpoints. If a CheckpointState exists, a checkpoint is already in progress. The controller calls finish_checkpoint_if_done() to check whether all operators have completed their checkpoint for this epoch.
- self.model.last_checkpoint.elapsed() > self.config.checkpoint_interval: Timer-based trigger. last_checkpoint is an Instant recorded when the previous checkpoint completed; checkpoint_interval is the configured Duration.
- self.cleanup_task.is_none(): Ensures no old checkpoint cleanup is running. Cleanup removes expired checkpoint data from object storage, and initiating a new checkpoint during cleanup could cause conflicts.
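The timer condition uses a strict comparison, so a tick that lands exactly on the interval does not fire. The sketch below isolates that check and takes `now` as an argument so it can be exercised without sleeping. The `interval_elapsed` helper is a hypothetical name; the controller itself calls `Instant::elapsed()` inline.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper mirroring `last_checkpoint.elapsed() > checkpoint_interval`,
/// with the current time passed in explicitly for testability.
fn interval_elapsed(last_checkpoint: Instant, now: Instant, interval: Duration) -> bool {
    // `saturating_duration_since` returns zero if `now` is earlier than
    // `last_checkpoint`, so a clock anomaly reads as "not yet elapsed".
    now.saturating_duration_since(last_checkpoint) > interval
}
```

With a 10-second interval, this returns false at exactly 10 seconds after `last_checkpoint` and true at any later instant.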
The checkpoint(false) Call
The boolean parameter false indicates this is a regular checkpoint (not a "then_stop" checkpoint used during graceful shutdown). The checkpoint() method:
- Creates a new CheckpointState to track operator completion
- Assigns a new epoch number
- Sends CheckpointBarrier messages to all source operators
- Records the checkpoint start time
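The steps listed above can be sketched with standard-library channels. This is an illustrative model under stated assumptions, not Arroyo's implementation: `Barrier`, `InFlight`, `Controller`, and their fields are hypothetical stand-ins for CheckpointBarrier, CheckpointState, and the job controller, and the real method is async and talks to workers rather than to in-process channels. The order of the steps here is also illustrative.

```rust
use std::collections::HashSet;
use std::sync::mpsc::Sender;
use std::time::Instant;

/// Hypothetical barrier message (the real CheckpointBarrier has its own fields).
#[derive(Debug, Clone, PartialEq, Eq)]
struct Barrier {
    epoch: u32,
    then_stop: bool,
}

/// Hypothetical tracker for an in-flight checkpoint.
struct InFlight {
    epoch: u32,
    started: Instant,
    /// Indices of sources that have not yet reported completion.
    pending: HashSet<usize>,
}

/// Hypothetical, trimmed-down controller.
struct Controller {
    epoch: u32,
    sources: Vec<Sender<Barrier>>,
    in_flight: Option<InFlight>,
}

impl Controller {
    /// Starts a checkpoint; `then_stop = false` means a regular checkpoint.
    fn checkpoint(&mut self, then_stop: bool) -> Result<(), String> {
        // Assign a new epoch number.
        self.epoch += 1;

        // Create the tracking state and record the start time.
        let state = InFlight {
            epoch: self.epoch,
            started: Instant::now(),
            pending: (0..self.sources.len()).collect(),
        };

        // Send a barrier to every source operator.
        let barrier = Barrier { epoch: self.epoch, then_stop };
        for tx in &self.sources {
            tx.send(barrier.clone()).map_err(|e| e.to_string())?;
        }

        self.in_flight = Some(state);
        Ok(())
    }
}
```

In this model, `in_flight` being `Some` plays the role of `checkpoint_state.is_some()`: once it is set, the trigger logic would poll for completion instead of starting another checkpoint.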
I/O
- Input: Timer elapsed since last checkpoint compared against configured interval (self.config.checkpoint_interval)
- Output: Initiates checkpoint by creating CheckpointState and sending CheckpointBarrier to all source operators
Implements
- Principle:ArroyoSystems_Arroyo_Checkpoint_Triggering
- Heuristic:ArroyoSystems_Arroyo_Checkpoint_Interval_Tuning