Implementation:ArroyoSystems Arroyo Updating Cache
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Caching, Data_Structures |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
UpdatingCache is a TTL-based, generation-aware in-memory cache backed by a manually managed doubly-linked list for efficient LRU-style time-to-idle eviction.
Description
UpdatingCache<T> is a generic data structure that combines a HashMap for average-case O(1) key lookups with a manually implemented doubly-linked list (using raw NonNull pointers) that maintains insertion/update ordering. Each entry carries a generation counter that prevents stale writes from overwriting newer data: an insert with a lower generation than the existing entry is silently ignored.
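The generation rule can be illustrated with a minimal sketch on a plain HashMap. The Entry type and the insert_if_not_stale helper are hypothetical names introduced for illustration, not part of the Arroyo source. The sketch also assumes that an insert with an equal generation overwrites the entry, which the description above does not specify.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified entry: a value plus the generation that wrote it.
struct Entry<T> {
    generation: u64,
    value: T,
}

/// Inserts `value` unless the stored entry has a strictly newer generation.
/// Returns `true` if the write was applied, `false` if it was dropped as stale.
fn insert_if_not_stale<T>(
    map: &mut HashMap<Vec<u8>, Entry<T>>,
    key: Vec<u8>,
    generation: u64,
    value: T,
) -> bool {
    if let Some(existing) = map.get(&key) {
        if generation < existing.generation {
            // Stale write: keep the newer data already in the map.
            return false;
        }
    }
    // Assumption: equal generations overwrite.
    map.insert(key, Entry { generation, value });
    true
}
```

Returning a bool is a choice made for this sketch so the outcome is observable; per the signature listed under Code Reference, the real insert returns nothing.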
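The eviction policy is time-to-idle: an entry is evicted once the time since it was last inserted or refreshed (via modify_and_update) exceeds the configured ttl. Despite the field name, ttl acts as an idle timeout rather than a fixed time-to-live. The linked list keeps entries in update-time order (oldest at the head), so TTLIter can collect expired entries by walking from the head and stopping at the first entry that is still fresh.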
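Because the list is ordered oldest-first, expiry only needs to inspect the front. The sketch below shows that idea with a VecDeque standing in for the linked list. The drain_expired function is a hypothetical helper, not the TTLIter implementation, and it assumes a strict "idle time greater than ttl" comparison.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Pops keys from the front of `queue` (oldest first) while their idle time
/// exceeds `ttl`. `queue` must be ordered by last-update time.
fn drain_expired(
    queue: &mut VecDeque<(Vec<u8>, Instant)>,
    now: Instant,
    ttl: Duration,
) -> Vec<Vec<u8>> {
    let mut expired = Vec::new();
    while let Some((_, last_update)) = queue.front() {
        if now.saturating_duration_since(*last_update) <= ttl {
            // Everything behind this entry was updated at least as recently.
            break;
        }
        let (key, _) = queue.pop_front().expect("front() returned Some");
        expired.push(key);
    }
    expired
}
```

The work done is proportional to the number of expired entries rather than the size of the cache, which is the benefit of keeping the list in update-time order.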
Key operations:
- insert -- Adds or replaces an entry, respecting generation ordering. Appends the new node to the tail of the eviction list.
- modify_and_update -- Mutates an existing entry's data and moves its node to the tail (resetting its idle timer).
- modify -- Mutates data and increments generation without updating the node's position in the eviction list.
- time_out -- Returns an iterator yielding entries whose idle time has exceeded the TTL, removing them from both the map and the linked list.
- remove -- Explicitly removes an entry by key.
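The difference between modify and modify_and_update is easiest to see in a toy model. The ToyCache below is hypothetical and deliberately simplified: a per-entry timestamp stands in for the node's position in the eviction list, values are fixed to i64, and time_out scans linearly. It follows only the bullet descriptions above; whether the real modify_and_update also changes the generation is not stated here, so the toy leaves it untouched.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical simplified entry (not Arroyo's CacheEntry).
struct Entry {
    value: i64,
    generation: u64,
    last_update: Instant,
}

/// Toy cache: `last_update` replaces the position in the eviction list.
struct ToyCache {
    data: HashMap<Vec<u8>, Entry>,
    ttl: Duration,
}

impl ToyCache {
    fn new(ttl: Duration) -> Self {
        Self { data: HashMap::new(), ttl }
    }

    fn insert(&mut self, key: Vec<u8>, now: Instant, value: i64) {
        self.data.insert(key, Entry { value, generation: 0, last_update: now });
    }

    /// Mutates the value and resets the idle timer.
    fn modify_and_update(&mut self, key: &[u8], now: Instant, f: impl Fn(&mut i64)) -> Option<()> {
        let entry = self.data.get_mut(key)?;
        f(&mut entry.value);
        entry.last_update = now;
        Some(())
    }

    /// Mutates the value and bumps the generation; the idle timer keeps running.
    fn modify(&mut self, key: &[u8], f: impl Fn(&mut i64)) -> Option<()> {
        let entry = self.data.get_mut(key)?;
        f(&mut entry.value);
        entry.generation += 1;
        Some(())
    }

    fn generation(&self, key: &[u8]) -> Option<u64> {
        self.data.get(key).map(|e| e.generation)
    }

    /// Removes and returns every entry idle for longer than `ttl`.
    fn time_out(&mut self, now: Instant) -> Vec<(Vec<u8>, i64)> {
        let ttl = self.ttl;
        let expired: Vec<Vec<u8>> = self
            .data
            .iter()
            .filter(|(_, e)| now.saturating_duration_since(e.last_update) > ttl)
            .map(|(k, _)| k.clone())
            .collect();
        expired
            .into_iter()
            .filter_map(|k| {
                let e = self.data.remove(&k)?;
                Some((k, e.value))
            })
            .collect()
    }
}
```

In this model, an entry that is only ever touched through modify still expires ttl after its insert, while one touched through modify_and_update stays alive as long as it keeps being refreshed.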
The linked list is managed via raw pointer operations in unsafe blocks. CacheNodePtr wraps Option<NonNull<CacheNode>> and implements Send and Sync for cross-thread compatibility. The Drop implementation ensures all allocated nodes are freed.
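For readers unfamiliar with this style, here is a minimal sketch of a doubly-linked list built on NonNull, with tail append, unlink, and a Drop that frees the remaining nodes. Node and EvictionList are hypothetical names, and the fields of the real CacheNode are not shown on this page, so this illustrates the general technique rather than Arroyo's code.

```rust
use std::ptr::NonNull;

/// Hypothetical node type holding only a key.
struct Node {
    key: Vec<u8>,
    prev: Option<NonNull<Node>>,
    next: Option<NonNull<Node>>,
}

/// Minimal list that owns its heap-allocated nodes through raw pointers.
struct EvictionList {
    head: Option<NonNull<Node>>,
    tail: Option<NonNull<Node>>,
}

impl EvictionList {
    fn new() -> Self {
        Self { head: None, tail: None }
    }

    /// Allocates a node and appends it at the tail (most recently updated).
    fn push_back(&mut self, key: Vec<u8>) -> NonNull<Node> {
        let node = Box::new(Node { key, prev: self.tail, next: None });
        // Box::leak hands ownership to the list; `unlink` reclaims it.
        let ptr = NonNull::from(Box::leak(node));
        match self.tail {
            // SAFETY: `tail` points to a live node owned by this list.
            Some(mut tail) => unsafe { tail.as_mut().next = Some(ptr) },
            None => self.head = Some(ptr),
        }
        self.tail = Some(ptr);
        ptr
    }

    /// Detaches `ptr` from the list, frees the node and returns its key.
    ///
    /// SAFETY: `ptr` must come from `push_back` on this list and must not
    /// have been unlinked already.
    unsafe fn unlink(&mut self, ptr: NonNull<Node>) -> Vec<u8> {
        // Retake ownership so the allocation is freed when `node` is dropped.
        let node = unsafe { Box::from_raw(ptr.as_ptr()) };
        let Node { key, prev, next } = *node;
        match prev {
            Some(mut p) => unsafe { p.as_mut().next = next },
            None => self.head = next,
        }
        match next {
            Some(mut n) => unsafe { n.as_mut().prev = prev },
            None => self.tail = prev,
        }
        key
    }

    /// Removes and returns the oldest key, if any.
    fn pop_front(&mut self) -> Option<Vec<u8>> {
        let head = self.head?;
        // SAFETY: `head` always points to a live node owned by this list.
        Some(unsafe { self.unlink(head) })
    }
}

impl Drop for EvictionList {
    fn drop(&mut self) {
        // Free every remaining node so nothing leaks.
        while self.pop_front().is_some() {}
    }
}
```

Moving an entry to the tail, as modify_and_update is described to do, corresponds to an unlink followed by a push_back in this sketch. A type holding NonNull is neither Send nor Sync by default, which is why a wrapper such as CacheNodePtr needs explicit unsafe impls.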
Usage
Used internally by Arroyo operators (such as the lookup join) that need a fast in-memory cache with automatic expiration of stale entries and protection against out-of-order updates via generation tracking.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-worker/src/arrow/updating_cache.rs
Signature
pub struct UpdatingCache<T: Send + Sync> {
    data: HashMap<Key, CacheEntry<T>>,
    eviction_list_head: CacheNodePtr,
    eviction_list_tail: CacheNodePtr,
    ttl: Duration,
}
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct Key(pub Arc<Vec<u8>>);
impl<T: Send + Sync> UpdatingCache<T> {
    pub fn with_time_to_idle(ttl: Duration) -> Self;
    pub fn insert(&mut self, key: Arc<Vec<u8>>, now: Instant, generation: u64, value: T);
    pub fn time_out(&mut self, now: Instant) -> impl Iterator<Item = (Arc<Vec<u8>>, T)> + '_;
    pub fn modify_and_update<E, F: Fn(&mut T) -> Result<(), E>>(
        &mut self, key: &[u8], now: Instant, f: F,
    ) -> Option<Result<(), E>>;
    pub fn modify<E, F: Fn(&mut T) -> Result<(), E>>(
        &mut self, key: &[u8], f: F,
    ) -> Option<Result<(), E>>;
    pub fn contains_key(&self, k: &[u8]) -> bool;
    pub fn get_mut(&mut self, key: &[u8]) -> Option<&mut T>;
    pub fn get_mut_generation(&mut self, key: &[u8]) -> Option<(&mut T, u64)>;
    pub fn get_mut_key_value(&mut self, key: &[u8]) -> Option<(Key, &mut T)>;
    pub fn remove(&mut self, key: &[u8]) -> Option<T>;
    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&Key, &mut T)>;
}
Import
use arroyo_worker::arrow::updating_cache::{UpdatingCache, Key};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| key | Arc<Vec<u8>> | Yes | Byte-array key identifying the cache entry |
| now | Instant | Yes | Current time from the monotonic clock (std::time::Instant), used for idle-time tracking |
| generation | u64 | Yes | Monotonically increasing version; stale generations are rejected on insert |
| value | T | Yes | The data to cache |
Outputs
| Name | Type | Description |
|---|---|---|
| evicted_entries | Iterator<Item = (Arc<Vec<u8>>, T)> | Entries whose idle time has exceeded the TTL, yielded by time_out() |
| value_ref | &mut T | Mutable reference to cached data, returned inside an Option by get_mut(); modify() and modify_and_update() pass it to the supplied closure instead |
Usage Examples
use std::sync::Arc;
use std::time::{Duration, Instant};
use arroyo_worker::arrow::updating_cache::UpdatingCache;
fn main() {
    // Entries idle for more than 60 seconds become eligible for eviction.
    let mut cache = UpdatingCache::with_time_to_idle(Duration::from_secs(60));
    let key = Arc::new(vec![1, 2, 3]);
    let now = Instant::now();
    // Insert with generation 1
    cache.insert(key.clone(), now, 1, 42);
    // Modify the value and reset the entry's idle timer
    cache.modify_and_update(&key, Instant::now(), |v| {
        *v += 1;
        Ok::<(), ()>(())
    });
    // Drain entries that have been idle longer than the TTL.
    // Nothing is yielded here, because the entry was refreshed moments ago.
    for (evicted_key, evicted_value) in cache.time_out(Instant::now()) {
        println!("Evicted: {:?} -> {}", evicted_key, evicted_value);
    }
}