Implementation:ArroyoSystems Arroyo State Tables
| Knowledge Sources | |
|---|---|
| Domains | Streaming, State, Checkpointing |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Defines the core Table, ErasedTable, TableEpochCheckpointer, and ErasedCheckpointer traits that form the state table abstraction layer for the Arroyo checkpoint system.
Description
This module establishes the trait hierarchy for all stateful tables in the Arroyo streaming engine:
- MigratableState -- a trait for state structs that support versioned migration from previous data formats. Requires a VERSION constant and a migrate function.
- CheckpointParquetMetadata -- stores a state_version in Parquet file key-value metadata, with bidirectional conversion to/from Parquet KeyValue pairs.
- Table trait -- the core trait for state tables with associated types for Checkpointer, ConfigMessage, TableCheckpointMessage, and TableSubtaskCheckpointMetadata. Defines the full lifecycle: from_config, epoch_checkpointer, merge_checkpoint_metadata, subtask_metadata_from_table, apply_compacted_checkpoint, compact_data, and files_to_keep.
- ErasedTable trait -- a type-erased version of Table that operates on generic protobuf TableConfig and TableCheckpointMetadata, enabling dynamic dispatch. Includes checked_proto_decode for safe protobuf deserialization with table type validation.
- TableEpochCheckpointer -- handles per-epoch checkpoint operations: insert_data and finish. The finish method consumes the checkpointer and returns an optional pair of subtask checkpoint metadata and a usize size estimate.
- ErasedCheckpointer -- type-erased version of TableEpochCheckpointer for dynamic dispatch.
- CompactionConfig -- configuration for compaction operations including storage provider, generation set, and minimum compaction epochs.
- DataTuple / BlindDataTuple -- data containers for keyed time-series data, used internally during checkpointing.
A blanket implementation of ErasedTable for all types implementing Table handles the protobuf encoding/decoding bridge.
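The CheckpointParquetMetadata round trip described above can be sketched with the standard library alone. This is a minimal illustration, not the crate's code: the `KeyValue` struct only mirrors the shape of Parquet's key-value metadata entries (a string key with an optional string value), and the key name `state_version` as well as the decision to reject a missing key are assumptions.

```rust
// Stand-in mirroring the shape of a Parquet key-value metadata entry.
// It is NOT the parquet crate's type.
#[derive(Debug, Clone, PartialEq)]
struct KeyValue {
    key: String,
    value: Option<String>,
}

// Hypothetical key name; the real key used by Arroyo is not shown in this page.
const STATE_VERSION_KEY: &str = "state_version";

// Hypothetical analogue of CheckpointParquetMetadata.
#[derive(Debug, Clone, Copy, PartialEq)]
struct CheckpointMetadata {
    state_version: u32,
}

impl CheckpointMetadata {
    /// Serialize the metadata into key-value pairs for the file footer.
    fn to_key_values(&self) -> Vec<KeyValue> {
        vec![KeyValue {
            key: STATE_VERSION_KEY.to_string(),
            value: Some(self.state_version.to_string()),
        }]
    }

    /// Rebuild the metadata from key-value pairs.
    /// Assumption: a missing or unparsable version is treated as an error.
    fn from_key_values(kvs: &[KeyValue]) -> Result<Self, String> {
        let raw = kvs
            .iter()
            .find(|kv| kv.key == STATE_VERSION_KEY)
            .and_then(|kv| kv.value.as_deref())
            .ok_or_else(|| format!("missing key {}", STATE_VERSION_KEY))?;
        let state_version = raw
            .parse::<u32>()
            .map_err(|e| format!("invalid state version {:?}: {}", raw, e))?;
        Ok(CheckpointMetadata { state_version })
    }
}

fn main() {
    let meta = CheckpointMetadata { state_version: 3 };
    let kvs = meta.to_key_values();
    let restored = CheckpointMetadata::from_key_values(&kvs).unwrap();
    println!("{:?} -> {:?} -> {:?}", meta, kvs, restored);
}
```

Storing the version as a string keeps it readable by generic Parquet tooling; the cost is a parse step on restore.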
Usage
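Implement the Table trait to add a new state table type. Because Table is declared pub(crate), such implementations live inside the arroyo-state crate. At runtime, the TableManager works with tables through ErasedTable, which allows polymorphic operations without knowing the concrete table type.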
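The relationship between a typed trait, its erased counterpart, and the blanket implementation that bridges them can be illustrated with a small standalone sketch. Everything here is hypothetical: `Message` stands in for `prost::Message`, `TableKind` for `TableEnum`, `GenericConfig` for the generic protobuf `TableConfig`, and `CounterTable` is an invented table type. The real traits carry more methods and parameters.

```rust
// Hypothetical stand-ins: none of these names are Arroyo or prost types.

/// Minimal byte codec standing in for `prost::Message`.
trait Message: Sized {
    fn encode(&self) -> Vec<u8>;
    fn decode(bytes: &[u8]) -> Result<Self, String>;
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum TableKind {
    Counter,
    Other,
}

/// Generic envelope: a type tag plus the encoded, table-specific config.
struct GenericConfig {
    kind: TableKind,
    payload: Vec<u8>,
}

/// Typed trait with an associated config type (the role `Table` plays).
trait TypedTable: Sized {
    type Config: Message;
    fn kind() -> TableKind;
    fn from_config(config: Self::Config) -> Result<Self, String>;
    fn describe(&self) -> String;
}

/// Erased trait: signatures mention only generic types, so `dyn ErasedTable` works.
trait ErasedTable {
    fn from_generic(config: &GenericConfig) -> Result<Self, String>
    where
        Self: Sized;
    fn describe_erased(&self) -> String;
}

/// Validate the type tag before decoding the payload.
fn checked_decode<T: TypedTable>(config: &GenericConfig) -> Result<T::Config, String> {
    if config.kind != T::kind() {
        return Err(format!("expected {:?}, got {:?}", T::kind(), config.kind));
    }
    <T::Config as Message>::decode(&config.payload)
}

/// Blanket impl: every typed table is automatically usable in erased form.
impl<T: TypedTable> ErasedTable for T {
    fn from_generic(config: &GenericConfig) -> Result<Self, String> {
        T::from_config(checked_decode::<T>(config)?)
    }
    fn describe_erased(&self) -> String {
        self.describe()
    }
}

struct CounterConfig {
    start: u64,
}

impl Message for CounterConfig {
    fn encode(&self) -> Vec<u8> {
        self.start.to_le_bytes().to_vec()
    }
    fn decode(bytes: &[u8]) -> Result<Self, String> {
        if bytes.len() != 8 {
            return Err(format!("expected 8 bytes, got {}", bytes.len()));
        }
        let mut raw = [0u8; 8];
        raw.copy_from_slice(bytes);
        Ok(CounterConfig { start: u64::from_le_bytes(raw) })
    }
}

struct CounterTable {
    start: u64,
}

impl TypedTable for CounterTable {
    type Config = CounterConfig;
    fn kind() -> TableKind {
        TableKind::Counter
    }
    fn from_config(config: CounterConfig) -> Result<Self, String> {
        Ok(CounterTable { start: config.start })
    }
    fn describe(&self) -> String {
        format!("counter starting at {}", self.start)
    }
}

fn main() {
    let config = GenericConfig {
        kind: TableKind::Counter,
        payload: CounterConfig { start: 7 }.encode(),
    };
    // The caller holds only a trait object and never names the config type.
    let table: Box<dyn ErasedTable> = Box::new(CounterTable::from_generic(&config).unwrap());
    println!("{}", table.describe_erased());
}
```

Checking the tag before decoding means a config written for one table type is rejected up front rather than being misread as another type's message.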
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-state/src/tables/mod.rs
Signature
pub trait MigratableState: Data {
    const VERSION: u32;
    type PreviousVersion: Data;

    fn migrate(previous: Self::PreviousVersion) -> Result<Self, StateError>;
}

#[async_trait]
pub(crate) trait Table: Send + Sync + 'static + Clone {
    type Checkpointer: TableEpochCheckpointer;
    type ConfigMessage: prost::Message + Default;
    type TableCheckpointMessage: prost::Message + Default;
    type TableSubtaskCheckpointMetadata: prost::Message + Default;

    fn from_config(
        config: Self::ConfigMessage,
        task_info: Arc<TaskInfo>,
        storage_provider: StorageProviderRef,
        checkpoint_message: Option<Self::TableCheckpointMessage>,
        state_version: u32,
    ) -> Result<Self, StateError>;

    fn epoch_checkpointer(
        &self,
        epoch: u32,
        previous_metadata: Option<Self::TableSubtaskCheckpointMetadata>,
    ) -> Result<Self::Checkpointer, StateError>;

    fn merge_checkpoint_metadata(
        config: Self::ConfigMessage,
        subtask_metadata: HashMap<u32, Self::TableSubtaskCheckpointMetadata>,
    ) -> Result<Option<Self::TableCheckpointMessage>, StateError>;

    fn table_type() -> TableEnum;
}

#[async_trait]
pub trait TableEpochCheckpointer: Send {
    type SubTableCheckpointMessage: prost::Message;

    async fn insert_data(&mut self, data: TableData) -> Result<(), StateError>;

    async fn finish(
        self,
        checkpoint: &CheckpointMessage,
    ) -> Result<Option<(Self::SubTableCheckpointMessage, usize)>, StateError>;
}
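The insert_data / finish lifecycle of a checkpointer can be sketched in a simplified, synchronous form. This is an illustration under stated assumptions rather than Arroyo's implementation: `EpochCheckpointer`, `BufferingCheckpointer`, and `FileSummary` are hypothetical names, the async and `CheckpointMessage` parts are omitted, `None` is assumed to mean "nothing to report for this epoch", and the `usize` is assumed to be a byte count.

```rust
/// Hypothetical synchronous analogue of `TableEpochCheckpointer`.
trait EpochCheckpointer {
    type SubtaskMessage;

    /// Accept one batch of data for the current epoch.
    fn insert_data(&mut self, data: Vec<u8>) -> Result<(), String>;

    /// Consume the checkpointer and report what was captured.
    /// Assumption: `None` means the epoch produced nothing to record.
    fn finish(self) -> Result<Option<(Self::SubtaskMessage, usize)>, String>;
}

/// Hypothetical per-subtask metadata.
#[derive(Debug, PartialEq)]
struct FileSummary {
    epoch: u32,
    record_count: usize,
}

/// Toy checkpointer that buffers records in memory instead of writing files.
struct BufferingCheckpointer {
    epoch: u32,
    buffered: Vec<Vec<u8>>,
}

impl BufferingCheckpointer {
    fn new(epoch: u32) -> Self {
        BufferingCheckpointer { epoch, buffered: Vec::new() }
    }
}

impl EpochCheckpointer for BufferingCheckpointer {
    type SubtaskMessage = FileSummary;

    fn insert_data(&mut self, data: Vec<u8>) -> Result<(), String> {
        self.buffered.push(data);
        Ok(())
    }

    fn finish(self) -> Result<Option<(FileSummary, usize)>, String> {
        if self.buffered.is_empty() {
            return Ok(None);
        }
        // Size estimate: total payload bytes seen during this epoch.
        let bytes: usize = self.buffered.iter().map(|record| record.len()).sum();
        let summary = FileSummary {
            epoch: self.epoch,
            record_count: self.buffered.len(),
        };
        Ok(Some((summary, bytes)))
    }
}

fn main() {
    let mut checkpointer = BufferingCheckpointer::new(3);
    checkpointer.insert_data(vec![1, 2, 3]).unwrap();
    checkpointer.insert_data(vec![4, 5]).unwrap();
    println!("{:?}", checkpointer.finish().unwrap());
}
```

Taking `self` by value in `finish` makes it a compile-time error to insert more data after the epoch has been finalized.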
Import
use arroyo_state::tables::{
    MigratableState, CompactionConfig, DataTuple, BlindDataTuple,
    CheckpointParquetMetadata, table_checkpoint_path,
};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | TableConfig | Yes | Protobuf-encoded table configuration |
| task_info | Arc<TaskInfo> | Yes | Subtask identity for storage path computation |
| storage_provider | StorageProviderRef | Yes | Object store backend |
| checkpoint_message | Option<TableCheckpointMetadata> | No | Previous checkpoint for restoration |
| state_version | u32 | Yes | State format version passed to Table::from_config |
Outputs
| Name | Type | Description |
|---|---|---|
| TableSubtaskCheckpointMetadata | protobuf | Per-subtask checkpoint metadata with files and watermarks |
| TableCheckpointMetadata | protobuf | Merged per-table checkpoint metadata |
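How per-subtask metadata might be folded into a single per-table record, in the spirit of merge_checkpoint_metadata, is sketched below. The struct shapes and the merge policy (union of file names, minimum watermark, `None` when no subtask reported) are assumptions chosen for illustration; the actual policy is defined by each table implementation.

```rust
use std::collections::{BTreeSet, HashMap};

/// Hypothetical per-subtask metadata: files written plus an optional watermark.
#[derive(Debug, Clone, PartialEq)]
struct SubtaskMeta {
    files: Vec<String>,
    watermark: Option<u64>,
}

/// Hypothetical merged per-table metadata.
#[derive(Debug, PartialEq)]
struct TableMeta {
    files: Vec<String>,
    min_watermark: Option<u64>,
}

/// Merge metadata keyed by subtask index into one table-level record.
/// Assumed policy: deduplicated, sorted union of files and the smallest watermark.
fn merge_checkpoint_metadata(subtasks: HashMap<u32, SubtaskMeta>) -> Option<TableMeta> {
    if subtasks.is_empty() {
        return None;
    }
    let mut files = BTreeSet::new();
    let mut min_watermark: Option<u64> = None;
    for meta in subtasks.into_values() {
        files.extend(meta.files);
        min_watermark = match (min_watermark, meta.watermark) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (a, b) => a.or(b),
        };
    }
    Some(TableMeta {
        files: files.into_iter().collect(),
        min_watermark,
    })
}

fn main() {
    let mut subtasks = HashMap::new();
    subtasks.insert(
        0,
        SubtaskMeta { files: vec!["b".into(), "a".into()], watermark: Some(20) },
    );
    subtasks.insert(
        1,
        SubtaskMeta { files: vec!["a".into(), "c".into()], watermark: Some(10) },
    );
    println!("{:?}", merge_checkpoint_metadata(subtasks));
}
```

Using a `BTreeSet` keeps the merged file list deterministic regardless of `HashMap` iteration order.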
Usage Examples
use arroyo_state::tables::MigratableState;
use arroyo_rpc::errors::StateError;
use bincode::{Decode, Encode};

// Current state layout: adds a `label` field to the previous version.
#[derive(Debug, Clone, Encode, Decode)]
struct MyStateV2 {
    count: u64,
    label: String,
}

// Previous state layout, kept so that old checkpoints can still be decoded.
#[derive(Debug, Clone, Encode, Decode)]
struct MyStateV1 {
    count: u64,
}

impl MigratableState for MyStateV2 {
    const VERSION: u32 = 2;
    type PreviousVersion = MyStateV1;

    // Carry `count` over unchanged and fill the new field with a default.
    fn migrate(previous: MyStateV1) -> Result<Self, StateError> {
        Ok(MyStateV2 {
            count: previous.count,
            label: "default".to_string(),
        })
    }
}
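A stored state_version could drive the choice between decoding directly and migrating, roughly as in the standalone sketch below. The `Migratable` trait is a hypothetical stand-in for MigratableState without the `Data` bound, `restore` is an invented helper, decoding is represented by closures, and support for only a single migration step is an assumption. How Arroyo itself dispatches on the version is not shown in this page.

```rust
/// Hypothetical stand-in for `MigratableState` (no `Data` bound, `String` errors).
trait Migratable: Sized {
    const VERSION: u32;
    type PreviousVersion;
    fn migrate(previous: Self::PreviousVersion) -> Result<Self, String>;
}

#[derive(Debug, Clone, PartialEq)]
struct MyStateV1 {
    count: u64,
}

#[derive(Debug, Clone, PartialEq)]
struct MyStateV2 {
    count: u64,
    label: String,
}

impl Migratable for MyStateV2 {
    const VERSION: u32 = 2;
    type PreviousVersion = MyStateV1;

    fn migrate(previous: MyStateV1) -> Result<Self, String> {
        Ok(MyStateV2 { count: previous.count, label: "default".to_string() })
    }
}

/// Invented helper: pick a decode path based on the version found in the checkpoint.
/// The closures stand in for deserializing the stored bytes as each layout.
fn restore<T: Migratable>(
    stored_version: u32,
    decode_current: impl FnOnce() -> T,
    decode_previous: impl FnOnce() -> T::PreviousVersion,
) -> Result<T, String> {
    if stored_version == T::VERSION {
        // Data is already in the current layout.
        Ok(decode_current())
    } else if stored_version.checked_add(1) == Some(T::VERSION) {
        // Data is exactly one version behind: decode the old layout, then migrate.
        T::migrate(decode_previous())
    } else {
        Err(format!(
            "cannot restore version {} as version {}",
            stored_version,
            T::VERSION
        ))
    }
}

fn main() {
    let migrated: MyStateV2 = restore(
        1,
        || MyStateV2 { count: 0, label: String::new() },
        || MyStateV1 { count: 5 },
    )
    .unwrap();
    println!("{:?}", migrated);
}
```

Longer histories would need a chain of such steps, one per version; this sketch deliberately rejects anything older than the immediately preceding layout.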