Implementation:ArroyoSystems Arroyo Global Keyed Map
| Knowledge Sources | |
|---|---|
| Domains | Streaming, State, Checkpointing |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Implements the GlobalKeyedTable, a state table that stores arbitrary key-value pairs as bincode-encoded binary data in Parquet files, with support for checkpointing, compaction, and state version migration.
Description
This module provides a fully replicated key-value state table used for global operator state (e.g., lookup tables, session stores):
- GlobalKeyedTable -- the core table implementation, storing state as Parquet files with a two-column schema (key: Binary, value: Binary). Implements the Table trait from the state tables framework.
- GlobalKeyedView<K, V> -- the in-memory view of the state for a single subtask, providing get/insert/remove operations on a HashMap<K, V>. State writes are buffered and flushed during checkpointing.
- Checkpointing -- the GlobalKeyedTableCheckpointer writes modified state to Parquet files on checkpoint, carrying forward file references from previous checkpoints. Supports compaction by merging multiple Parquet files.
- State Migration -- supports versioned state through the MigratableState trait, allowing deserialization of previous state versions and migration to the current version.
- Storage format -- keys and values are bincode-encoded and stored as Binary columns in Parquet with Zstd compression. The schema is defined in a static GLOBAL_KEY_VALUE_SCHEMA.
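The state migration idea above can be illustrated with a small, standard-library-only sketch. Everything in it is hypothetical: the `Versioned` trait, the `CounterV2` type, and the fixed little-endian layout are stand-ins chosen for illustration, not Arroyo's `MigratableState` definition or its bincode encoding. It only shows the general pattern of decoding bytes written under an older state version and upgrading them to the current shape.

```rust
/// Hypothetical stand-in for a versioned-state trait.
/// This is NOT the actual `MigratableState` trait from Arroyo.
trait Versioned: Sized {
    /// Version written by the current code.
    const VERSION: u32;
    /// Encode a value in the current version's layout.
    fn encode(&self) -> Vec<u8>;
    /// Decode bytes written under `version`, migrating to the current shape.
    fn decode(version: u32, bytes: &[u8]) -> Result<Self, String>;
}

/// Illustrative value type: version 1 stored only `count`,
/// version 2 added `last_seen_ms`.
#[derive(Debug, PartialEq)]
struct CounterV2 {
    count: u64,
    last_seen_ms: u64,
}

/// Read 8 little-endian bytes at `offset`, failing on truncated input.
fn read_u64(bytes: &[u8], offset: usize) -> Result<u64, String> {
    let slice = bytes
        .get(offset..offset + 8)
        .ok_or_else(|| "truncated value".to_string())?;
    let mut buf = [0u8; 8];
    buf.copy_from_slice(slice);
    Ok(u64::from_le_bytes(buf))
}

impl Versioned for CounterV2 {
    const VERSION: u32 = 2;

    fn encode(&self) -> Vec<u8> {
        let mut out = self.count.to_le_bytes().to_vec();
        out.extend_from_slice(&self.last_seen_ms.to_le_bytes());
        out
    }

    fn decode(version: u32, bytes: &[u8]) -> Result<Self, String> {
        match version {
            // Old layout: only the count; fill the new field with a default.
            1 => Ok(CounterV2 {
                count: read_u64(bytes, 0)?,
                last_seen_ms: 0,
            }),
            // Current layout: both fields.
            2 => Ok(CounterV2 {
                count: read_u64(bytes, 0)?,
                last_seen_ms: read_u64(bytes, 8)?,
            }),
            other => Err(format!("unsupported state version {other}")),
        }
    }
}

/// Decode every stored value, migrating each one from `stored_version`.
fn load_values<V: Versioned>(stored_version: u32, rows: &[Vec<u8>]) -> Result<Vec<V>, String> {
    rows.iter().map(|row| V::decode(stored_version, row)).collect()
}
```

In this sketch the stored version travels alongside the data (as `state_version` does on the table), so the decoder can pick the right layout per file rather than guessing from the bytes.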
Key operations:
- load_with_version -- loads state from Parquet files, deserializing all key-value pairs.
- checkpoint -- writes current state delta to a new Parquet file and produces checkpoint metadata.
- compact_data -- merges multiple checkpoint files into a single compacted file.
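As a rough mental model of how these three operations relate, the sketch below replaces Parquet files and the object store with in-memory stand-ins. The names `FakeStore`, `FileRows`, `checkpoint`, `load`, and `compact` are hypothetical and chosen for illustration; the file naming scheme and the "later file wins" rule are assumptions, and deletions are not modeled. It is not the signature or behavior of the real `load_with_version`, `checkpoint`, or `compact_data`.

```rust
use std::collections::{BTreeMap, HashMap};

/// Stand-in for one Parquet file: rows of (encoded key, encoded value).
type FileRows = Vec<(Vec<u8>, Vec<u8>)>;

/// Hypothetical in-memory object store, keyed by file name.
#[derive(Default)]
struct FakeStore {
    files: BTreeMap<String, FileRows>,
}

/// Write the rows changed since the last checkpoint to a new file and return
/// the new file list: previous references carried forward, plus the new file.
/// An empty delta writes nothing and returns the previous list unchanged.
fn checkpoint(store: &mut FakeStore, previous: &[String], epoch: u32, delta: FileRows) -> Vec<String> {
    let mut files = previous.to_vec();
    if !delta.is_empty() {
        let name = format!("checkpoint-{epoch:04}.rows");
        store.files.insert(name.clone(), delta);
        files.push(name);
    }
    files
}

/// Rebuild state by replaying files oldest-first, so a later write to a key
/// overrides an earlier one. Panics if a referenced file is missing.
fn load(store: &FakeStore, files: &[String]) -> HashMap<Vec<u8>, Vec<u8>> {
    let mut state = HashMap::new();
    for name in files {
        for (key, value) in &store.files[name] {
            state.insert(key.clone(), value.clone());
        }
    }
    state
}

/// Merge several files into a single file holding the latest value per key.
/// Old files are left in the store; cleanup is out of scope for this sketch.
fn compact(store: &mut FakeStore, files: &[String], epoch: u32) -> Vec<String> {
    let mut rows: FileRows = load(store, files).into_iter().collect();
    rows.sort(); // deterministic row order in the compacted file
    let name = format!("compacted-{epoch:04}.rows");
    store.files.insert(name.clone(), rows);
    vec![name]
}
```

The property worth noting is that loading from the compacted file list yields the same map as loading from the original list, which is what makes compaction safe to run between checkpoints in this model.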
Usage
Use GlobalKeyedTable through the TableManager in operator contexts for storing global keyed state that needs to survive checkpoints and restarts.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-state/src/tables/global_keyed_map.rs
Signature
pub struct GlobalKeyedTable {
    table_name: String,
    pub task_info: Arc<TaskInfo>,
    storage_provider: StorageProviderRef,
    pub files: Vec<String>,
    pub state_version: u32,
}

pub struct GlobalKeyedView<K: Key, V: Data> {
    table_name: String,
    data: HashMap<K, V>,
    state_tx: Sender<StateMessage>,
    state_version: u32,
}

impl<K: Key, V: Data> GlobalKeyedView<K, V> {
    pub fn get(&self, key: &K) -> Option<&V>;
    pub fn insert(&mut self, key: K, value: V);
    pub fn remove(&mut self, key: &K) -> Option<V>;
}
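To make the shape of the view concrete, here is a minimal standard-library model of a map that serves reads from memory while forwarding each change over a channel for later persistence. It is a sketch under stated assumptions: `SketchView` and `Change` are hypothetical names, `std::sync::mpsc` stands in for whatever channel `state_tx` actually uses, and whether the real view forwards removals this way is not confirmed by this page.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical change record; the real `StateMessage` type is not shown here.
#[derive(Debug, PartialEq)]
enum Change<K, V> {
    Insert(K, V),
    Remove(K),
}

/// Simplified model of a per-subtask view: an in-memory map plus a channel
/// that carries changes to whoever writes them out at checkpoint time.
struct SketchView<K, V> {
    data: HashMap<K, V>,
    tx: Sender<Change<K, V>>,
}

impl<K: Eq + Hash + Clone, V: Clone> SketchView<K, V> {
    /// Create a view and the receiving end that a checkpointer would drain.
    fn new() -> (Self, Receiver<Change<K, V>>) {
        let (tx, rx) = channel();
        (Self { data: HashMap::new(), tx }, rx)
    }

    /// Reads are served entirely from memory.
    fn get(&self, key: &K) -> Option<&V> {
        self.data.get(key)
    }

    /// Update memory and queue the change; send errors are ignored here.
    fn insert(&mut self, key: K, value: V) {
        let _ = self.tx.send(Change::Insert(key.clone(), value.clone()));
        self.data.insert(key, value);
    }

    /// Remove from memory, queue the removal, and return the old value.
    fn remove(&mut self, key: &K) -> Option<V> {
        let _ = self.tx.send(Change::Remove(key.clone()));
        self.data.remove(key)
    }
}
```

Keeping reads local and queueing writes means operator code never waits on storage during normal processing in this model; the cost is that changes made after the last checkpoint are lost on failure and must be recomputed from replayed input.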
Import
use arroyo_state::tables::global_keyed_map::{GlobalKeyedTable, GlobalKeyedView};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | GlobalKeyedTableConfig | Yes | Table name, description, two-phase commit flag |
| task_info | Arc<TaskInfo> | Yes | Job, operator, and subtask identity |
| storage_provider | StorageProviderRef | Yes | Object store for reading/writing Parquet files |
| checkpoint_message | Option<TableCheckpointMessage> | No | Previous checkpoint metadata for state restoration |
Outputs
| Name | Type | Description |
|---|---|---|
| view | GlobalKeyedView<K, V> | In-memory key-value store for the operator |
| checkpoint metadata | TableSubtaskCheckpointMetadata (protobuf) | Checkpoint metadata referencing Parquet files |
Usage Examples
// In an operator's on_start method
let mut view: GlobalKeyedView<String, u64> = ctx
    .table_manager
    .get_global_keyed_state("my_state")
    .await?;

// Read state
if let Some(count) = view.get(&"total".to_string()) {
    println!("Current count: {}", count);
}

// Write state (will be persisted on next checkpoint)
view.insert("total".to_string(), 42);