

Implementation:ArroyoSystems Arroyo Updating Cache

From Leeroopedia
Revision as of 14:29, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/ArroyoSystems_Arroyo_Updating_Cache.md)


Knowledge Sources
Domains: Streaming, Caching, Data_Structures
Last Updated: 2026-02-08 08:00 GMT

Overview

UpdatingCache is a TTL-based, generation-aware in-memory cache backed by a manually managed doubly-linked list for efficient LRU-style time-to-idle eviction.

Description

The UpdatingCache<T> is a generic data structure that combines a HashMap for O(1) key lookups with a manually implemented doubly-linked list (using raw NonNull pointers) for maintaining insertion/update ordering. Each entry carries a generation counter that prevents stale writes from overwriting newer data -- an insert with a lower generation than the existing entry is silently ignored.
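
The generation rule can be illustrated with a minimal sketch over a plain HashMap. The `Entry` type, the `insert_if_not_stale` function, and its boolean return value are hypothetical names introduced for illustration; they are not part of the Arroyo API.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a cache entry: a value plus its generation.
struct Entry<T> {
    generation: u64,
    value: T,
}

/// Stores `value` unless the map already holds a newer generation for `key`.
/// Returns true if the value was stored (the return value is an addition of
/// this sketch; the text above describes stale inserts as silently ignored).
fn insert_if_not_stale<T>(
    map: &mut HashMap<Vec<u8>, Entry<T>>,
    key: Vec<u8>,
    generation: u64,
    value: T,
) -> bool {
    if let Some(existing) = map.get(&key) {
        if generation < existing.generation {
            return false; // stale write: keep the newer entry
        }
    }
    map.insert(key, Entry { generation, value });
    true
}

fn main() {
    let mut map = HashMap::new();
    assert!(insert_if_not_stale(&mut map, b"k".to_vec(), 2, "new"));
    // A write tagged with an older generation is dropped.
    assert!(!insert_if_not_stale(&mut map, b"k".to_vec(), 1, "old"));
    assert_eq!(map.get(b"k".as_slice()).unwrap().value, "new");
}
```

Only a strictly lower generation is rejected here, matching the wording above; an insert with an equal generation replaces the existing value.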

The eviction policy is time-to-idle: an entry is evicted once the time since its last update (an insert or modify_and_update call) exceeds the configured ttl duration. The linked list keeps entries in update-time order (oldest at the head), so TTLIter can walk expired entries starting from the head of the list.

Key operations:

  • insert -- Adds or replaces an entry, respecting generation ordering. Appends the new node to the tail of the eviction list.
  • modify_and_update -- Mutates an existing entry's data and moves its node to the tail (resetting its idle timer).
  • modify -- Mutates data and increments generation without updating the node's position in the eviction list.
  • time_out -- Returns an iterator yielding entries whose idle time has exceeded the TTL, removing them from both the map and the linked list.
  • remove -- Explicitly removes an entry by key.
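
The difference between these operations can be sketched with a simplified, safe model. This is not the Arroyo implementation: `ToyCache` is a hypothetical type that swaps the raw-pointer linked list for a VecDeque of keys (so repositioning is O(n) rather than O(1)) and returns a Vec from `time_out` instead of an iterator.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

/// Simplified model of the cache described above (hypothetical type).
struct ToyCache<T> {
    data: HashMap<Vec<u8>, (Instant, u64, T)>, // (updated, generation, value)
    order: VecDeque<Vec<u8>>,                  // keys, oldest update first
    ttl: Duration,
}

impl<T> ToyCache<T> {
    fn new(ttl: Duration) -> Self {
        Self { data: HashMap::new(), order: VecDeque::new(), ttl }
    }

    /// Adds or replaces an entry and places its key at the tail.
    fn insert(&mut self, key: Vec<u8>, now: Instant, generation: u64, value: T) {
        if let Some((_, existing, _)) = self.data.get(&key) {
            if generation < *existing {
                return; // stale generation: ignore
            }
            self.order.retain(|k| k != &key); // O(n) here; O(1) with a linked list
        }
        self.order.push_back(key.clone());
        self.data.insert(key, (now, generation, value));
    }

    /// Mutates the value and moves the key to the tail (idle timer reset).
    fn modify_and_update(&mut self, key: &[u8], now: Instant, f: impl Fn(&mut T)) -> bool {
        if let Some(entry) = self.data.get_mut(key) {
            f(&mut entry.2);
            entry.0 = now;
            self.order.retain(|k| k.as_slice() != key);
            self.order.push_back(key.to_vec());
            true
        } else {
            false
        }
    }

    /// Mutates the value and bumps the generation; position is unchanged.
    fn modify(&mut self, key: &[u8], f: impl Fn(&mut T)) -> bool {
        if let Some(entry) = self.data.get_mut(key) {
            f(&mut entry.2);
            entry.1 += 1;
            true
        } else {
            false
        }
    }

    /// Removes and returns every entry idle for longer than the TTL.
    fn time_out(&mut self, now: Instant) -> Vec<(Vec<u8>, T)> {
        let mut out = Vec::new();
        while let Some(key) = self.order.front() {
            let (updated, _, _) = &self.data[key];
            if now.saturating_duration_since(*updated) <= self.ttl {
                break; // everything behind this key was updated more recently
            }
            let key = self.order.pop_front().unwrap();
            let (_, _, value) = self.data.remove(&key).unwrap();
            out.push((key, value));
        }
        out
    }
}

fn main() {
    let t0 = Instant::now();
    let secs = Duration::from_secs;
    let mut cache = ToyCache::new(secs(60));
    cache.insert(b"a".to_vec(), t0, 1, 1);
    cache.insert(b"b".to_vec(), t0 + secs(10), 1, 2);
    // "a" is refreshed and survives; "b" is only modified, so it still ages out.
    cache.modify_and_update(b"a", t0 + secs(50), |v| *v += 10);
    cache.modify(b"b", |v| *v += 10);
    let evicted = cache.time_out(t0 + secs(75));
    assert_eq!(evicted, vec![(b"b".to_vec(), 12)]);
}
```

In this model, modify leaves the idle timer alone, so an entry that is only ever touched through modify still expires relative to its last insert or modify_and_update.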

The linked list is managed via raw pointer operations in unsafe blocks. CacheNodePtr wraps Option<NonNull<CacheNode>> and implements Send and Sync for cross-thread compatibility. The Drop implementation ensures all allocated nodes are freed.
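
A minimal sketch of this pattern follows, assuming a hypothetical `Node` layout (the real CacheNode fields are not shown on this page). `NodePtr` and `List` are illustrative names; the sketch covers only tail insertion, head removal, and freeing remaining nodes on drop.

```rust
use std::ptr::NonNull;

/// Hypothetical node type for illustration only.
struct Node {
    value: u32,
    #[allow(dead_code)] // a full implementation reads `prev` to unlink in O(1)
    prev: NodePtr,
    next: NodePtr,
}

/// Nullable pointer to a heap-allocated node.
#[derive(Clone, Copy)]
struct NodePtr(Option<NonNull<Node>>);

// SAFETY (assumption of this sketch): nodes are reachable only through the
// owning `List`, and every mutation goes through `&mut List`.
unsafe impl Send for NodePtr {}
unsafe impl Sync for NodePtr {}

struct List {
    head: NodePtr,
    tail: NodePtr,
}

impl List {
    fn new() -> Self {
        List { head: NodePtr(None), tail: NodePtr(None) }
    }

    /// Allocates a node on the heap and links it at the tail.
    fn push_back(&mut self, value: u32) {
        let node = Box::new(Node { value, prev: self.tail, next: NodePtr(None) });
        let ptr = NonNull::from(Box::leak(node));
        match self.tail.0 {
            // SAFETY: `tail` points to a live node owned by this list.
            Some(mut tail) => unsafe { tail.as_mut().next = NodePtr(Some(ptr)) },
            None => self.head = NodePtr(Some(ptr)),
        }
        self.tail = NodePtr(Some(ptr));
    }

    /// Unlinks the head node, frees it, and returns its value.
    fn pop_front(&mut self) -> Option<u32> {
        let head = self.head.0?;
        // SAFETY: `head` came from `Box::leak` in `push_back` and is unlinked
        // here, so the allocation is reclaimed exactly once.
        let node = unsafe { Box::from_raw(head.as_ptr()) };
        self.head = node.next;
        match self.head.0 {
            // SAFETY: `next` is a live node owned by this list.
            Some(mut next) => unsafe { next.as_mut().prev = NodePtr(None) },
            None => self.tail = NodePtr(None),
        }
        Some(node.value)
    }
}

impl Drop for List {
    fn drop(&mut self) {
        // Free every remaining node so nothing leaks.
        while self.pop_front().is_some() {}
    }
}

fn main() {
    let mut list = List::new();
    list.push_back(1);
    list.push_back(2);
    assert_eq!(list.pop_front(), Some(1));
    // The remaining node is freed when `list` goes out of scope.
}
```

The manual Send and Sync impls are needed because NonNull is neither; they are only sound if the surrounding type upholds the stated aliasing assumption.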

Usage

Used internally by Arroyo operators (such as the lookup join) that need a fast in-memory cache with automatic expiration of stale entries and protection against out-of-order updates via generation tracking.

Code Reference

Source Location

Signature

pub struct UpdatingCache<T: Send + Sync> {
    data: HashMap<Key, CacheEntry<T>>,
    eviction_list_head: CacheNodePtr,
    eviction_list_tail: CacheNodePtr,
    ttl: Duration,
}

#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct Key(pub Arc<Vec<u8>>);

impl<T: Send + Sync> UpdatingCache<T> {
    pub fn with_time_to_idle(ttl: Duration) -> Self;
    pub fn insert(&mut self, key: Arc<Vec<u8>>, now: Instant, generation: u64, value: T);
    pub fn time_out(&mut self, now: Instant) -> impl Iterator<Item = (Arc<Vec<u8>>, T)> + '_;
    pub fn modify_and_update<E, F: Fn(&mut T) -> Result<(), E>>(
        &mut self, key: &[u8], now: Instant, f: F,
    ) -> Option<Result<(), E>>;
    pub fn modify<E, F: Fn(&mut T) -> Result<(), E>>(
        &mut self, key: &[u8], f: F,
    ) -> Option<Result<(), E>>;
    pub fn contains_key(&self, k: &[u8]) -> bool;
    pub fn get_mut(&mut self, key: &[u8]) -> Option<&mut T>;
    pub fn get_mut_generation(&mut self, key: &[u8]) -> Option<(&mut T, u64)>;
    pub fn get_mut_key_value(&mut self, key: &[u8]) -> Option<(Key, &mut T)>;
    pub fn remove(&mut self, key: &[u8]) -> Option<T>;
    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&Key, &mut T)>;
}
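
The signature stores entries under Key but accepts &[u8] in lookups. One way this can work in Rust is a Borrow<[u8]> implementation on the key type. The sketch below assumes that approach; this page does not confirm that the Arroyo source uses it.

```rust
use std::borrow::Borrow;
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct Key(pub Arc<Vec<u8>>);

// Assumption of this sketch: a `Borrow<[u8]>` impl enables `&[u8]` lookups.
// It is consistent because the derived `Hash` and `Eq` on `Key` delegate to
// the inner bytes, which hash and compare the same way as `[u8]`.
impl Borrow<[u8]> for Key {
    fn borrow(&self) -> &[u8] {
        self.0.as_slice()
    }
}

fn main() {
    let mut map: HashMap<Key, i32> = HashMap::new();
    map.insert(Key(Arc::new(vec![1u8, 2, 3])), 42);

    // Look up by plain byte slice; no Arc or Vec has to be built.
    assert_eq!(map.get(&[1u8, 2, 3][..]), Some(&42));
    assert!(!map.contains_key(&[9u8][..]));
}
```

With this arrangement, callers holding only a borrowed byte slice can probe the map without allocating a new key.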

Import

use arroyo_worker::arrow::updating_cache::{UpdatingCache, Key};

I/O Contract

Inputs

Name | Type | Required | Description
key | Arc<Vec<u8>> | Yes | Byte-array key identifying the cache entry
now | Instant | Yes | Current time as a monotonic Instant, used for TTL tracking
generation | u64 | Yes | Version number expected to increase monotonically; an insert with a lower generation than the existing entry is ignored
value | T | Yes | The data to cache

Outputs

Name | Type | Description
evicted_entries | Iterator<Item = (Arc<Vec<u8>>, T)> | Entries whose idle time has exceeded the TTL, yielded by time_out()
value_ref | &mut T | Mutable reference to cached data, returned (wrapped in Option) by get_mut() and passed to the closure given to modify() and modify_and_update()

Usage Examples

use std::sync::Arc;
use std::time::{Duration, Instant};

use arroyo_worker::arrow::updating_cache::UpdatingCache;

fn main() {
    // Entries idle for more than 60 seconds become eligible for eviction.
    let mut cache = UpdatingCache::with_time_to_idle(Duration::from_secs(60));

    let key = Arc::new(vec![1, 2, 3]);
    let now = Instant::now();

    // Insert with generation 1
    cache.insert(key.clone(), now, 1, 42);

    // Modify the value and reset its idle timer.
    // The Option<Result<(), E>> return value is ignored here.
    let _ = cache.modify_and_update(&key, Instant::now(), |v| {
        *v += 1;
        Ok::<(), ()>(())
    });

    // Drain entries whose idle time exceeds the TTL.
    for (evicted_key, evicted_value) in cache.time_out(Instant::now()) {
        println!("Evicted: {:?} -> {}", evicted_key, evicted_value);
    }
}
