

Implementation:ArroyoSystems Arroyo Table Manager Checkpoint

From Leeroopedia



Table Manager Checkpoint Implementation

This page documents the implementation of TableManager::checkpoint and ParquetBackend::write_operator_checkpoint_metadata.

Together, these two methods serialize an operator's state to Parquet files and persist the metadata that describes those files to object storage.

Code References

  • File: crates/arroyo-state/src/tables/table_manager.rs (L350-L368)
  • File: crates/arroyo-state/src/parquet.rs (L69-L91)

Core Interfaces

TableManager::checkpoint

use std::time::SystemTime;

impl TableManager {
    pub async fn checkpoint(
        &mut self,
        epoch: u32,
        min_epoch: u32,
        watermark: Option<SystemTime>,
        then_stop: bool,
    ) -> anyhow::Result<()> {
        // body elided
    }
}

ParquetBackend::write_operator_checkpoint_metadata

impl BackingStore for ParquetBackend {
    async fn write_operator_checkpoint_metadata(
        metadata: OperatorCheckpointMetadata,
    ) -> Result<(), StateError> {
        // body elided
    }
}

Detailed Behavior

TableManager::checkpoint

This method orchestrates the serialization of all state tables owned by a single operator subtask.

Parameter    Type                 Description
epoch        u32                  The checkpoint epoch number
min_epoch    u32                  Minimum epoch to retain (data from older epochs can be garbage-collected)
watermark    Option<SystemTime>   Current event-time watermark for the operator
then_stop    bool                 Whether the operator should stop after this checkpoint

Execution steps:

  1. Iterate state tables: For each registered state table (time-keyed maps, global keyed maps, etc.), serialize the in-memory data to Parquet format.
  2. Write Parquet files: Upload the serialized Parquet files to object storage at paths like {checkpoint_url}/{job_id}/checkpoints/{epoch}/.
  3. Collect metadata: Gather file references, byte counts, and table schemas into an OperatorCheckpointMetadata struct.
  4. Write metadata: Call ParquetBackend::write_operator_checkpoint_metadata() to persist the metadata.
  5. Clean expired state: Remove in-memory state for epochs older than min_epoch.
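The five steps above can be sketched as follows. This is a minimal, synchronous illustration under stated assumptions, not Arroyo's code: SketchTableManager, StateTable, and FileRef are hypothetical types, a HashMap stands in for object storage, a placeholder "key=value" text encoding stands in for Parquet, the metadata write is modeled as appending to a log, and the watermark and then_stop parameters are omitted.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical in-memory state table: rows grouped by the epoch that wrote them.
struct StateTable {
    name: String,
    rows_by_epoch: BTreeMap<u32, Vec<(String, String)>>,
}

/// Hypothetical reference to one uploaded file, as collected in step 3.
#[derive(Debug, Clone, PartialEq)]
struct FileRef {
    table: String,
    path: String,
    bytes: usize,
}

/// Hypothetical, synchronous stand-in for TableManager.
struct SketchTableManager {
    checkpoint_url: String,
    job_id: String,
    tables: Vec<StateTable>,
    /// Stands in for object storage: path -> object bytes.
    store: HashMap<String, Vec<u8>>,
    /// Stands in for write_operator_checkpoint_metadata: (epoch, files) per checkpoint.
    metadata_log: Vec<(u32, Vec<FileRef>)>,
}

impl SketchTableManager {
    fn checkpoint(&mut self, epoch: u32, min_epoch: u32) -> Result<(), String> {
        let mut files = Vec::new();
        // Step 1: iterate every registered state table.
        for table in &self.tables {
            // Placeholder encoding ("key=value\n" lines), NOT Parquet.
            let mut bytes = Vec::new();
            for rows in table.rows_by_epoch.values() {
                for (k, v) in rows {
                    bytes.extend_from_slice(format!("{k}={v}\n").as_bytes());
                }
            }
            // Step 2: "upload" under the epoch's checkpoint directory.
            let path = format!(
                "{}/{}/checkpoints/{}/{}.parquet",
                self.checkpoint_url, self.job_id, epoch, table.name
            );
            let len = bytes.len();
            self.store.insert(path.clone(), bytes);
            // Step 3: record the file reference and its byte count.
            files.push(FileRef { table: table.name.clone(), path, bytes: len });
        }
        // Step 4: persist the collected metadata.
        self.metadata_log.push((epoch, files));
        // Step 5: drop in-memory rows from epochs older than min_epoch.
        for table in &mut self.tables {
            table.rows_by_epoch.retain(|e, _| *e >= min_epoch);
        }
        Ok(())
    }
}

fn main() {
    let mut rows = BTreeMap::new();
    rows.insert(1, vec![("a".to_string(), "1".to_string())]);
    rows.insert(3, vec![("b".to_string(), "2".to_string())]);
    let mut tm = SketchTableManager {
        checkpoint_url: "mem://ckpt".into(),
        job_id: "job_1".into(),
        tables: vec![StateTable { name: "counts".into(), rows_by_epoch: rows }],
        store: HashMap::new(),
        metadata_log: Vec::new(),
    };
    tm.checkpoint(3, 2).unwrap();
    println!("{:?}", tm.metadata_log);
}
```

Running checkpoint(3, 2) on a table that holds rows from epochs 1 and 3 uploads one object under mem://ckpt/job_1/checkpoints/3/ and leaves only the epoch-3 rows in memory. Cleanup runs last in the sketch so that every retained row is serialized before anything is dropped.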

ParquetBackend::write_operator_checkpoint_metadata

This method persists the per-operator checkpoint metadata to object storage.

Storage path format:

{checkpoint_url}/{job_id}/checkpoints/{epoch}/operator-{operator_id}-{subtask_index}.metadata
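The path format above can be expressed as a small helper. The function name operator_metadata_path is hypothetical, and the sketch assumes each placeholder is substituted verbatim (for example, that the epoch is rendered as a plain decimal number without zero-padding).

```rust
/// Builds the per-operator metadata path from the format above.
/// Hypothetical helper; parameter names mirror the placeholders in the format.
fn operator_metadata_path(
    checkpoint_url: &str,
    job_id: &str,
    epoch: u32,
    operator_id: &str,
    subtask_index: u32,
) -> String {
    // Trim a trailing slash so "s3://bucket/" and "s3://bucket" give the same path.
    let base = checkpoint_url.trim_end_matches('/');
    format!("{base}/{job_id}/checkpoints/{epoch}/operator-{operator_id}-{subtask_index}.metadata")
}

fn main() {
    let path = operator_metadata_path("s3://bucket/arroyo", "job_1", 7, "op_3", 0);
    println!("{path}");
}
```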

Execution steps:

  1. Serialize metadata: Encode the OperatorCheckpointMetadata struct to Protobuf binary format.
  2. Write to storage: Upload the serialized metadata to the computed path in object storage.
  3. Verify write: Ensure the write completed successfully before returning.
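A minimal sketch of these three steps, assuming a simplified metadata message. The real OperatorCheckpointMetadata schema has other fields; SimpleOperatorMetadata, its field numbers, write_metadata, and the HashMap standing in for object storage are all hypothetical. The encoder writes the protobuf wire format by hand (varint and length-delimited fields) so the example needs no external crates; a Rust codebase would normally use code generated by a protobuf library instead. Verification is modeled as a read-back comparison; a real backend may instead rely on the object store's success response.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for OperatorCheckpointMetadata.
/// Assumed schema: uint32 epoch = 1; string operator_id = 2;
/// uint32 subtask_index = 3; repeated string files = 4;
struct SimpleOperatorMetadata {
    epoch: u32,
    operator_id: String,
    subtask_index: u32,
    files: Vec<String>,
}

/// Appends `value` as a protobuf base-128 varint (low 7-bit groups first).
fn put_varint(buf: &mut Vec<u8>, mut value: u64) {
    while value >= 0x80 {
        buf.push((value as u8 & 0x7f) | 0x80);
        value >>= 7;
    }
    buf.push(value as u8);
}

/// Appends a field key: (field_number << 3) | wire_type.
fn put_key(buf: &mut Vec<u8>, field: u32, wire_type: u64) {
    put_varint(buf, (u64::from(field) << 3) | wire_type);
}

/// Appends a length-delimited (wire type 2) field.
fn put_bytes(buf: &mut Vec<u8>, field: u32, bytes: &[u8]) {
    put_key(buf, field, 2);
    put_varint(buf, bytes.len() as u64);
    buf.extend_from_slice(bytes);
}

impl SimpleOperatorMetadata {
    /// Step 1: encode to protobuf binary; proto3 skips default-valued scalars.
    fn encode(&self) -> Vec<u8> {
        let mut buf = Vec::new();
        if self.epoch != 0 {
            put_key(&mut buf, 1, 0);
            put_varint(&mut buf, u64::from(self.epoch));
        }
        if !self.operator_id.is_empty() {
            put_bytes(&mut buf, 2, self.operator_id.as_bytes());
        }
        if self.subtask_index != 0 {
            put_key(&mut buf, 3, 0);
            put_varint(&mut buf, u64::from(self.subtask_index));
        }
        for file in &self.files {
            put_bytes(&mut buf, 4, file.as_bytes());
        }
        buf
    }
}

/// Steps 2 and 3: upload the bytes, then read them back to confirm the write.
fn write_metadata(
    store: &mut HashMap<String, Vec<u8>>,
    path: &str,
    meta: &SimpleOperatorMetadata,
) -> Result<(), String> {
    let bytes = meta.encode();
    store.insert(path.to_string(), bytes.clone());
    match store.get(path) {
        Some(stored) if *stored == bytes => Ok(()),
        _ => Err(format!("could not verify metadata written to {path}")),
    }
}

fn main() {
    let meta = SimpleOperatorMetadata {
        epoch: 300,
        operator_id: "ab".to_string(),
        subtask_index: 1,
        files: vec![],
    };
    let mut store = HashMap::new();
    let path = "mem://ckpt/job_1/checkpoints/300/operator-ab-1.metadata";
    write_metadata(&mut store, path, &meta).unwrap();
    println!("{:02x?}", store[path]);
}
```

With this assumed schema, the example metadata (epoch 300, operator "ab", subtask 1) encodes to the nine bytes 08 ac 02 12 02 61 62 18 01: a key byte per field, 300 as the two-byte varint ac 02, and the string prefixed by its length.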

Data Flow

The stages of a subtask checkpoint, showing how the two methods relate:

Stage  Component                  Action
1      TableManager::checkpoint   Serializes each state table to Parquet files
2      TableManager::checkpoint   Uploads the Parquet files to object storage
3      TableManager::checkpoint   Constructs OperatorCheckpointMetadata
4      ParquetBackend             Writes Protobuf-encoded metadata to object storage
5      Controller                 Receives notification that the subtask checkpoint is complete
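The table implies an ordering: a subtask reports completion (stage 5) only after its metadata write (stage 4) has returned. The sketch below models that with threads and a std::sync::mpsc channel. The message type SubtaskCheckpointDone, the collect_checkpoint function, and the shared HashSet standing in for object storage are hypothetical, and the channel is only a stand-in for whatever transport the worker actually uses to reach the controller.

```rust
use std::collections::HashSet;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Hypothetical completion message sent by a subtask once stages 1-4 are done.
#[derive(Debug)]
struct SubtaskCheckpointDone {
    epoch: u32,
    operator_id: String,
    subtask_index: u32,
}

/// Runs `parallelism` subtasks for one epoch and returns the subtask indices
/// the controller heard from, in ascending order.
fn collect_checkpoint(parallelism: u32, epoch: u32) -> Vec<u32> {
    // Shared set of metadata object paths; stands in for object storage.
    let store: Arc<Mutex<HashSet<String>>> = Arc::new(Mutex::new(HashSet::new()));
    let (tx, rx) = mpsc::channel::<SubtaskCheckpointDone>();

    for subtask in 0..parallelism {
        let tx = tx.clone();
        let store = Arc::clone(&store);
        thread::spawn(move || {
            // Stages 1-4 collapsed into one write of the metadata object.
            store.lock().unwrap().insert(format!("operator-op_1-{subtask}.metadata"));
            // Stage 5: notify only after the metadata write has returned.
            let done = SubtaskCheckpointDone {
                epoch,
                operator_id: "op_1".to_string(),
                subtask_index: subtask,
            };
            tx.send(done).unwrap();
        });
    }
    // Drop the original sender so `rx` ends once every subtask has reported.
    drop(tx);

    let mut reported = Vec::new();
    for msg in rx {
        let path = format!("operator-{}-{}.metadata", msg.operator_id, msg.subtask_index);
        // Because of the ordering above, the metadata object is already visible.
        assert!(store.lock().unwrap().contains(&path));
        assert_eq!(msg.epoch, epoch);
        reported.push(msg.subtask_index);
    }
    reported.sort_unstable();
    reported
}

fn main() {
    println!("{:?}", collect_checkpoint(4, 7));
}
```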

I/O

TableManager::checkpoint

  • Input: epoch, min_epoch, watermark, then_stop
  • Output: Parquet files for each state table written to object storage, followed by the subtask's OperatorCheckpointMetadata

ParquetBackend::write_operator_checkpoint_metadata

  • Input: OperatorCheckpointMetadata
  • Output: Protobuf-encoded metadata written to {checkpoint_url}/{job_id}/checkpoints/{epoch}/operator-{operator_id}-{subtask_index}.metadata

Implements

  • Principle:ArroyoSystems_Arroyo_State_Serialization
  • Environment:ArroyoSystems_Arroyo_Object_Storage
