Implementation:ArroyoSystems Arroyo State Tables

From Leeroopedia
Revision as of 14:28, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/ArroyoSystems_Arroyo_State_Tables.md)


Knowledge Sources
Domains Streaming, State, Checkpointing
Last Updated 2026-02-08 08:00 GMT

Overview

Defines the core Table, ErasedTable, TableEpochCheckpointer, and ErasedCheckpointer traits that form the state table abstraction layer for the Arroyo checkpoint system.

Description

This module establishes the trait hierarchy for all stateful tables in the Arroyo streaming engine:

  • MigratableState -- a trait for state structs that support versioned migration from a previous data format. Requires a VERSION constant, a PreviousVersion associated type, and a migrate function that converts a PreviousVersion value into the current type.
  • CheckpointParquetMetadata -- stores a state_version in Parquet file key-value metadata, with bidirectional conversion to/from Parquet KeyValue pairs.
  • Table trait -- the core trait for state tables with associated types for Checkpointer, ConfigMessage, TableCheckpointMessage, and TableSubtaskCheckpointMetadata. Defines the full lifecycle: from_config, epoch_checkpointer, merge_checkpoint_metadata, subtask_metadata_from_table, apply_compacted_checkpoint, compact_data, and files_to_keep.
  • ErasedTable trait -- a type-erased version of Table that operates on generic protobuf TableConfig and TableCheckpointMetadata, enabling dynamic dispatch. Includes checked_proto_decode for safe protobuf deserialization with table type validation.
  • TableEpochCheckpointer -- handles per-epoch checkpoint operations: insert_data and finish. The finish method consumes the checkpointer and returns, when present, the subtask checkpoint metadata paired with a size estimate (usize).
  • ErasedCheckpointer -- type-erased version of TableEpochCheckpointer for dynamic dispatch.
  • CompactionConfig -- configuration for compaction operations including storage provider, generation set, and minimum compaction epochs.
  • DataTuple / BlindDataTuple -- data containers for keyed time-series data, used internally during checkpointing.
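The bidirectional key-value conversion described for CheckpointParquetMetadata can be sketched with standard-library types only. Everything below is illustrative: `KeyValue` is a hypothetical stand-in for a Parquet key-value metadata entry, `VersionMetadata` is a simplified mirror of the real struct, and the key name `state_version` is an assumption rather than something confirmed by the source.

```rust
// Hypothetical stand-in for a Parquet key-value metadata entry.
#[derive(Debug, Clone, PartialEq)]
pub struct KeyValue {
    pub key: String,
    pub value: Option<String>,
}

// Simplified mirror of CheckpointParquetMetadata; the real struct may differ.
#[derive(Debug, Clone, PartialEq)]
pub struct VersionMetadata {
    pub state_version: u32,
}

// Assumed key name, chosen for illustration only.
const STATE_VERSION_KEY: &str = "state_version";

impl VersionMetadata {
    // Serialize the version into file-level key-value metadata.
    pub fn to_key_values(&self) -> Vec<KeyValue> {
        vec![KeyValue {
            key: STATE_VERSION_KEY.to_string(),
            value: Some(self.state_version.to_string()),
        }]
    }

    // Recover the version; returns None if the key is missing or unparsable.
    pub fn from_key_values(pairs: &[KeyValue]) -> Option<Self> {
        pairs
            .iter()
            .find(|kv| kv.key == STATE_VERSION_KEY)
            .and_then(|kv| kv.value.as_deref())
            .and_then(|v| v.parse::<u32>().ok())
            .map(|state_version| VersionMetadata { state_version })
    }
}
```

Returning `None` for a missing key is a design choice of this sketch; it leaves the caller to decide how to treat files that carry no version.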

A blanket implementation of ErasedTable for all types implementing Table handles the protobuf encoding/decoding bridge.
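The blanket-implementation pattern can be illustrated with a much smaller pair of traits. This is a minimal sketch, not the Arroyo code: `TypedTable`, `ErasedTypedTable`, `build_erased`, and `CounterTable` are hypothetical names, raw bytes stand in for protobuf messages, and the type-tag check only loosely mirrors what checked_proto_decode is described as doing.

```rust
// Simplified typed trait, standing in for `Table`.
pub trait TypedTable: Sized {
    type Config;
    fn table_type() -> &'static str;
    fn decode_config(bytes: &[u8]) -> Result<Self::Config, String>;
    fn from_config(config: Self::Config) -> Result<Self, String>;
    fn describe(&self) -> String;
}

// Object-safe trait, standing in for `ErasedTable`.
pub trait ErasedTypedTable {
    fn describe_erased(&self) -> String;
}

// Blanket implementation: every typed table is automatically usable
// behind `dyn ErasedTypedTable`.
impl<T: TypedTable> ErasedTypedTable for T {
    fn describe_erased(&self) -> String {
        self.describe()
    }
}

// Validate the table type tag, decode the typed config, and return
// the table behind a trait object.
pub fn build_erased<T: TypedTable + 'static>(
    declared_type: &str,
    raw_config: &[u8],
) -> Result<Box<dyn ErasedTypedTable>, String> {
    if declared_type != T::table_type() {
        return Err(format!(
            "expected table type {}, got {}",
            T::table_type(),
            declared_type
        ));
    }
    let config = T::decode_config(raw_config)?;
    Ok(Box::new(T::from_config(config)?))
}

// Hypothetical table whose config is a decimal number encoded as UTF-8.
pub struct CounterTable {
    start: u64,
}

impl TypedTable for CounterTable {
    type Config = u64;

    fn table_type() -> &'static str {
        "counter"
    }

    fn decode_config(bytes: &[u8]) -> Result<u64, String> {
        let text = std::str::from_utf8(bytes).map_err(|e| e.to_string())?;
        text.parse::<u64>().map_err(|e| e.to_string())
    }

    fn from_config(config: u64) -> Result<Self, String> {
        Ok(CounterTable { start: config })
    }

    fn describe(&self) -> String {
        format!("counter table starting at {}", self.start)
    }
}
```

Calling `build_erased::<CounterTable>("counter", b"42")` yields a boxed trait object, while a mismatched type tag or malformed config is rejected before any table is constructed.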

Usage

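Implement the Table trait to create a new state table type. Because the trait is declared pub(crate), such implementations live inside the arroyo_state crate. At runtime, use ErasedTable through the TableManager for polymorphic table operations.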

Code Reference

Signature

pub trait MigratableState: Data {
    const VERSION: u32;
    type PreviousVersion: Data;
    fn migrate(previous: Self::PreviousVersion) -> Result<Self, StateError>;
}

#[async_trait]
pub(crate) trait Table: Send + Sync + 'static + Clone {
    type Checkpointer: TableEpochCheckpointer;
    type ConfigMessage: prost::Message + Default;
    type TableCheckpointMessage: prost::Message + Default;
    type TableSubtaskCheckpointMetadata: prost::Message + Default;

    fn from_config(config: Self::ConfigMessage, task_info: Arc<TaskInfo>,
                   storage_provider: StorageProviderRef,
                   checkpoint_message: Option<Self::TableCheckpointMessage>,
                   state_version: u32) -> Result<Self, StateError>;
    fn epoch_checkpointer(&self, epoch: u32,
                          previous_metadata: Option<Self::TableSubtaskCheckpointMetadata>)
                          -> Result<Self::Checkpointer, StateError>;
    fn merge_checkpoint_metadata(config: Self::ConfigMessage,
                                 subtask_metadata: HashMap<u32, Self::TableSubtaskCheckpointMetadata>)
                                 -> Result<Option<Self::TableCheckpointMessage>, StateError>;
    fn table_type() -> TableEnum;
}

#[async_trait]
pub trait TableEpochCheckpointer: Send {
    type SubTableCheckpointMessage: prost::Message;
    async fn insert_data(&mut self, data: TableData) -> Result<(), StateError>;
    async fn finish(self, checkpoint: &CheckpointMessage)
                    -> Result<Option<(Self::SubTableCheckpointMessage, usize)>, StateError>;
}
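The insert-then-finish lifecycle of an epoch checkpointer can be sketched synchronously. This is an illustration under stated assumptions: `VecCheckpointer` and `SubtaskMetadata` are hypothetical, the real trait is async and works with protobuf messages, and treating `None` as "nothing was written this epoch" is this sketch's interpretation, not something the source states.

```rust
// Hypothetical per-subtask metadata; the real type is a protobuf message.
#[derive(Debug, PartialEq)]
pub struct SubtaskMetadata {
    pub epoch: u32,
    pub rows: usize,
}

// Hypothetical checkpointer that buffers rows in memory for one epoch.
pub struct VecCheckpointer {
    epoch: u32,
    buffered: Vec<Vec<u8>>,
}

impl VecCheckpointer {
    pub fn new(epoch: u32) -> Self {
        VecCheckpointer { epoch, buffered: Vec::new() }
    }

    // Accept one row of state for the epoch being checkpointed.
    pub fn insert_data(&mut self, row: Vec<u8>) -> Result<(), String> {
        self.buffered.push(row);
        Ok(())
    }

    // Taking `self` by value means no data can be inserted after finishing.
    // Returns metadata plus a byte-size estimate, or None if nothing was
    // buffered (an assumption of this sketch).
    pub fn finish(self) -> Result<Option<(SubtaskMetadata, usize)>, String> {
        if self.buffered.is_empty() {
            return Ok(None);
        }
        let bytes: usize = self.buffered.iter().map(|row| row.len()).sum();
        let metadata = SubtaskMetadata {
            epoch: self.epoch,
            rows: self.buffered.len(),
        };
        Ok(Some((metadata, bytes)))
    }
}
```

Consuming the checkpointer in `finish` lets the compiler enforce that each epoch is finalized exactly once.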

Import

use arroyo_state::tables::{
    MigratableState, CompactionConfig, DataTuple, BlindDataTuple,
    CheckpointParquetMetadata, table_checkpoint_path,
};

I/O Contract

Inputs

Name                Type                              Required  Description
config              TableConfig                       Yes       Protobuf-encoded table configuration
task_info           Arc<TaskInfo>                     Yes       Subtask identity for storage path computation
storage_provider    StorageProviderRef                Yes       Object store backend
checkpoint_message  Option<TableCheckpointMetadata>   No        Previous checkpoint for restoration

Outputs

Name                            Type      Description
TableSubtaskCheckpointMetadata  protobuf  Per-subtask checkpoint metadata with files and watermarks
TableCheckpointMetadata         protobuf  Merged per-table checkpoint metadata
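The step from per-subtask metadata to merged per-table metadata can be sketched as a fold over a map keyed by subtask index, mirroring the `HashMap<u32, _>` argument of merge_checkpoint_metadata. The merge rule used here (union of file names, minimum watermark) is an assumption made for illustration, and `SubtaskMeta`, `TableMeta`, and `merge` are hypothetical names; actual table types may combine their metadata differently.

```rust
use std::collections::{BTreeSet, HashMap};

// Hypothetical per-subtask metadata: files written and an optional watermark.
#[derive(Debug, Clone, PartialEq)]
pub struct SubtaskMeta {
    pub files: Vec<String>,
    pub watermark: Option<u64>,
}

// Hypothetical merged per-table metadata.
#[derive(Debug, Clone, PartialEq)]
pub struct TableMeta {
    pub files: Vec<String>,
    pub min_watermark: Option<u64>,
}

// Merge all subtasks into one table-level record.
// Assumed rule: deduplicated, sorted union of files and the smallest watermark.
// Returns None when no subtask reported anything.
pub fn merge(subtasks: &HashMap<u32, SubtaskMeta>) -> Option<TableMeta> {
    if subtasks.is_empty() {
        return None;
    }
    let files: BTreeSet<String> = subtasks
        .values()
        .flat_map(|meta| meta.files.iter().cloned())
        .collect();
    let min_watermark = subtasks.values().filter_map(|meta| meta.watermark).min();
    Some(TableMeta {
        files: files.into_iter().collect(),
        min_watermark,
    })
}
```

Collecting into a `BTreeSet` keeps the output deterministic even though `HashMap` iteration order is not.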

Usage Examples

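use arroyo_state::tables::MigratableState;
use arroyo_rpc::errors::StateError;
use bincode::{Decode, Encode};

// Current state format (version 2): adds a `label` field.
#[derive(Debug, Clone, Encode, Decode)]
struct MyStateV2 {
    count: u64,
    label: String,
}

// Previous state format (version 1), kept so old checkpoints can be decoded.
#[derive(Debug, Clone, Encode, Decode)]
struct MyStateV1 {
    count: u64,
}

impl MigratableState for MyStateV2 {
    // Version number of the current format.
    const VERSION: u32 = 2;
    // The format this version migrates from.
    type PreviousVersion = MyStateV1;

    // Carry over existing fields and fill the new field with a default.
    fn migrate(previous: MyStateV1) -> Result<Self, StateError> {
        Ok(MyStateV2 {
            count: previous.count,
            label: "default".to_string(),
        })
    }
}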
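How a stored state_version might select between direct use and migration during restore can be sketched without the Arroyo crates. This is a hypothetical illustration: the `Migrate` trait is a simplified copy of MigratableState with `String` errors, and `Stored` and `restore` are invented here; the source does not describe how the engine actually dispatches on version.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct StateV1 {
    pub count: u64,
}

#[derive(Debug, Clone, PartialEq)]
pub struct StateV2 {
    pub count: u64,
    pub label: String,
}

// Simplified stand-in for MigratableState (no Data bound, String errors).
pub trait Migrate: Sized {
    const VERSION: u32;
    type PreviousVersion;
    fn migrate(previous: Self::PreviousVersion) -> Result<Self, String>;
}

impl Migrate for StateV2 {
    const VERSION: u32 = 2;
    type PreviousVersion = StateV1;

    fn migrate(previous: StateV1) -> Result<Self, String> {
        Ok(StateV2 {
            count: previous.count,
            label: "default".to_string(),
        })
    }
}

// Hypothetical decoded payload, tagged by the format it was written in.
pub enum Stored {
    V1(StateV1),
    V2(StateV2),
}

// Use the payload directly when the stored version is current, migrate when
// it is exactly one version behind, and reject anything else.
pub fn restore(stored_version: u32, payload: Stored) -> Result<StateV2, String> {
    match (stored_version, payload) {
        (v, Stored::V2(state)) if v == StateV2::VERSION => Ok(state),
        (v, Stored::V1(state)) if v.checked_add(1) == Some(StateV2::VERSION) => {
            StateV2::migrate(state)
        }
        (v, _) => Err(format!("unsupported state version {}", v)),
    }
}
```

Rejecting unknown versions explicitly, rather than attempting a best-effort decode, keeps a format mismatch from silently corrupting restored state.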
