
Implementation:ArroyoSystems Arroyo Join With Expiration



Knowledge Sources
Domains: Streaming, Join_Operations, Arrow_Processing
Last Updated: 2026-02-08 08:00 GMT

Overview

JoinWithExpiration is a streaming join operator that matches records from two input streams using key-time state tables with configurable TTL-based expiration, executing a DataFusion join plan on each new batch against the accumulated opposite-side state.

Description

The JoinWithExpiration operator implements the ArrowOperator trait to perform streaming joins in which both sides accumulate state in key-time tables that automatically expire old data after a configurable TTL. Unlike InstantJoin, which matches records on exact timestamps, this operator performs a key-based join: each incoming batch from one side is joined against all accumulated (non-expired) rows from the opposite side that share the same key.
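To make the TTL behaviour concrete, here is a minimal sketch of a key-time table using only the standard library. `KeyTimeTable`, its methods, `u64` keys and microsecond timestamps are all hypothetical simplifications, not Arroyo's actual table API, and the sketch assumes expiration is measured against the current watermark (rows older than `watermark - ttl` are dropped).

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical, simplified stand-in for a key-time table: rows grouped by key,
/// each tagged with an event timestamp in microseconds.
pub struct KeyTimeTable<V> {
    ttl: Duration,
    rows: HashMap<u64, Vec<(u64, V)>>,
}

impl<V: Clone> KeyTimeTable<V> {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, rows: HashMap::new() }
    }

    /// Store a row under its key.
    pub fn insert(&mut self, key: u64, ts_micros: u64, value: V) {
        self.rows.entry(key).or_default().push((ts_micros, value));
    }

    /// Return all retained values for `key`, in insertion order.
    pub fn get(&self, key: u64) -> Vec<V> {
        self.rows
            .get(&key)
            .map(|rows| rows.iter().map(|(_, v)| v.clone()).collect())
            .unwrap_or_default()
    }

    /// Assumed expiration rule: drop every row older than `watermark - ttl`,
    /// and remove keys that are left with no rows.
    pub fn expire(&mut self, watermark_micros: u64) {
        let cutoff = watermark_micros.saturating_sub(self.ttl.as_micros() as u64);
        self.rows.retain(|_, rows| {
            rows.retain(|(ts, _)| *ts >= cutoff);
            !rows.is_empty()
        });
    }
}
```

With a 10-second TTL and a watermark at 20 s, only rows stamped at 10 s or later survive an `expire` call.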

When a batch arrives on the left side:

  1. It is inserted into the "left" key-time table
  2. The matching keys are looked up in the "right" key-time table
  3. The incoming left batch (in its unkeyed form) and the concatenated matching right batches are passed to the DataFusion join_execution_plan
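The three steps above can be sketched with plain in-memory maps. This is a hypothetical, row-at-a-time simplification: `Row`, `SymmetricJoin` and its methods are illustrative names, the maps stand in for the key-time tables (with no expiration), and a nested loop replaces the DataFusion plan that the real operator runs on whole batches.

```rust
use std::collections::HashMap;

/// Hypothetical row type: a join key plus a payload.
#[derive(Clone, Debug, PartialEq)]
pub struct Row {
    pub key: u64,
    pub payload: String,
}

/// Simplified two-sided join state; each map plays the role of a key-time table.
#[derive(Default)]
pub struct SymmetricJoin {
    left: HashMap<u64, Vec<Row>>,
    right: HashMap<u64, Vec<Row>>,
}

impl SymmetricJoin {
    /// Left arrival: store each row, look up same-key rows on the right, emit pairs.
    pub fn process_left(&mut self, batch: Vec<Row>) -> Vec<(Row, Row)> {
        let mut out = Vec::new();
        for row in batch {
            // Step 1: insert into the "left" table.
            self.left.entry(row.key).or_default().push(row.clone());
            // Step 2: look up rows with the same key in the "right" table.
            if let Some(matches) = self.right.get(&row.key) {
                // Step 3: emit one joined pair per match.
                for r in matches {
                    out.push((row.clone(), r.clone()));
                }
            }
        }
        out
    }

    /// Right arrival is symmetric; pairs are still ordered as (left, right).
    pub fn process_right(&mut self, batch: Vec<Row>) -> Vec<(Row, Row)> {
        let mut out = Vec::new();
        for row in batch {
            self.right.entry(row.key).or_default().push(row.clone());
            if let Some(matches) = self.left.get(&row.key) {
                for l in matches {
                    out.push((l.clone(), row.clone()));
                }
            }
        }
        out
    }
}
```

Because both sides keep state, a match is produced by whichever row arrives second, regardless of which stream it comes from.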

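For each incoming batch, the operator writes the two join inputs into a pair of shared RwLock<Option<RecordBatch>> slots (left_passer and right_passer) and then executes join_execution_plan synchronously; the plan's input nodes are bound to those slots through the LockedJoinPair decoding context. Batches arriving on the right side follow the symmetric process: they are inserted into the "right" table and joined against the matching rows of the "left" table.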
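The passer handoff described above can be illustrated with a std-only sketch. Here `Passer`, `JoinPlan`, `Operator` and `compute_pair` are hypothetical names; the slots hold `Vec<u64>` instead of RecordBatch, and a nested-loop equality join stands in for the DataFusion plan. The point shown is the wiring: the plan is built once with clones of the same `Arc` handles the operator holds, and on each execution it takes (and clears) whatever the operator last wrote into them.

```rust
use std::sync::{Arc, RwLock};

/// Shared single-batch slot, analogous to Arc<RwLock<Option<RecordBatch>>>.
type Passer = Arc<RwLock<Option<Vec<u64>>>>;

/// Hypothetical stand-in for join_execution_plan: it keeps clones of the
/// passer handles and reads (and empties) them every time it executes.
struct JoinPlan {
    left: Passer,
    right: Passer,
}

impl JoinPlan {
    fn execute(&self) -> Vec<(u64, u64)> {
        let left = self.left.write().unwrap().take().unwrap_or_default();
        let right = self.right.write().unwrap().take().unwrap_or_default();
        // Nested-loop equality join on the values.
        let mut out = Vec::new();
        for l in &left {
            for r in &right {
                if l == r {
                    out.push((*l, *r));
                }
            }
        }
        out
    }
}

/// Hypothetical operator that shares its slots with the plan.
struct Operator {
    left_passer: Passer,
    right_passer: Passer,
    plan: JoinPlan,
}

impl Operator {
    fn new() -> Self {
        let left_passer: Passer = Arc::new(RwLock::new(None));
        let right_passer: Passer = Arc::new(RwLock::new(None));
        let plan = JoinPlan { left: left_passer.clone(), right: right_passer.clone() };
        Self { left_passer, right_passer, plan }
    }

    /// Place both inputs in the shared slots, then run the plan once.
    fn compute_pair(&self, left: Vec<u64>, right: Vec<u64>) -> Vec<(u64, u64)> {
        *self.left_passer.write().unwrap() = Some(left);
        *self.right_passer.write().unwrap() = Some(right);
        self.plan.execute()
    }
}
```

After each call both slots are empty again, so the next batch starts from a clean handoff.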

If no TTL is configured, the operator logs a warning and falls back to a default expiration of 24 hours.
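A minimal sketch of this fallback, assuming the TTL arrives as an optional microsecond count. The `resolve_ttl` helper and its `ttl_micros` parameter are hypothetical, and `eprintln!` stands in for whatever logging facility the real operator uses.

```rust
use std::time::Duration;

/// Default expiration applied when no TTL is configured: 24 hours.
const DEFAULT_TTL: Duration = Duration::from_secs(24 * 60 * 60);

/// Hypothetical helper: convert an optional TTL (in microseconds) into a
/// Duration, warning and falling back to 24 hours when it is absent.
fn resolve_ttl(ttl_micros: Option<u64>) -> Duration {
    match ttl_micros {
        Some(micros) => Duration::from_micros(micros),
        None => {
            eprintln!("warning: no TTL configured for join; defaulting to 24 hours");
            DEFAULT_TTL
        }
    }
}
```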

Usage

This operator is used when a SQL query joins two streams and a TTL or WITHIN clause specifies how long each side's data should be retained. It is constructed by JoinWithExpirationConstructor from an api::JoinOperator configuration.

Code Reference

Source Location

Signature

pub struct JoinWithExpiration {
    // Retention (TTL) for rows in the "left" and "right" key-time tables.
    left_expiration: Duration,
    right_expiration: Duration,
    left_input_schema: ArroyoSchema,
    right_input_schema: ArroyoSchema,
    left_schema: ArroyoSchema,
    right_schema: ArroyoSchema,
    // Shared slots through which each side's batch is handed to the join plan.
    left_passer: Arc<RwLock<Option<RecordBatch>>>,
    right_passer: Arc<RwLock<Option<RecordBatch>>>,
    // DataFusion plan executed for every incoming batch and its matching state.
    join_execution_plan: Arc<dyn ExecutionPlan>,
}

pub struct JoinWithExpirationConstructor;
impl OperatorConstructor for JoinWithExpirationConstructor {
    type ConfigT = api::JoinOperator;
    // Signature only; the body is omitted here.
    fn with_config(
        &self,
        config: Self::ConfigT,
        registry: Arc<Registry>,
    ) -> anyhow::Result<ConstructedOperator>;
}

Import

use arroyo_worker::arrow::join_with_expiration::{
    JoinWithExpiration, JoinWithExpirationConstructor,
};

I/O Contract

Inputs

Name | Type | Required | Description
--- | --- | --- | ---
left_batch | RecordBatch | Yes | Record batch from the left input stream, routed via process_batch_index (input index 0)
right_batch | RecordBatch | Yes | Record batch from the right input stream, routed via process_batch_index (input index 1)
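The index-based routing in the inputs table can be sketched as a simple dispatch. `Side`, `route`, `Inputs` and the `Vec<u64>` batches are hypothetical simplifications; only the convention that index 0 is the left stream and index 1 the right comes from the table, and rejecting any other index is an assumption of this sketch.

```rust
/// Which side of the join a batch belongs to.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Side {
    Left,
    Right,
}

/// Map an input index to a join side: 0 is left, 1 is right.
fn route(index: usize) -> Result<Side, String> {
    match index {
        0 => Ok(Side::Left),
        1 => Ok(Side::Right),
        other => Err(format!("join expects input index 0 or 1, got {}", other)),
    }
}

/// Hypothetical buffers standing in for the two sides' state.
#[derive(Default)]
struct Inputs {
    left: Vec<Vec<u64>>,
    right: Vec<Vec<u64>>,
}

impl Inputs {
    /// Mirrors the shape of process_batch_index: dispatch on the input index.
    fn process_batch_index(&mut self, index: usize, batch: Vec<u64>) -> Result<Side, String> {
        let side = route(index)?;
        match side {
            Side::Left => self.left.push(batch),
            Side::Right => self.right.push(batch),
        }
        Ok(side)
    }
}
```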

Outputs

Name | Type | Description
--- | --- | ---
joined_batch | RecordBatch | Result of executing the DataFusion join plan on the incoming batch and its matching opposite-side state

Usage Examples

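// A JoinWithExpiration operator is created from a SQL query such as:
// SELECT * FROM left_stream JOIN right_stream
//   ON left_stream.key = right_stream.key
//   WHERE left_stream.event_time BETWEEN
//     right_stream.event_time - INTERVAL '1' HOUR
//     AND right_stream.event_time + INTERVAL '1' HOUR

// `join_config` is the api::JoinOperator produced for this join and
// `registry` is the Arc<Registry> passed to with_config. Calling with_config
// requires the OperatorConstructor trait in scope, and `?` requires an
// enclosing function that returns anyhow::Result.
let constructor = JoinWithExpirationConstructor;
let operator = constructor.with_config(join_config, registry)?;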
