Implementation:ArroyoSystems Arroyo Join With Expiration
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Join_Operations, Arrow_Processing |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
JoinWithExpiration is a streaming join operator that matches records from two input streams using key-time state tables with configurable TTL-based expiration, executing a DataFusion join plan on each new batch against the accumulated opposite-side state.
Description
The JoinWithExpiration operator implements the ArrowOperator trait to perform streaming joins in which both sides accumulate state in key-time tables that automatically expire old data based on a configurable TTL. Unlike InstantJoin, which matches on exact timestamps, this operator performs a key-based join: each incoming batch from one side is joined against all accumulated (non-expired) batches from the opposite side that share the same key.
When a batch arrives on the left side:
- It is inserted into the "left" key-time table
- The matching keys are looked up in the "right" key-time table
- The incoming left batch (unkeyed) and concatenated right batches are passed to the DataFusion join_execution_plan
The join plan is executed synchronously via a shared RwLock<Option<RecordBatch>> pair (left_passer and right_passer) using the LockedJoinPair decoding context. The right side follows the symmetric process.
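The hand-off through a shared slot can be pictured with a minimal sketch. It uses only the standard library: `Batch` stands in for `RecordBatch`, and `SlotReader` and `run_once` are hypothetical names for illustration, not part of Arroyo. Whether the real plan leaves take or clone the batch out of the slot is not stated here; this sketch assumes a take.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for arrow's RecordBatch.
type Batch = Vec<(String, i64)>;

/// A single-slot hand-off cell, the same shape as `left_passer` / `right_passer`.
type Passer = Arc<RwLock<Option<Batch>>>;

/// Stand-in for a plan leaf that reads whatever was placed in its slot.
struct SlotReader {
    slot: Passer,
}

impl SlotReader {
    /// Takes the batch out of the slot, leaving it empty for the next run.
    fn read(&self) -> Option<Batch> {
        self.slot.write().unwrap().take()
    }
}

/// Fills both slots, then "executes" by draining them through the readers.
fn run_once(left: Batch, right: Batch, l: &SlotReader, r: &SlotReader) -> (Batch, Batch) {
    *l.slot.write().unwrap() = Some(left);
    *r.slot.write().unwrap() = Some(right);
    (l.read().unwrap_or_default(), r.read().unwrap_or_default())
}
```

Because the plan is built once and only the slot contents change, the same plan object can be re-executed for every incoming batch without being rebuilt.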
If no TTL is configured, the operator defaults to a 24-hour expiration with a warning.
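The per-batch flow and the TTL behavior described above can be sketched with plain collections. This is an illustration under stated assumptions, not the operator's code: `KeyTimeTable`, `Row`, `process`, and `DEFAULT_TTL` are hypothetical names, rows replace Arrow batches, and expiration is assumed to be driven by a watermark-style timestamp.

```rust
use std::collections::{BTreeMap, HashMap};
use std::time::Duration;

/// Hypothetical row: (join key, event time in ms, payload).
type Row = (String, u64, String);

/// Assumed fallback TTL, mirroring the 24-hour default described above.
const DEFAULT_TTL: Duration = Duration::from_secs(24 * 60 * 60);

/// Simplified stand-in for a key-time table: rows grouped by key, ordered by time.
#[derive(Default)]
struct KeyTimeTable {
    rows: HashMap<String, BTreeMap<u64, Vec<String>>>,
}

impl KeyTimeTable {
    fn insert(&mut self, (key, ts, payload): Row) {
        self.rows.entry(key).or_default().entry(ts).or_default().push(payload);
    }

    /// Drops every row whose timestamp is earlier than `watermark_ms - ttl`.
    fn expire(&mut self, watermark_ms: u64, ttl: Duration) {
        let cutoff = watermark_ms.saturating_sub(ttl.as_millis() as u64);
        for by_time in self.rows.values_mut() {
            // split_off returns the entries with timestamp >= cutoff.
            *by_time = by_time.split_off(&cutoff);
        }
        self.rows.retain(|_, by_time| !by_time.is_empty());
    }

    /// Returns all retained payloads stored under `key`, oldest first.
    fn lookup(&self, key: &str) -> Vec<&String> {
        self.rows.get(key).into_iter().flat_map(|m| m.values().flatten()).collect()
    }
}

/// Handles one incoming row: store it on its own side, then pair it with every
/// retained row on the opposite side that shares its key.
/// Output is (key, incoming payload, opposite payload); a real join would keep
/// left columns first regardless of which side the row arrived on.
fn process(own: &mut KeyTimeTable, other: &KeyTimeTable, row: Row) -> Vec<(String, String, String)> {
    let (key, _, payload) = row.clone();
    own.insert(row);
    other
        .lookup(&key)
        .into_iter()
        .map(|o| (key.clone(), payload.clone(), o.clone()))
        .collect()
}
```

Calling `process(&mut left, &right, row)` for left input and `process(&mut right, &left, row)` for right input gives the symmetric behavior. A caller with no configured TTL would pass `DEFAULT_TTL` to `expire`.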
Usage
Used when a SQL query contains a join between two streams with a TTL or WITHIN clause specifying how long each side's data should be retained. Constructed via JoinWithExpirationConstructor from an api::JoinOperator configuration.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-worker/src/arrow/join_with_expiration.rs
Signature
pub struct JoinWithExpiration {
    left_expiration: Duration,
    right_expiration: Duration,
    left_input_schema: ArroyoSchema,
    right_input_schema: ArroyoSchema,
    left_schema: ArroyoSchema,
    right_schema: ArroyoSchema,
    left_passer: Arc<RwLock<Option<RecordBatch>>>,
    right_passer: Arc<RwLock<Option<RecordBatch>>>,
    join_execution_plan: Arc<dyn ExecutionPlan>,
}

pub struct JoinWithExpirationConstructor;

impl OperatorConstructor for JoinWithExpirationConstructor {
    type ConfigT = api::JoinOperator;
    fn with_config(
        &self,
        config: Self::ConfigT,
        registry: Arc<Registry>,
    ) -> anyhow::Result<ConstructedOperator>;
}
Import
use arroyo_worker::arrow::join_with_expiration::{
    JoinWithExpiration, JoinWithExpirationConstructor,
};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| left_batch | RecordBatch | Yes | Record batch from the left input stream, routed via process_batch_index (index 0) |
| right_batch | RecordBatch | Yes | Record batch from the right input stream, routed via process_batch_index (index 1) |
Outputs
| Name | Type | Description |
|---|---|---|
| joined_batch | RecordBatch | Result of executing the DataFusion join plan on the incoming batch and its matching opposite-side state |
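As a small illustration of the index convention in the Inputs table, the sketch below maps an input index to a join side. `Side` and `side_for_index` are hypothetical names, and returning an error for any other index is an assumption of this sketch rather than documented operator behavior.

```rust
/// Hypothetical tag for which input a batch arrived on.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Side {
    Left,
    Right,
}

/// Maps the input index to a join side: 0 is left, 1 is right.
fn side_for_index(index: usize) -> Result<Side, String> {
    match index {
        0 => Ok(Side::Left),
        1 => Ok(Side::Right),
        other => Err(format!("join expects input index 0 or 1, got {other}")),
    }
}
```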
Usage Examples
// A join with expiration is created from a SQL query like:
//   SELECT * FROM left_stream JOIN right_stream
//   ON left_stream.key = right_stream.key
//   WHERE left_stream.event_time BETWEEN
//     right_stream.event_time - INTERVAL '1' HOUR
//     AND right_stream.event_time + INTERVAL '1' HOUR

// `join_config` (an api::JoinOperator) and `registry` (an Arc<Registry>)
// are assumed to have been built elsewhere.
let constructor = JoinWithExpirationConstructor;
let operator = constructor.with_config(join_config, registry)?;