Implementation:ArroyoSystems Arroyo Handle Checkpoint

From Leeroopedia



Handle Checkpoint Implementation

Implementation of the ArrowOperator::handle_checkpoint trait method.

This method is the per-operator hook invoked after barrier alignment. It allows each operator to perform custom logic (e.g., flushing buffers, finalizing aggregations) before the framework persists state tables to durable storage.

Code Reference

  • File: crates/arroyo-operator/src/operator.rs (L1216-L1223)

Core Interface

The handle_checkpoint Method

#[async_trait::async_trait]
pub trait ArrowOperator: Send + 'static {
    // ... other methods ...

    async fn handle_checkpoint(&mut self, _b: CheckpointBarrier, _ctx: &mut ArrowContext) -> bool {
        // Default implementation: return true to proceed with checkpoint
        true
    }
}

The CheckpointBarrier Struct

pub struct CheckpointBarrier {
    pub epoch: u32,
    pub min_epoch: u32,
    pub timestamp: SystemTime,
    pub then_stop: bool,
}

Detailed Behavior

Method Signature

| Parameter | Type | Description |
| --- | --- | --- |
| self | &mut Self | Mutable reference to the operator's state |
| _b | CheckpointBarrier | The barrier message containing epoch metadata |
| _ctx | &mut ArrowContext | Execution context providing access to state tables, metrics, and output channels |
| Return | bool | true to proceed with checkpoint; false to skip |

CheckpointBarrier Fields

| Field | Type | Description |
| --- | --- | --- |
| epoch | u32 | The checkpoint epoch number for this barrier |
| min_epoch | u32 | The minimum epoch that must be retained (older epochs can be garbage-collected) |
| timestamp | SystemTime | The wall-clock time when the checkpoint was initiated |
| then_stop | bool | If true, the operator should stop after checkpointing (used for graceful shutdown) |
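
As a rough illustration of how these fields might be consumed, the sketch below keeps a per-epoch record, prunes entries older than min_epoch, and reports then_stop to the caller. EpochLog, record, and retained_epochs are hypothetical names invented for this example and are not part of Arroyo; only the CheckpointBarrier layout is taken from the listing above.

```rust
use std::collections::BTreeMap;
use std::time::SystemTime;

// Mirrors the struct shown on this page.
#[derive(Debug, Clone, Copy)]
pub struct CheckpointBarrier {
    pub epoch: u32,
    pub min_epoch: u32,
    pub timestamp: SystemTime,
    pub then_stop: bool,
}

/// Hypothetical bookkeeping helper (not an Arroyo type):
/// maps each epoch to the number of rows flushed in it.
#[derive(Default)]
pub struct EpochLog {
    flushed: BTreeMap<u32, usize>,
}

impl EpochLog {
    /// Records the rows flushed for `barrier.epoch`, drops entries older
    /// than `barrier.min_epoch`, and returns `barrier.then_stop` so the
    /// caller knows whether to shut down after this checkpoint.
    pub fn record(&mut self, barrier: &CheckpointBarrier, rows: usize) -> bool {
        self.flushed.insert(barrier.epoch, rows);
        // Epochs below min_epoch no longer need to be retained.
        self.flushed.retain(|epoch, _| *epoch >= barrier.min_epoch);
        barrier.then_stop
    }

    /// Epochs still retained, in ascending order.
    pub fn retained_epochs(&self) -> Vec<u32> {
        self.flushed.keys().copied().collect()
    }
}
```

How Arroyo itself garbage-collects old epochs is not described on this page; the pruning rule here only restates the min_epoch field description.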

Invocation Sequence

The framework invokes handle_checkpoint as part of the following sequence:

  1. Barrier alignment: The framework waits for barriers on all input channels.
  2. Operator callback: handle_checkpoint(barrier, ctx) is called, giving the operator a chance to flush pending state.
  3. State persistence: If the method returns true, the framework calls ctx.table_manager.checkpoint() to serialize all state tables.
  4. Barrier forwarding: The barrier is forwarded to downstream operators.
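
Steps 2 to 4 above can be sketched with a simplified, synchronous model. Everything here is a stand-in: MockContext, MockTableManager, the Operator trait, run_checkpoint, PassThrough, and Skipper are hypothetical names, the real method is async and takes an ArrowContext, and barrier alignment (step 1) is assumed to have completed already. Forwarding the barrier even when the operator returns false is also an assumption, since the sequence above does not state what happens in that case.

```rust
use std::time::SystemTime;

// Mirrors the struct shown on this page.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CheckpointBarrier {
    pub epoch: u32,
    pub min_epoch: u32,
    pub timestamp: SystemTime,
    pub then_stop: bool,
}

/// Hypothetical stand-in for the table manager: records which epochs
/// were checkpointed instead of writing to durable storage.
#[derive(Default)]
pub struct MockTableManager {
    pub checkpointed_epochs: Vec<u32>,
}

impl MockTableManager {
    pub fn checkpoint(&mut self, barrier: &CheckpointBarrier) {
        self.checkpointed_epochs.push(barrier.epoch);
    }
}

/// Hypothetical stand-in for ArrowContext.
#[derive(Default)]
pub struct MockContext {
    pub table_manager: MockTableManager,
    /// Barriers sent downstream, in order.
    pub forwarded: Vec<CheckpointBarrier>,
}

/// Simplified, synchronous version of the trait method on this page.
pub trait Operator {
    fn handle_checkpoint(&mut self, _b: CheckpointBarrier, _ctx: &mut MockContext) -> bool {
        true
    }
}

/// Relies on the default implementation.
pub struct PassThrough;
impl Operator for PassThrough {}

/// Always asks the framework to skip persistence.
pub struct Skipper;
impl Operator for Skipper {
    fn handle_checkpoint(&mut self, _b: CheckpointBarrier, _ctx: &mut MockContext) -> bool {
        false
    }
}

/// Runs steps 2-4 for an already aligned barrier and returns the
/// operator's decision.
pub fn run_checkpoint<O: Operator>(
    op: &mut O,
    barrier: CheckpointBarrier,
    ctx: &mut MockContext,
) -> bool {
    // Step 2: operator callback.
    let proceed = op.handle_checkpoint(barrier, ctx);
    // Step 3: persist state tables only if the operator agreed.
    if proceed {
        ctx.table_manager.checkpoint(&barrier);
    }
    // Step 4: forward the barrier downstream (assumed unconditional).
    ctx.forwarded.push(barrier);
    proceed
}
```

With PassThrough, the epoch is recorded by the mock table manager and the barrier is forwarded; with Skipper, only the forwarding happens.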

Default Implementation

The default implementation returns true, so operators that need no custom checkpoint logic can rely on it and the checkpoint proceeds unchanged. Operators that maintain internal buffers or pending computations (e.g., windowed aggregations, async I/O) override this method to flush their state before the checkpoint is taken.
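
A minimal sketch of such an override, assuming a simplified synchronous trait in place of the async ArrowOperator: BatchingOperator buffers values and normally emits them in batches, and its handle_checkpoint drains whatever is still pending so nothing is left unflushed when the checkpoint is taken. MockContext, the Operator trait, and BatchingOperator are hypothetical names used only for illustration.

```rust
use std::time::SystemTime;

// Mirrors the struct shown on this page.
#[derive(Debug, Clone, Copy)]
pub struct CheckpointBarrier {
    pub epoch: u32,
    pub min_epoch: u32,
    pub timestamp: SystemTime,
    pub then_stop: bool,
}

/// Hypothetical stand-in for ArrowContext: collects emitted values.
#[derive(Default)]
pub struct MockContext {
    pub emitted: Vec<i64>,
}

/// Simplified, synchronous version of the operator trait.
pub trait Operator {
    fn process(&mut self, value: i64, ctx: &mut MockContext);

    /// Default: nothing to flush, proceed with the checkpoint.
    fn handle_checkpoint(&mut self, _b: CheckpointBarrier, _ctx: &mut MockContext) -> bool {
        true
    }
}

/// Buffers values and emits them once `batch_size` have accumulated.
pub struct BatchingOperator {
    buffer: Vec<i64>,
    batch_size: usize,
}

impl BatchingOperator {
    pub fn new(batch_size: usize) -> Self {
        Self { buffer: Vec::new(), batch_size }
    }

    /// Number of values buffered but not yet emitted.
    pub fn pending(&self) -> usize {
        self.buffer.len()
    }

    /// Moves every buffered value into the context's output.
    fn flush(&mut self, ctx: &mut MockContext) {
        ctx.emitted.extend(self.buffer.drain(..));
    }
}

impl Operator for BatchingOperator {
    fn process(&mut self, value: i64, ctx: &mut MockContext) {
        self.buffer.push(value);
        if self.buffer.len() >= self.batch_size {
            self.flush(ctx);
        }
    }

    /// Override: flush the partial batch, then let the checkpoint proceed.
    fn handle_checkpoint(&mut self, _b: CheckpointBarrier, ctx: &mut MockContext) -> bool {
        self.flush(ctx);
        true
    }
}
```

An operator with no buffer would skip the override entirely and inherit the default, which is the common case described above.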

I/O

  • Input: CheckpointBarrier message received after alignment from all input channels
  • Output: bool -- true to proceed with checkpoint (triggers ctx.table_manager.checkpoint()), false to skip

Implements

Principle:ArroyoSystems_Arroyo_Barrier_Propagation
