Implementation:ArroyoSystems Arroyo State Core
| Knowledge Sources | |
|---|---|
| Domains | Streaming, State, Checkpointing |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
The root module of the arroyo-state crate, defining the BackingStore trait, core state message types, table configuration helpers, data operation enums, and the Parquet-based state backend alias.
Description
This module serves as the entry point for the Arroyo state management system:
- StateMessage -- the enum sent from operators to the state backend: Checkpoint (with epoch/watermark), Compaction (metadata maps), and TableData.
- CheckpointMessage -- carries epoch number, timestamp, watermark, and stop flag.
- TableData -- the three types of data that can be stored: RecordBatch (columnar), CommitData (opaque bytes), and KeyedData (binary key-value).
- DataOperation -- an enum describing state mutations: Insert, DeleteTimeKey, DeleteKey, DeleteValue, DeleteTimeRange.
- BackingStore trait -- the async trait that all state backends must implement, providing prepare_checkpoint_load, load_checkpoint_metadata, load_operator_metadata, write_operator_checkpoint_metadata, write_checkpoint_metadata, and cleanup_checkpoint, plus a name() function that returns the backend's name.
- StateBackend type alias -- currently aliases to ParquetBackend.
- Configuration helpers -- global_table_config and global_table_config_with_version return a HashMap<String, TableConfig>, while timestamp_table_config returns a single TableConfig; both are used to register tables with operators.
- hash_key -- hashes any key implementing Hash with DefaultHasher, yielding the u64 used for key-range partitioning.
- get_storage_provider -- lazily initializes a singleton StorageProvider from the configured checkpoint URL.
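The hash_key helper and key-range partitioning can be sketched with the standard library alone. This is a minimal sketch assuming hash_key simply feeds the key into a fresh DefaultHasher; the `owner_for_hash` function and its equal-width split of the u64 hash space are hypothetical illustrations, not Arroyo's actual routing code.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Sketch of a hash_key-style helper: hash any `Hash` key with a fresh DefaultHasher.
pub fn hash_key<K: Hash>(key: &K) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical: split the u64 hash space into `n` contiguous, equal-width
/// ranges and return the index of the range that contains `hash`.
pub fn owner_for_hash(hash: u64, n: usize) -> usize {
    assert!(n > 0, "need at least one partition");
    let range_size = u64::MAX / n as u64;
    // The last range absorbs the remainder left over by integer division.
    ((hash / range_size) as usize).min(n - 1)
}
```

For example, `owner_for_hash(hash_key(&"user-42"), 4)` always routes the same key to the same one of four partitions within a build. One caveat: the standard library documents that DefaultHasher's algorithm may change between Rust releases, so hashes like these should not be assumed stable across toolchain upgrades.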
Usage
Use the table configuration helpers when defining operator state requirements. The BackingStore trait is implemented by ParquetBackend and used by the controller and workers for checkpoint persistence.
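Because the trait's methods take no `self`, a backend presumably reaches its storage through process-wide state, which is the role the lazily initialized StorageProvider singleton appears to play for ParquetBackend. The sketch below models that shape synchronously with only the standard library. `MiniBackingStore`, `InMemoryBackend`, and the two-field `CheckpointMetadata` are hypothetical stand-ins; the real trait is async (via `#[async_trait]`) and returns `StateError`, and the cleanup rule (drop epochs in `old_min_epoch..new_min_epoch`) is an assumption about the semantics.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

/// Simplified stand-in for CheckpointMetadata (hypothetical fields).
#[derive(Clone, Debug, PartialEq)]
pub struct CheckpointMetadata {
    pub job_id: String,
    pub epoch: u32,
}

/// Hypothetical synchronous mirror of the BackingStore shape:
/// associated functions with no `self`, so state must live in a global.
pub trait MiniBackingStore {
    fn name() -> &'static str;
    fn write_checkpoint_metadata(metadata: CheckpointMetadata) -> Result<(), String>;
    fn load_checkpoint_metadata(job_id: &str, epoch: u32) -> Result<CheckpointMetadata, String>;
    fn cleanup_checkpoint(
        metadata: CheckpointMetadata,
        old_min_epoch: u32,
        new_min_epoch: u32,
    ) -> Result<(), String>;
}

type Store = Mutex<HashMap<(String, u32), CheckpointMetadata>>;

/// Lazily initialized process-wide store, similar in spirit to get_storage_provider.
fn storage() -> &'static Store {
    static STORE: OnceLock<Store> = OnceLock::new();
    STORE.get_or_init(|| Mutex::new(HashMap::new()))
}

pub struct InMemoryBackend;

impl MiniBackingStore for InMemoryBackend {
    fn name() -> &'static str {
        "in-memory"
    }

    fn write_checkpoint_metadata(metadata: CheckpointMetadata) -> Result<(), String> {
        let key = (metadata.job_id.clone(), metadata.epoch);
        let mut store = storage().lock().map_err(|e| e.to_string())?;
        store.insert(key, metadata);
        Ok(())
    }

    fn load_checkpoint_metadata(job_id: &str, epoch: u32) -> Result<CheckpointMetadata, String> {
        let store = storage().lock().map_err(|e| e.to_string())?;
        store
            .get(&(job_id.to_string(), epoch))
            .cloned()
            .ok_or_else(|| format!("no checkpoint for {job_id} at epoch {epoch}"))
    }

    fn cleanup_checkpoint(
        metadata: CheckpointMetadata,
        old_min_epoch: u32,
        new_min_epoch: u32,
    ) -> Result<(), String> {
        // Assumed semantics: epochs in [old_min_epoch, new_min_epoch) are no longer needed.
        let mut store = storage().lock().map_err(|e| e.to_string())?;
        store.retain(|(job, epoch), _| {
            !(job == &metadata.job_id && (old_min_epoch..new_min_epoch).contains(epoch))
        });
        Ok(())
    }
}
```

Callers invoke these as associated functions, e.g. `InMemoryBackend::load_checkpoint_metadata("job", 2)`, mirroring how the trait's static-method design is called without a backend instance.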
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-state/src/lib.rs
Signature
```rust
pub type StateBackend = parquet::ParquetBackend;

pub enum StateMessage {
    Checkpoint(CheckpointMessage),
    Compaction(HashMap<String, TableCheckpointMetadata>),
    TableData { table: String, data: TableData },
}

pub enum TableData {
    RecordBatch(RecordBatch),
    CommitData { data: Vec<u8> },
    KeyedData { key: Vec<u8>, value: Vec<u8> },
}

pub enum DataOperation {
    Insert,
    DeleteTimeKey(DeleteTimeKeyOperation),
    DeleteKey(DeleteKeyOperation),
    DeleteValue(DeleteValueOperation),
    DeleteTimeRange(DeleteTimeRangeOperation),
}

#[async_trait]
pub trait BackingStore {
    async fn prepare_checkpoint_load(metadata: &CheckpointMetadata) -> Result<(), StateError>;
    async fn load_checkpoint_metadata(job_id: &str, epoch: u32) -> Result<CheckpointMetadata, StateError>;
    async fn load_operator_metadata(job_id: &str, operator_id: &str, epoch: u32) -> Result<Option<OperatorCheckpointMetadata>, StateError>;
    async fn write_operator_checkpoint_metadata(metadata: OperatorCheckpointMetadata) -> Result<(), StateError>;
    async fn write_checkpoint_metadata(metadata: CheckpointMetadata) -> Result<(), StateError>;
    async fn cleanup_checkpoint(metadata: CheckpointMetadata, old_min_epoch: u32, new_min_epoch: u32) -> Result<(), StateError>;
    fn name() -> &'static str;
}

pub fn global_table_config(name: impl Into<String>, description: impl Into<String>) -> HashMap<String, TableConfig>;
pub fn timestamp_table_config(
    name: impl Into<String>,
    description: impl Into<String>,
    retention: Duration,
    generational: bool,
    schema: ArroyoSchema,
) -> TableConfig;
pub fn hash_key<K: Hash>(key: &K) -> u64;
```
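To show how the message enums fit together, the sketch below mirrors StateMessage, CheckpointMessage, and TableData with simplified stand-in types (a `Vec<u8>` in place of an Arrow RecordBatch, a `String` map in place of TableCheckpointMetadata) and routes them through a hypothetical receiver. The CheckpointMessage field names (`epoch`, `time`, `watermark`, `then_stop`) and the buffer-until-checkpoint policy are assumptions for illustration, not the backend's actual logic.

```rust
use std::collections::HashMap;
use std::time::SystemTime;

/// Simplified stand-in for CheckpointMessage (field names assumed).
#[derive(Debug)]
pub struct CheckpointMessage {
    pub epoch: u32,
    pub time: SystemTime,
    pub watermark: Option<SystemTime>,
    pub then_stop: bool,
}

/// Simplified stand-in for TableData; Vec<u8> replaces an Arrow RecordBatch.
#[derive(Debug)]
pub enum TableData {
    RecordBatch(Vec<u8>),
    CommitData { data: Vec<u8> },
    KeyedData { key: Vec<u8>, value: Vec<u8> },
}

/// Simplified stand-in for StateMessage; String metadata replaces TableCheckpointMetadata.
#[derive(Debug)]
pub enum StateMessage {
    Checkpoint(CheckpointMessage),
    Compaction(HashMap<String, String>),
    TableData { table: String, data: TableData },
}

/// Hypothetical receiver: tallies buffered bytes per table until a checkpoint
/// arrives, then records which tables were flushed for that epoch.
#[derive(Default)]
pub struct StateReceiver {
    pending: HashMap<String, usize>,
    pub flushed: Vec<(u32, Vec<(String, usize)>)>,
    pub compacted_tables: usize,
    pub stop_requested: bool,
}

impl StateReceiver {
    pub fn handle(&mut self, msg: StateMessage) {
        match msg {
            StateMessage::TableData { table, data: payload } => {
                // Each variant carries a different payload shape.
                let bytes = match &payload {
                    TableData::RecordBatch(batch) => batch.len(),
                    TableData::CommitData { data } => data.len(),
                    TableData::KeyedData { key, value } => key.len() + value.len(),
                };
                *self.pending.entry(table).or_insert(0) += bytes;
            }
            StateMessage::Checkpoint(cp) => {
                // Flush everything buffered so far under this epoch, sorted by table name.
                let mut tables: Vec<(String, usize)> = self.pending.drain().collect();
                tables.sort();
                self.flushed.push((cp.epoch, tables));
                self.stop_requested |= cp.then_stop;
            }
            StateMessage::Compaction(tables) => {
                // A real backend would swap in compacted metadata; here we only count it.
                self.compacted_tables += tables.len();
            }
        }
    }
}
```

The point of the sketch is the ordering contract it assumes: table writes for an epoch arrive before the Checkpoint message that seals it.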
Import
```rust
use arroyo_state::{
    StateBackend, BackingStore, StateMessage, TableData, DataOperation,
    global_table_config, timestamp_table_config, hash_key,
};
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| name | impl Into<String> | Yes | Table name for configuration |
| description | impl Into<String> | Yes | Human-readable table description |
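| retention | Duration | Yes (for timestamp tables) | Data retention window |
| generational | bool | Yes (for timestamp tables) | Generational flag for the timestamp table |
| schema | ArroyoSchema | Yes (for timestamp tables) | Schema of the rows stored in the table |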
Outputs
| Function | Return Type | Description |
|---|---|---|
| global_table_config | HashMap<String, TableConfig> | Table configuration map ready for operator registration |
| timestamp_table_config | TableConfig | Single table configuration for a timestamp-keyed table |
Usage Examples
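```rust
use arroyo_state::{global_table_config, timestamp_table_config};
use std::time::Duration;

// Create a global keyed table config; returns HashMap<String, TableConfig>
let mut tables = global_table_config("counters", "Per-key event counters");

// Create a timestamp-keyed table config with 1-hour retention; returns a single TableConfig
let ts_table = timestamp_table_config(
    "events",
    "Time-windowed events",
    Duration::from_secs(3600),
    false,  // generational
    schema, // an ArroyoSchema describing the table's rows, constructed elsewhere
);

// Collect both configs in one map keyed by table name
tables.insert("events".to_string(), ts_table);
```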
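The retention argument bounds how long rows in a timestamp-keyed table are kept. The sketch below assumes a simple rule: once the watermark advances, rows older than `watermark - retention` can be dropped. The `TimestampTable` type and its `expire` method are hypothetical and only illustrate that assumed rule, not the real table's eviction code.

```rust
use std::collections::BTreeMap;
use std::time::{Duration, SystemTime};

/// Hypothetical in-memory timestamp table: rows grouped by event time.
pub struct TimestampTable {
    retention: Duration,
    rows: BTreeMap<SystemTime, Vec<String>>,
}

impl TimestampTable {
    pub fn new(retention: Duration) -> Self {
        Self { retention, rows: BTreeMap::new() }
    }

    pub fn insert(&mut self, time: SystemTime, row: String) {
        self.rows.entry(time).or_default().push(row);
    }

    /// Assumed rule: rows with timestamps earlier than `watermark - retention` are dropped.
    pub fn expire(&mut self, watermark: SystemTime) {
        if let Some(cutoff) = watermark.checked_sub(self.retention) {
            // split_off returns the entries with keys >= cutoff; the older entries are discarded.
            self.rows = self.rows.split_off(&cutoff);
        }
    }

    /// Total number of rows currently retained.
    pub fn len(&self) -> usize {
        self.rows.values().map(Vec::len).sum()
    }
}
```

A BTreeMap keeps rows ordered by time, so expiring everything before the cutoff is a single `split_off` rather than a scan over every row.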