Implementation:ArroyoSystems Arroyo Handle Checkpoint
Handle Checkpoint Implementation
Implementation of the ArrowOperator::handle_checkpoint trait method.
This method is the per-operator hook invoked after barrier alignment. It allows each operator to perform custom logic (e.g., flushing buffers, finalizing aggregations) before the framework persists state tables to durable storage.
Code Reference
- File: crates/arroyo-operator/src/operator.rs (L1216-L1223)
Core Interface
The handle_checkpoint Method
#[async_trait::async_trait]
pub trait ArrowOperator: Send + 'static {
    // ... other methods ...
    async fn handle_checkpoint(&mut self, _b: CheckpointBarrier, _ctx: &mut ArrowContext) -> bool {
        // Default implementation: return true to proceed with checkpoint
        true
    }
}
The CheckpointBarrier Struct
pub struct CheckpointBarrier {
    pub epoch: u32,
    pub min_epoch: u32,
    pub timestamp: SystemTime,
    pub then_stop: bool,
}
Detailed Behavior
Method Signature
| Parameter | Type | Description |
|---|---|---|
| self | &mut Self | Mutable reference to the operator's state |
| _b | CheckpointBarrier | The barrier message containing epoch metadata |
| _ctx | &mut ArrowContext | Execution context providing access to state tables, metrics, and output channels |
| Return | bool | true to proceed with checkpoint; false to skip |
CheckpointBarrier Fields
| Field | Type | Description |
|---|---|---|
| epoch | u32 | The checkpoint epoch number for this barrier |
| min_epoch | u32 | The minimum epoch that must be retained (older epochs can be garbage-collected) |
| timestamp | SystemTime | The wall-clock time when the checkpoint was initiated |
| then_stop | bool | If true, the operator should stop after checkpointing (used for graceful shutdown) |
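To make the field semantics concrete, here is a minimal sketch that reads a barrier's fields. It uses a local copy of the struct so it compiles on its own. The helpers `partition_epochs` and `describe` are hypothetical illustrations and are not part of Arroyo; the real framework handles epoch retention internally.

```rust
use std::time::SystemTime;

// Local copy of the struct shown above, so the sketch is self-contained.
#[derive(Debug, Clone, Copy)]
pub struct CheckpointBarrier {
    pub epoch: u32,
    pub min_epoch: u32,
    pub timestamp: SystemTime,
    pub then_stop: bool,
}

/// Hypothetical helper: split a list of stored epochs into those that must be
/// retained (>= min_epoch) and those old enough to be garbage-collected.
pub fn partition_epochs(barrier: &CheckpointBarrier, stored: &[u32]) -> (Vec<u32>, Vec<u32>) {
    stored
        .iter()
        .copied()
        .partition(|e| *e >= barrier.min_epoch)
}

/// Hypothetical helper: summarize what the barrier asks the operator to do.
pub fn describe(barrier: &CheckpointBarrier) -> String {
    let action = if barrier.then_stop {
        "checkpoint then stop"
    } else {
        "checkpoint and continue"
    };
    format!("epoch {}: {}", barrier.epoch, action)
}
```

With `min_epoch` set to 3 and stored epochs 1 through 5, the first vector holds 3, 4, 5 and the second holds 1, 2.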
Invocation Sequence
The framework invokes handle_checkpoint as part of the following sequence:
- Barrier alignment: The framework waits for barriers on all input channels.
- Operator callback: handle_checkpoint(barrier, ctx) is called, giving the operator a chance to flush pending state.
- State persistence: If the method returns true, the framework calls ctx.table_manager.checkpoint() to serialize all state tables.
- Barrier forwarding: The barrier is forwarded to downstream operators.
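The sequence above can be sketched with a simplified, synchronous model. This is not Arroyo's code: `Barrier`, `Ctx`, `Operator`, and `Task` are hypothetical stand-ins, the real method is async, and persistence is modeled by recording the epoch rather than calling ctx.table_manager.checkpoint(). The sketch also assumes the barrier is forwarded even when the operator returns false, which the text above does not confirm.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Barrier {
    pub epoch: u32,
}

/// Stand-in for the execution context: records what the "framework" did.
#[derive(Default)]
pub struct Ctx {
    pub persisted_epochs: Vec<u32>,
    pub forwarded_epochs: Vec<u32>,
}

/// Synchronous stand-in for the operator trait, with the same default.
pub trait Operator {
    fn handle_checkpoint(&mut self, _b: Barrier, _ctx: &mut Ctx) -> bool {
        true
    }
}

pub struct Task<O: Operator> {
    pub op: O,
    pub inputs: usize,
    pending: HashSet<usize>,
    pub ctx: Ctx,
}

impl<O: Operator> Task<O> {
    pub fn new(op: O, inputs: usize) -> Self {
        Task { op, inputs, pending: HashSet::new(), ctx: Ctx::default() }
    }

    /// Returns true once the barrier has arrived on every input and was handled.
    pub fn on_barrier(&mut self, input: usize, b: Barrier) -> bool {
        // 1. Barrier alignment: wait until every input channel has delivered it.
        self.pending.insert(input);
        if self.pending.len() < self.inputs {
            return false;
        }
        self.pending.clear();
        // 2. Operator callback.
        if self.op.handle_checkpoint(b, &mut self.ctx) {
            // 3. State persistence (modeled as recording the epoch).
            self.ctx.persisted_epochs.push(b.epoch);
        }
        // 4. Barrier forwarding (assumed to happen regardless of the result).
        self.ctx.forwarded_epochs.push(b.epoch);
        true
    }
}

/// Uses the default handle_checkpoint, so every checkpoint proceeds.
pub struct Passthrough;
impl Operator for Passthrough {}

/// Always declines, so the persistence step is skipped.
pub struct SkipAll;
impl Operator for SkipAll {
    fn handle_checkpoint(&mut self, _b: Barrier, _ctx: &mut Ctx) -> bool {
        false
    }
}
```

In this model, a `Task` with two inputs does nothing on the first barrier and runs steps 2 through 4 only when the second arrives.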
Default Implementation
The default implementation returns true, meaning most operators do not need custom checkpoint logic. Operators that maintain internal buffers or pending computations (e.g., windowed aggregations, async I/O) override this method to flush their state before the checkpoint is taken.
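As an illustration of such an override, the sketch below shows a buffering operator that flushes its pending values into state before agreeing to the checkpoint. It is a synchronous simplification under stated assumptions: `Operator`, `Ctx`, `Barrier`, and `BufferedSum` are hypothetical names, and the `HashMap` stands in for a state table rather than reproducing the ArrowContext API.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy)]
pub struct Barrier {
    pub epoch: u32,
    pub then_stop: bool,
}

/// Stand-in for the execution context; `table` plays the role of a state table.
#[derive(Default)]
pub struct Ctx {
    pub table: HashMap<&'static str, i64>,
}

/// Synchronous stand-in for the operator trait, with the same default.
pub trait Operator {
    fn handle_checkpoint(&mut self, _b: Barrier, _ctx: &mut Ctx) -> bool {
        true
    }
}

/// Hypothetical operator that buffers values in memory between checkpoints.
#[derive(Default)]
pub struct BufferedSum {
    pending: Vec<i64>,
}

impl BufferedSum {
    pub fn process(&mut self, value: i64) {
        self.pending.push(value);
    }

    pub fn pending_len(&self) -> usize {
        self.pending.len()
    }
}

impl Operator for BufferedSum {
    fn handle_checkpoint(&mut self, _b: Barrier, ctx: &mut Ctx) -> bool {
        // Move buffered values into state so the checkpoint captures them.
        let flushed: i64 = self.pending.drain(..).sum();
        *ctx.table.entry("sum").or_insert(0) += flushed;
        // Nothing left in memory, so the checkpoint can proceed.
        true
    }
}
```

The design point is that anything still held only in operator memory when the method returns would be missing from the persisted state, so the override empties the buffer first.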
I/O
- Input: CheckpointBarrier message received after alignment from all input channels
- Output: bool (true to proceed with checkpoint, which triggers ctx.table_manager.checkpoint(); false to skip)