

Implementation:ArroyoSystems Arroyo Global Keyed Map

From Leeroopedia


Knowledge Sources
Domains: Streaming, State, Checkpointing
Last Updated: 2026-02-08 08:00 GMT

Overview

Implements the GlobalKeyedTable, a state table that stores arbitrary key-value pairs as bincode-encoded binary data in Parquet files, with support for checkpointing, compaction, and state version migration.
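The two-column layout can be illustrated by turning a String key and a u64 value into a row of two byte arrays, then decoding the row back. This is a minimal std-only sketch. Arroyo itself uses bincode and Parquet. The length-prefixed format and the names KvRow, encode_row, and decode_row are hypothetical stand-ins chosen so the example needs no external crates.

```rust
/// One row of a (key: Binary, value: Binary) table.
/// Hypothetical stand-in; Arroyo encodes with bincode and stores rows in Parquet.
#[derive(Debug, Clone, PartialEq)]
pub struct KvRow {
    pub key: Vec<u8>,
    pub value: Vec<u8>,
}

/// Encode a key/value pair into binary columns using a made-up format:
/// key = u32 little-endian length prefix + UTF-8 bytes, value = u64 little-endian.
pub fn encode_row(key: &str, value: u64) -> KvRow {
    let mut key_bytes = Vec::with_capacity(4 + key.len());
    key_bytes.extend_from_slice(&(key.len() as u32).to_le_bytes());
    key_bytes.extend_from_slice(key.as_bytes());
    KvRow {
        key: key_bytes,
        value: value.to_le_bytes().to_vec(),
    }
}

/// Decode a row produced by `encode_row`; returns None if the bytes are malformed.
pub fn decode_row(row: &KvRow) -> Option<(String, u64)> {
    // Read the 4-byte length prefix of the key.
    let mut len_bytes = [0u8; 4];
    len_bytes.copy_from_slice(row.key.get(..4)?);
    let len = u32::from_le_bytes(len_bytes) as usize;
    let key = String::from_utf8(row.key.get(4..4 + len)?.to_vec()).ok()?;

    // The value column must hold exactly 8 bytes.
    if row.value.len() != 8 {
        return None;
    }
    let mut value_bytes = [0u8; 8];
    value_bytes.copy_from_slice(&row.value);
    Some((key, u64::from_le_bytes(value_bytes)))
}
```

Because both columns are opaque bytes, the table schema stays fixed no matter which key and value types an operator uses. Only the encoder and decoder need to agree.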

Description

This module provides a fully replicated key-value state table used for global operator state (e.g., lookup tables, session stores):

  • GlobalKeyedTable -- the core table implementation, storing state as Parquet files with a two-column schema (key: Binary, value: Binary). Implements the Table trait from the state tables framework.
  • GlobalKeyedView<K, V> -- the in-memory view of the state for a single subtask, providing get/insert/remove operations on a HashMap<K, V>. State writes are buffered and flushed during checkpointing.
  • Checkpointing -- the GlobalKeyedTableCheckpointer writes modified state to Parquet files on checkpoint, carrying forward file references from previous checkpoints. Supports compaction by merging multiple Parquet files.
  • State Migration -- supports versioned state through the MigratableState trait, allowing deserialization of previous state versions and migration to the current version.
  • Storage format -- keys and values are bincode-encoded and stored as Binary columns in Parquet with Zstd compression. The schema is defined in a static GLOBAL_KEY_VALUE_SCHEMA.
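The State Migration idea can be sketched in a few lines: the stored version number selects a decoder, and older layouts are converted into the current type when state is loaded. This sketch does not reproduce Arroyo's MigratableState trait, whose signature is not shown on this page. CounterV1, the version 0 and version 1 byte layouts, and load_counter are all hypothetical.

```rust
/// Current (hypothetical) state type: version 1 stores the count as u64.
#[derive(Debug, PartialEq)]
pub struct CounterV1 {
    pub count: u64,
}

/// Decode bytes written under `version` and migrate them to the current type.
/// Hypothetical layouts: v0 = 4-byte little-endian u32, v1 = 8-byte little-endian u64.
pub fn load_counter(version: u32, bytes: &[u8]) -> Result<CounterV1, String> {
    match version {
        0 => {
            if bytes.len() != 4 {
                return Err(format!("v0 expects 4 bytes, got {}", bytes.len()));
            }
            let mut buf = [0u8; 4];
            buf.copy_from_slice(bytes);
            // Migration step: widen the old u32 counter to the current u64 field.
            Ok(CounterV1 { count: u32::from_le_bytes(buf) as u64 })
        }
        1 => {
            if bytes.len() != 8 {
                return Err(format!("v1 expects 8 bytes, got {}", bytes.len()));
            }
            let mut buf = [0u8; 8];
            buf.copy_from_slice(bytes);
            Ok(CounterV1 { count: u64::from_le_bytes(buf) })
        }
        other => Err(format!("unknown state version {}", other)),
    }
}
```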

Key operations:

  • load_with_version -- loads state from Parquet files, deserializing all key-value pairs.
  • checkpoint -- writes current state delta to a new Parquet file and produces checkpoint metadata.
  • compact_data -- merges multiple checkpoint files into a single compacted file.
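A rough model of how loading and compaction relate follows. It assumes files are replayed oldest-first, so that a later file overrides an earlier value for the same key. That ordering is an assumption, not something this page confirms. Files are modeled as already-decoded vectors rather than Parquet, and File, load, and compact are hypothetical names.

```rust
use std::collections::BTreeMap;

/// A hypothetical checkpoint file, already decoded into key/value pairs.
pub type File = Vec<(String, u64)>;

/// Load state by replaying files oldest-first; later entries overwrite earlier ones.
pub fn load(files: &[File]) -> BTreeMap<String, u64> {
    let mut state = BTreeMap::new();
    for file in files {
        for (k, v) in file {
            state.insert(k.clone(), *v);
        }
    }
    state
}

/// Compact several files into a single file that loads to the same state.
/// BTreeMap keeps the output sorted by key, so the result is deterministic.
pub fn compact(files: &[File]) -> File {
    load(files).into_iter().collect()
}
```

Loading the compacted file yields the same map as loading the originals. That property makes it safe to reference the single compacted file instead of the inputs.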

Usage

Use GlobalKeyedTable through the TableManager in operator contexts for storing global keyed state that needs to survive checkpoints and restarts.

Code Reference

Source Location

Signature

```rust
// Signatures only; method bodies are omitted.
pub struct GlobalKeyedTable {
    table_name: String,
    pub task_info: Arc<TaskInfo>,
    storage_provider: StorageProviderRef,
    pub files: Vec<String>,
    pub state_version: u32,
}

pub struct GlobalKeyedView<K: Key, V: Data> {
    table_name: String,
    data: HashMap<K, V>,
    state_tx: Sender<StateMessage>,
    state_version: u32,
}

impl<K: Key, V: Data> GlobalKeyedView<K, V> {
    pub fn get(&self, key: &K) -> Option<&V>;
    pub fn insert(&mut self, key: K, value: V);
    pub fn remove(&mut self, key: &K) -> Option<V>;
}
```
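The method bodies are omitted above, so here is one way a view with this get/insert/remove shape can work. Reads are served from a local HashMap, and every write is also forwarded over a channel, loosely mirroring the state_tx field. View and StateWrite are hypothetical stand-ins for GlobalKeyedView and StateMessage. Arroyo's real bounds (Key, Data) and message contents differ.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical write record sent toward the checkpointer (stand-in for StateMessage).
#[derive(Debug, PartialEq)]
pub enum StateWrite<K, V> {
    Insert(K, V),
    Remove(K),
}

/// Simplified model of a keyed view: reads hit the map, writes are also forwarded.
pub struct View<K, V> {
    data: HashMap<K, V>,
    state_tx: Sender<StateWrite<K, V>>,
}

impl<K: Hash + Eq + Clone, V: Clone> View<K, V> {
    /// Create a view plus the receiving end a checkpointer would drain.
    pub fn new(data: HashMap<K, V>) -> (Self, Receiver<StateWrite<K, V>>) {
        let (state_tx, state_rx) = channel();
        (View { data, state_tx }, state_rx)
    }

    pub fn get(&self, key: &K) -> Option<&V> {
        self.data.get(key)
    }

    pub fn insert(&mut self, key: K, value: V) {
        // Forward the write; a send error (receiver dropped) is ignored in this sketch.
        let _ = self.state_tx.send(StateWrite::Insert(key.clone(), value.clone()));
        self.data.insert(key, value);
    }

    pub fn remove(&mut self, key: &K) -> Option<V> {
        let _ = self.state_tx.send(StateWrite::Remove(key.clone()));
        self.data.remove(key)
    }
}
```

Keeping the full map in memory makes get a plain HashMap lookup. The channel lets the checkpointer collect writes without the operator blocking on storage I/O.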

Import

```rust
use arroyo_state::tables::global_keyed_map::{GlobalKeyedTable, GlobalKeyedView};
```

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| config | GlobalKeyedTableConfig | Yes | Table name, description, and two-phase commit flag |
| task_info | Arc<TaskInfo> | Yes | Job, operator, and subtask identity |
| storage_provider | StorageProviderRef | Yes | Object store for reading and writing Parquet files |
| checkpoint_message | Option<TableCheckpointMessage> | No | Previous checkpoint metadata used to restore state |

Outputs

| Name | Type | Description |
|---|---|---|
| GlobalKeyedView<K, V> | GlobalKeyedView | In-memory key-value store for the operator |
| TableSubtaskCheckpointMetadata | protobuf message | Checkpoint metadata referencing Parquet files |
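The checkpoint output can be pictured as a file list that grows when a delta is written. References from the previous checkpoint are carried forward. In this hypothetical sketch, CheckpointMeta stands in for TableSubtaskCheckpointMetadata, and both the wrote_delta flag and the file naming scheme are invented for illustration.

```rust
/// Hypothetical per-subtask checkpoint metadata: every file that makes up the table.
#[derive(Debug, Clone, PartialEq)]
pub struct CheckpointMeta {
    pub epoch: u32,
    pub files: Vec<String>,
}

/// Build metadata for `epoch`: carry forward the previous file list and
/// append a new delta file only if state was modified since the last checkpoint.
pub fn checkpoint(prev: Option<&CheckpointMeta>, epoch: u32, wrote_delta: bool) -> CheckpointMeta {
    let mut files = prev.map(|m| m.files.clone()).unwrap_or_default();
    if wrote_delta {
        // Hypothetical naming scheme for the delta file.
        files.push(format!("global-keyed/epoch-{}.parquet", epoch));
    }
    CheckpointMeta { epoch, files }
}
```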

Usage Examples

```rust
// In an operator's on_start method
let mut view: GlobalKeyedView<String, u64> = ctx
    .table_manager
    .get_global_keyed_state("my_state")
    .await?;

// Read state
if let Some(count) = view.get(&"total".to_string()) {
    println!("Current count: {}", count);
}

// Write state (will be persisted on next checkpoint)
view.insert("total".to_string(), 42);
```
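The comment "will be persisted on next checkpoint" implies that a write made after the most recent completed checkpoint is not part of the state restored after a failure. The hypothetical Counter below models this with two maps, one live and one checkpointed. It is a simplification for illustration, not how Arroyo stores state.

```rust
use std::collections::HashMap;

/// Hypothetical model: `live` is what the operator sees, `checkpointed` what survives a restart.
#[derive(Default)]
pub struct Counter {
    live: HashMap<String, u64>,
    checkpointed: HashMap<String, u64>,
}

impl Counter {
    /// Read-then-write on the live view, like the get/insert pattern in the usage example.
    pub fn increment(&mut self, key: &str) -> u64 {
        let next = self.live.get(key).copied().unwrap_or(0) + 1;
        self.live.insert(key.to_string(), next);
        next
    }

    /// A completed checkpoint makes the current live state durable.
    pub fn checkpoint(&mut self) {
        self.checkpointed = self.live.clone();
    }

    /// A failure and restart rebuild the live view from the last checkpoint.
    pub fn restart(&mut self) {
        self.live = self.checkpointed.clone();
    }

    pub fn get(&self, key: &str) -> Option<u64> {
        self.live.get(key).copied()
    }
}
```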
