

Implementation:ArroyoSystems Arroyo State Core

From Leeroopedia
Revision as of 14:28, 16 February 2026 by Admin (auto-imported from implementations/ArroyoSystems_Arroyo_State_Core.md)


Knowledge Sources
Domains: Streaming, State, Checkpointing
Last Updated: 2026-02-08 08:00 GMT

Overview

The root module of the arroyo-state crate, defining the BackingStore trait, core state message types, table configuration helpers, data operation enums, and the Parquet-based state backend alias.

Description

This module serves as the entry point for the Arroyo state management system:

  • StateMessage -- the enum sent from operators to the state backend: Checkpoint (with epoch/watermark), Compaction (metadata maps), and TableData.
  • CheckpointMessage -- carries epoch number, timestamp, watermark, and stop flag.
  • TableData -- the three types of data that can be stored: RecordBatch (columnar), CommitData (opaque bytes), and KeyedData (binary key-value).
  • DataOperation -- an enum describing state mutations: Insert, DeleteTimeKey, DeleteKey, DeleteValue, DeleteTimeRange.
  • BackingStore trait -- the async trait that all state backends must implement, providing prepare_checkpoint_load, load_checkpoint_metadata, load_operator_metadata, write_operator_checkpoint_metadata, write_checkpoint_metadata, and cleanup_checkpoint.
  • StateBackend type alias -- currently aliases to ParquetBackend.
  • Configuration helpers -- global_table_config and global_table_config_with_version return a HashMap<String, TableConfig>, while timestamp_table_config returns a single TableConfig; operators use these to register their tables.
  • hash_key -- hashes any value implementing Hash with DefaultHasher, producing the u64 used for key-range partitioning.
  • get_storage_provider -- lazily initializes a singleton StorageProvider from the configured checkpoint URL.
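The message flow described above can be illustrated with a minimal sketch of a backend task that consumes StateMessage values over a channel. It uses std's mpsc channel as a stand-in for whatever channel the crate actually uses. The types are simplified mirrors, not the crate's definitions: field names such as then_stop are hypothetical, the RecordBatch variant is omitted to avoid an Arrow dependency, and run_backend is an illustrative function that is not part of arroyo-state. It buffers TableData per table and treats each Checkpoint as a flush point.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::time::SystemTime;

// Simplified stand-ins for the crate's types. Field names are hypothetical,
// and the RecordBatch variant is omitted to avoid an Arrow dependency.
#[derive(Debug)]
pub struct CheckpointMessage {
    pub epoch: u32,
    pub time: SystemTime,
    pub watermark: Option<SystemTime>,
    pub then_stop: bool,
}

#[derive(Debug)]
pub enum TableData {
    CommitData { data: Vec<u8> },
    KeyedData { key: Vec<u8>, value: Vec<u8> },
}

#[derive(Debug)]
pub enum StateMessage {
    Checkpoint(CheckpointMessage),
    // Stand-in for HashMap<String, TableCheckpointMetadata>.
    Compaction(HashMap<String, Vec<u8>>),
    TableData { table: String, data: TableData },
}

/// Hypothetical backend loop: buffers table writes and, on each checkpoint,
/// records (epoch, number of buffered writes) and clears the buffer.
pub fn run_backend(rx: mpsc::Receiver<StateMessage>) -> Vec<(u32, usize)> {
    let mut pending: HashMap<String, Vec<TableData>> = HashMap::new();
    let mut flushed = Vec::new();
    for msg in rx {
        match msg {
            StateMessage::TableData { table, data } => {
                pending.entry(table).or_default().push(data);
            }
            StateMessage::Checkpoint(cp) => {
                // A real backend would persist the buffered data here.
                let writes: usize = pending.values().map(Vec::len).sum();
                flushed.push((cp.epoch, writes));
                pending.clear();
                // Stop consuming once a checkpoint carries the stop flag.
                if cp.then_stop {
                    break;
                }
            }
            StateMessage::Compaction(tables) => {
                // A real backend would swap in compacted metadata for these tables.
                let _ = tables;
            }
        }
    }
    flushed
}
```

In this sketch, a checkpoint with the stop flag set ends the loop; how the real backend reacts to that flag is not shown here.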

Usage

Use the table configuration helpers when defining operator state requirements. The BackingStore trait is implemented by ParquetBackend and used by the controller and workers for checkpoint persistence.
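The BackingStore methods in the signature listing take no self receiver, so an implementation is used through its type name, and that is what lets the StateBackend alias select ParquetBackend. The sketch below illustrates this pattern with a synchronous, in-memory stand-in. InMemoryBackend, the trimmed-down CheckpointMetadata and StateError types, and the three-method trait subset are all hypothetical, and the real trait is async. Process-wide state lives in a lazily initialized std::sync::OnceLock, a synchronous analogue of the lazily initialized singleton that get_storage_provider is described as returning.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

// Hypothetical, simplified metadata and error types.
#[derive(Clone, Debug, PartialEq)]
pub struct CheckpointMetadata {
    pub job_id: String,
    pub epoch: u32,
}

#[derive(Debug, PartialEq)]
pub enum StateError {
    NotFound,
}

// Synchronous mirror of a subset of BackingStore. As in the listed trait,
// the methods are associated functions with no `self` receiver.
pub trait BackingStore {
    fn name() -> &'static str;
    fn write_checkpoint_metadata(metadata: CheckpointMetadata) -> Result<(), StateError>;
    fn load_checkpoint_metadata(job_id: &str, epoch: u32) -> Result<CheckpointMetadata, StateError>;
}

pub struct InMemoryBackend;

// Process-wide storage, initialized on first use, because associated
// functions have no instance in which to keep state.
fn store() -> &'static Mutex<HashMap<(String, u32), CheckpointMetadata>> {
    static STORE: OnceLock<Mutex<HashMap<(String, u32), CheckpointMetadata>>> = OnceLock::new();
    STORE.get_or_init(|| Mutex::new(HashMap::new()))
}

impl BackingStore for InMemoryBackend {
    fn name() -> &'static str {
        "in-memory"
    }

    fn write_checkpoint_metadata(metadata: CheckpointMetadata) -> Result<(), StateError> {
        let key = (metadata.job_id.clone(), metadata.epoch);
        store().lock().unwrap().insert(key, metadata);
        Ok(())
    }

    fn load_checkpoint_metadata(job_id: &str, epoch: u32) -> Result<CheckpointMetadata, StateError> {
        store()
            .lock()
            .unwrap()
            .get(&(job_id.to_string(), epoch))
            .cloned()
            .ok_or(StateError::NotFound)
    }
}

// Callers refer to the alias, so switching backends is a one-line change.
pub type StateBackend = InMemoryBackend;
```

Code that calls StateBackend::load_checkpoint_metadata(...) never names the concrete backend, which is the design choice the alias enables.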

Code Reference

Source Location

Signature

pub type StateBackend = parquet::ParquetBackend;

pub enum StateMessage {
    Checkpoint(CheckpointMessage),
    Compaction(HashMap<String, TableCheckpointMetadata>),
    TableData { table: String, data: TableData },
}

pub enum TableData {
    RecordBatch(RecordBatch),
    CommitData { data: Vec<u8> },
    KeyedData { key: Vec<u8>, value: Vec<u8> },
}

pub enum DataOperation {
    Insert,
    DeleteTimeKey(DeleteTimeKeyOperation),
    DeleteKey(DeleteKeyOperation),
    DeleteValue(DeleteValueOperation),
    DeleteTimeRange(DeleteTimeRangeOperation),
}

#[async_trait]
pub trait BackingStore {
    async fn prepare_checkpoint_load(metadata: &CheckpointMetadata) -> Result<(), StateError>;
    async fn load_checkpoint_metadata(job_id: &str, epoch: u32) -> Result<CheckpointMetadata, StateError>;
    async fn load_operator_metadata(job_id: &str, operator_id: &str, epoch: u32) -> Result<Option<OperatorCheckpointMetadata>, StateError>;
    async fn write_operator_checkpoint_metadata(metadata: OperatorCheckpointMetadata) -> Result<(), StateError>;
    async fn write_checkpoint_metadata(metadata: CheckpointMetadata) -> Result<(), StateError>;
    async fn cleanup_checkpoint(metadata: CheckpointMetadata, old_min_epoch: u32, new_min_epoch: u32) -> Result<(), StateError>;
    fn name() -> &'static str;
}

pub fn global_table_config(name: impl Into<String>, description: impl Into<String>) -> HashMap<String, TableConfig>;
pub fn timestamp_table_config(name: impl Into<String>, description: impl Into<String>, retention: Duration, generational: bool, schema: ArroyoSchema) -> TableConfig;
pub fn hash_key<K: Hash>(key: &K) -> u64;
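The hash_key signature, together with the description (DefaultHasher, used for key-range partitioning), can be sketched with the standard library alone. The range_index helper is hypothetical. It shows one way to split the u64 hash space into n equal, contiguous ranges, and it is not necessarily the mapping Arroyo uses. Note that std documents DefaultHasher's algorithm as unspecified across Rust releases, so these hashes are deterministic within one build but should not be persisted and compared across toolchain versions.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hashes any `Hash` value with std's DefaultHasher (fixed keys, so the
/// result is deterministic within a single build).
pub fn hash_key<K: Hash>(key: &K) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical helper: splits the u64 hash space into `n` contiguous,
/// equal-width ranges and returns the index of the range containing `hash`.
/// Widening to u128 avoids overflow, and (hash * n) / 2^64 is always < n.
pub fn range_index(hash: u64, n: usize) -> usize {
    assert!(n > 0, "need at least one range");
    ((hash as u128 * n as u128) >> 64) as usize
}
```

For example, with four ranges, range_index(hash_key(&key), 4) sends the lowest quarter of the hash space to range 0 and the highest quarter to range 3.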

Import

use arroyo_state::{
    StateBackend, BackingStore, StateMessage, TableData, DataOperation,
    global_table_config, timestamp_table_config, hash_key,
};

I/O Contract

Inputs

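| Name | Type | Required | Description |
|---|---|---|---|
| name | impl Into<String> | Yes | Table name for configuration |
| description | impl Into<String> | Yes | Human-readable table description |
| retention | Duration | Yes (timestamp tables only) | Data retention window |
| generational | bool | Yes (timestamp tables only) | Generational flag passed to timestamp_table_config |
| schema | ArroyoSchema | Yes (timestamp tables only) | Schema of the rows stored in the table |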

Outputs

| Returned by | Type | Description |
|---|---|---|
| global_table_config | HashMap<String, TableConfig> | Table configuration map ready for operator registration |
| timestamp_table_config | TableConfig | Single table configuration for a timestamp table |

Usage Examples

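use arroyo_state::{global_table_config, timestamp_table_config};
use std::time::Duration;

// Create a global keyed table config; the result is a
// HashMap<String, TableConfig>.
let mut tables = global_table_config("counters", "Per-key event counters");

// Create a timestamp-keyed table config with 1-hour retention.
// `schema` is an ArroyoSchema describing the stored rows and is
// assumed to already be in scope.
let ts_table = timestamp_table_config(
    "events",
    "Time-windowed events",
    Duration::from_secs(3600),
    false, // generational
    schema,
);

// timestamp_table_config returns a single TableConfig, so add it to the
// map under its table name before handing the map to the operator.
tables.insert("events".to_string(), ts_table);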
