Implementation:ArroyoSystems Arroyo Table Manager Checkpoint
Table Manager Checkpoint Implementation
This page covers the implementation of `TableManager::checkpoint` and `ParquetBackend::write_operator_checkpoint_metadata`.
These two components work together to serialize operator state to Parquet files and persist the metadata describing those files to object storage.
Code References
- File: crates/arroyo-state/src/tables/table_manager.rs (L350-L368)
- File: crates/arroyo-state/src/parquet.rs (L69-L91)
Core Interfaces
TableManager::checkpoint
```rust
impl TableManager {
    pub async fn checkpoint(
        &mut self,
        epoch: u32,
        min_epoch: u32,
        watermark: Option<SystemTime>,
        then_stop: bool,
    ) -> anyhow::Result<()>
}
```
ParquetBackend::write_operator_checkpoint_metadata
```rust
impl BackingStore for ParquetBackend {
    async fn write_operator_checkpoint_metadata(
        metadata: OperatorCheckpointMetadata,
    ) -> Result<(), StateError>
}
```
Detailed Behavior
TableManager::checkpoint
This method orchestrates the serialization of all state tables owned by a single operator subtask.
| Parameter | Type | Description |
|---|---|---|
| `epoch` | `u32` | The checkpoint epoch number |
| `min_epoch` | `u32` | Minimum epoch to retain (older data can be garbage-collected) |
| `watermark` | `Option<SystemTime>` | Current event-time watermark for the operator |
| `then_stop` | `bool` | Whether the operator should stop after this checkpoint |
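To make the `min_epoch` retention rule concrete, here is a minimal sketch that assumes in-memory state is held in a `BTreeMap` keyed by epoch. The `EpochState` type and `expire_before` method are hypothetical and do not reflect how Arroyo's state tables are actually laid out; the point is only that every epoch at or above `min_epoch` survives and everything older is dropped.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-epoch in-memory state: epoch -> buffered rows.
/// Illustration only; not Arroyo's actual table structure.
pub struct EpochState {
    pub by_epoch: BTreeMap<u32, Vec<String>>,
}

impl EpochState {
    /// Drops all state for epochs strictly older than `min_epoch`
    /// and returns how many epochs were removed.
    pub fn expire_before(&mut self, min_epoch: u32) -> usize {
        // `split_off` returns the entries with keys >= min_epoch; the
        // entries left behind in `by_epoch` are the expired ones.
        let retained = self.by_epoch.split_off(&min_epoch);
        let expired = std::mem::replace(&mut self.by_epoch, retained);
        expired.len()
    }
}
```

`BTreeMap::split_off` keeps the boundary key in the returned half, which matches "retain `min_epoch` itself, discard anything older".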
Execution steps:
- Iterate state tables: For each registered state table (time-keyed maps, global keyed maps, etc.), serialize the in-memory data to Parquet format.
- Write Parquet files: Upload the serialized Parquet files to object storage at paths like `{checkpoint_url}/{job_id}/checkpoints/{epoch}/`.
- Collect metadata: Gather file references, byte counts, and table schemas into an `OperatorCheckpointMetadata` struct.
- Write metadata: Call `ParquetBackend::write_operator_checkpoint_metadata()` to persist the metadata.
- Clean expired state: Remove in-memory state for epochs older than `min_epoch`.
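The execution steps above can be sketched as one synchronous function. This is a minimal illustration under stated assumptions, not Arroyo's code: an in-memory `HashMap` stands in for object storage, the hypothetical `StateTable::serialize` emits newline-separated rows where the real implementation would emit Parquet, the per-table file name `table-{name}.parquet` is invented, and `CheckpointMetadataSketch` is a simplified stand-in for `OperatorCheckpointMetadata`.

```rust
use std::collections::{BTreeMap, HashMap};

/// Stand-in for object storage: path -> bytes (hypothetical, in-memory).
pub type ObjectStore = HashMap<String, Vec<u8>>;

/// Hypothetical state table with rows buffered per epoch.
pub struct StateTable {
    pub name: String,
    pub rows_by_epoch: BTreeMap<u32, Vec<String>>,
}

impl StateTable {
    /// Placeholder serializer; a real implementation would produce Parquet bytes.
    fn serialize(&self) -> Vec<u8> {
        let mut out = Vec::new();
        for rows in self.rows_by_epoch.values() {
            for row in rows {
                out.extend_from_slice(row.as_bytes());
                out.push(b'\n');
            }
        }
        out
    }
}

/// Hypothetical file reference recorded in the checkpoint metadata.
#[derive(Debug, Clone, PartialEq)]
pub struct FileRef {
    pub table: String,
    pub path: String,
    pub bytes: usize,
}

/// Simplified, hypothetical stand-in for OperatorCheckpointMetadata.
#[derive(Debug, Clone, PartialEq)]
pub struct CheckpointMetadataSketch {
    pub epoch: u32,
    pub min_epoch: u32,
    pub files: Vec<FileRef>,
}

pub fn checkpoint_sketch(
    store: &mut ObjectStore,
    tables: &mut [StateTable],
    checkpoint_url: &str,
    job_id: &str,
    epoch: u32,
    min_epoch: u32,
) -> CheckpointMetadataSketch {
    let dir = format!("{checkpoint_url}/{job_id}/checkpoints/{epoch}");
    let mut files = Vec::new();
    for table in tables.iter() {
        // Steps 1-2: serialize each table and upload it under the epoch directory.
        let bytes = table.serialize();
        let path = format!("{dir}/table-{}.parquet", table.name);
        files.push(FileRef { table: table.name.clone(), path: path.clone(), bytes: bytes.len() });
        store.insert(path, bytes);
    }
    // Step 3: collect the file references into the metadata struct.
    // Step 4: in the documented flow this struct is passed to
    // ParquetBackend::write_operator_checkpoint_metadata; here it is returned.
    let metadata = CheckpointMetadataSketch { epoch, min_epoch, files };
    // Step 5: drop in-memory state for epochs older than min_epoch.
    for table in tables.iter_mut() {
        table.rows_by_epoch = table.rows_by_epoch.split_off(&min_epoch);
    }
    metadata
}
```

Returning the metadata instead of writing it keeps the sketch free of any assumption about the backend's storage client.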
ParquetBackend::write_operator_checkpoint_metadata
This method persists the per-operator checkpoint metadata to object storage.
Storage path format:
{checkpoint_url}/{job_id}/checkpoints/{epoch}/operator-{operator_id}-{subtask_index}.metadata
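A small helper shows how that path could be assembled from its parts. The function name `operator_metadata_path` and the parameter types (`&str` for `operator_id`, `usize` for `subtask_index`) are assumptions made for illustration; only the path template itself comes from this page.

```rust
/// Builds the per-subtask metadata path:
/// {checkpoint_url}/{job_id}/checkpoints/{epoch}/operator-{operator_id}-{subtask_index}.metadata
/// (hypothetical helper; parameter types are assumptions)
pub fn operator_metadata_path(
    checkpoint_url: &str,
    job_id: &str,
    epoch: u32,
    operator_id: &str,
    subtask_index: usize,
) -> String {
    // Trim a trailing slash so a base like "s3://bucket/" does not produce "//".
    let base = checkpoint_url.trim_end_matches('/');
    format!("{base}/{job_id}/checkpoints/{epoch}/operator-{operator_id}-{subtask_index}.metadata")
}
```

For example, `operator_metadata_path("s3://bucket/", "job_1", 7, "op_a", 2)` yields `s3://bucket/job_1/checkpoints/7/operator-op_a-2.metadata`.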
Execution steps:
- Serialize metadata: Encode the `OperatorCheckpointMetadata` struct to Protobuf binary format.
- Write to storage: Upload the serialized metadata to the computed path in object storage.
- Verify write: Ensure the write completed successfully before returning.
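The three steps can be sketched without any external crates by hand-encoding a tiny message in the Protobuf wire format (each field is a varint key `field_number << 3 | wire_type` followed by its payload). Everything here is hypothetical: `MetadataSketch` has only two fields whose numbers (1 = `operator_id`, 2 = `epoch`) are invented rather than taken from Arroyo's `.proto` schema, `MemStore` stands in for object storage, and reading the object back is just one possible way to "verify" the write; the page only states that the write must complete successfully.

```rust
use std::collections::HashMap;

/// Hypothetical two-field message; field numbers are assumptions.
pub struct MetadataSketch {
    pub operator_id: String,
    pub epoch: u32,
}

/// Appends `v` as a Protobuf base-128 varint (low 7 bits first).
fn put_varint(buf: &mut Vec<u8>, mut v: u64) {
    while v >= 0x80 {
        buf.push(((v & 0x7F) as u8) | 0x80);
        v >>= 7;
    }
    buf.push(v as u8);
}

impl MetadataSketch {
    pub fn encode(&self) -> Vec<u8> {
        let mut buf = Vec::new();
        // Field 1, wire type 2 (length-delimited): operator_id.
        put_varint(&mut buf, (1 << 3) | 2);
        put_varint(&mut buf, self.operator_id.len() as u64);
        buf.extend_from_slice(self.operator_id.as_bytes());
        // Field 2, wire type 0 (varint): epoch.
        put_varint(&mut buf, 2 << 3);
        put_varint(&mut buf, self.epoch as u64);
        buf
    }
}

/// Hypothetical in-memory object store.
#[derive(Default)]
pub struct MemStore {
    objects: HashMap<String, Vec<u8>>,
}

impl MemStore {
    pub fn put(&mut self, path: &str, bytes: Vec<u8>) {
        self.objects.insert(path.to_string(), bytes);
    }
    pub fn get(&self, path: &str) -> Option<&Vec<u8>> {
        self.objects.get(path)
    }
}

/// Serialize, upload, then read back to confirm the write before returning.
pub fn write_metadata_sketch(
    store: &mut MemStore,
    path: &str,
    metadata: &MetadataSketch,
) -> Result<(), String> {
    let bytes = metadata.encode();
    store.put(path, bytes.clone());
    match store.get(path) {
        Some(stored) if *stored == bytes => Ok(()),
        _ => Err(format!("metadata write to {path} could not be verified")),
    }
}
```

With `operator_id = "op"` and `epoch = 300`, `encode` produces the seven bytes `0A 02 6F 70 10 AC 02`; a production implementation would generate this encoding from a schema (for example with a Protobuf code generator) rather than by hand.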
Data Flow
The relationship between the two methods:
| Stage | Component | Action |
|---|---|---|
| 1 | `TableManager::checkpoint` | Serializes each state table to Parquet files |
| 2 | `TableManager::checkpoint` | Uploads Parquet files to object storage |
| 3 | `TableManager::checkpoint` | Constructs `OperatorCheckpointMetadata` |
| 4 | `ParquetBackend` | Writes Protobuf-encoded metadata to object storage |
| 5 | Controller | Receives notification that the subtask checkpoint is complete |
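Stage 5 can be illustrated with threads standing in for subtasks and a standard-library channel standing in for the control path to the controller. The message type `SubtaskCheckpointDone` and the use of `std::sync::mpsc` are assumptions for this sketch; Arroyo's actual control messages and transport are not described on this page.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical completion message sent after stages 1-4 finish.
#[derive(Debug, Clone, PartialEq)]
pub struct SubtaskCheckpointDone {
    pub subtask_index: usize,
    pub epoch: u32,
}

/// Spawns `parallelism` simulated subtasks that each report completion for
/// `epoch`, and returns the reports once every subtask has sent one.
pub fn run_epoch(parallelism: usize, epoch: u32) -> Vec<SubtaskCheckpointDone> {
    let (tx, rx) = mpsc::channel();
    for subtask_index in 0..parallelism {
        let tx = tx.clone();
        thread::spawn(move || {
            // Stages 1-4 (serialize, upload, build and write metadata) would run here.
            tx.send(SubtaskCheckpointDone { subtask_index, epoch })
                .expect("controller hung up");
        });
    }
    // Drop the original sender so the receive loop ends after the last subtask reports.
    drop(tx);
    // Stage 5: the controller collects one completion message per subtask.
    let mut done: Vec<SubtaskCheckpointDone> = rx.iter().collect();
    done.sort_by_key(|d| d.subtask_index);
    done
}
```

Sorting by `subtask_index` only makes the result deterministic; threads may report in any order.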
I/O
TableManager::checkpoint
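- Input: `epoch`, `min_epoch`, `watermark`, `then_stop`
- Output: Writes Parquet files for each state table to object storage, then writes the resulting `OperatorCheckpointMetadata` via `ParquetBackend::write_operator_checkpoint_metadata`. Returns `anyhow::Result<()>`.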
ParquetBackend::write_operator_checkpoint_metadata
- Input: `OperatorCheckpointMetadata`
- Output: Protobuf-encoded metadata written to `{checkpoint_url}/{job_id}/checkpoints/{epoch}/operator-{operator_id}-{subtask_index}.metadata`
Implements
- Principle:ArroyoSystems_Arroyo_State_Serialization
- Environment:ArroyoSystems_Arroyo_Object_Storage