

Implementation:ArroyoSystems Arroyo Instant Join

From Leeroopedia


Knowledge Sources
Domains: Streaming, Join_Operations, Arrow_Processing
Last Updated: 2026-02-08 08:00 GMT

Overview

InstantJoin is a streaming join operator that matches records from two input streams at the same event-time instant, executing a DataFusion physical join plan per timestamp bucket.

Description

The InstantJoin operator implements the ArrowOperator trait to perform streaming joins where left and right records are matched based on identical event timestamps. Incoming record batches from both sides are partitioned by their timestamp values, and for each distinct timestamp a dedicated DataFusion ExecutionPlan instance is created and executed. Results are materialized and emitted when a watermark advances past the timestamp bucket. State is persisted to expiring time-keyed tables ("left" and "right") for checkpoint recovery, and on startup the operator replays any buffered batches from state.
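The bucketing and watermark-driven emission described above can be sketched in plain Rust. This is a minimal model, not Arroyo's code, and it rests on several assumptions:

- The names `InstantJoinSketch`, `Bucket`, `add_left`, `add_right` and `on_watermark` are hypothetical.
- Rows are simplified to (key, value) pairs instead of Arrow RecordBatches.
- A nested-loop inner join on the key stands in for the per-timestamp DataFusion plan.
- A bucket counts as complete once its timestamp is strictly below the watermark.

```rust
use std::collections::BTreeMap;

/// Rows buffered for one event-time instant. Hypothetical simplification:
/// rows are (join_key, payload) pairs rather than Arrow RecordBatches.
#[derive(Default)]
struct Bucket {
    left: Vec<(u32, String)>,
    right: Vec<(u32, String)>,
}

/// Hypothetical stand-in for InstantJoin: one bucket per timestamp, kept in a
/// BTreeMap so completed buckets can be drained in timestamp order.
#[derive(Default)]
struct InstantJoinSketch {
    buckets: BTreeMap<u64, Bucket>,
}

impl InstantJoinSketch {
    fn add_left(&mut self, ts: u64, key: u32, value: &str) {
        self.buckets.entry(ts).or_default().left.push((key, value.to_string()));
    }

    fn add_right(&mut self, ts: u64, key: u32, value: &str) {
        self.buckets.entry(ts).or_default().right.push((key, value.to_string()));
    }

    /// Drains every bucket whose timestamp is strictly below the watermark
    /// (an assumption) and inner-joins its rows on key with a nested loop,
    /// standing in for the DataFusion plan. Output is in timestamp order.
    fn on_watermark(&mut self, watermark: u64) -> Vec<(u64, u32, String, String)> {
        // split_off leaves keys < watermark in self.buckets and returns keys >= watermark;
        // swap them so self.buckets keeps only the still-open buckets.
        let open = self.buckets.split_off(&watermark);
        let completed = std::mem::replace(&mut self.buckets, open);

        let mut out = Vec::new();
        for (ts, bucket) in completed {
            for (lk, lv) in &bucket.left {
                for (rk, rv) in &bucket.right {
                    if lk == rk {
                        out.push((ts, *lk, lv.clone(), rv.clone()));
                    }
                }
            }
        }
        out
    }
}

fn main() {
    let mut op = InstantJoinSketch::default();
    op.add_left(10, 1, "a");
    op.add_right(10, 1, "x");
    op.add_right(20, 1, "y"); // same key, different instant: never matches "a"
    op.add_left(30, 2, "b");

    let joined = op.on_watermark(25);
    assert_eq!(joined, vec![(10, 1, "a".to_string(), "x".to_string())]);
    // Bucket 30 is still open; bucket 20 was drained without a left-side match.
    assert_eq!(op.buckets.len(), 1);
    println!("{joined:?}");
}
```

Keying the map by timestamp keeps draining ordered and cheap: a single `split_off` separates every completed bucket from the open ones.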

Usage

Used internally by the Arroyo query planner when a SQL query joins two streams that are both windowed by the same window. In that case matching left and right records share exactly the same event time. The operator is constructed via InstantJoinConstructor from an api::JoinOperator configuration that contains a serialized DataFusion join plan and the left and right input schemas.

Code Reference

Source Location

Signature

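pub struct InstantJoin {
    // Schemas of the left and right input streams.
    left_input_schema: ArroyoSchemaRef,
    right_input_schema: ArroyoSchemaRef,
    // One in-flight join computation per event-time instant, ordered by timestamp.
    execs: BTreeMap<SystemTime, InstantComputeHolder>,
    // Futures yielding the next output batch of each timestamp's join execution.
    futures: Arc<Mutex<FuturesUnordered<NextBatchFuture<SystemTime>>>>,
    // Shared slots holding the receivers that feed the join plan's left and right inputs.
    left_receiver: Arc<RwLock<Option<UnboundedReceiver<RecordBatch>>>>,
    right_receiver: Arc<RwLock<Option<UnboundedReceiver<RecordBatch>>>>,
    // DataFusion join plan decoded from the operator configuration.
    join_exec: Arc<dyn ExecutionPlan>,
}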

pub struct InstantJoinConstructor;
impl OperatorConstructor for InstantJoinConstructor {
    type ConfigT = api::JoinOperator;
    fn with_config(
        &self,
        config: Self::ConfigT,
        registry: Arc<Registry>,
    ) -> anyhow::Result<ConstructedOperator>;
}
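The `Arc<RwLock<Option<UnboundedReceiver<RecordBatch>>>>` field type suggests a take-once handoff. The operator parks a receiver in the shared slot, and an input node of the join plan takes it out. The sketch below models that pattern under the following assumptions:

- It uses `std::sync::mpsc` (also unbounded) in place of tokio's `UnboundedReceiver`.
- It uses `Vec<i64>` in place of `RecordBatch`.
- The helpers `open_bucket_channel` and `take_receiver` are hypothetical.
- It assumes a fresh channel is opened for each timestamp bucket.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};

/// Shared slot with the same shape as `left_receiver` / `right_receiver`
/// (std mpsc stands in for tokio's unbounded channel).
type ReceiverSlot<T> = Arc<RwLock<Option<Receiver<T>>>>;

/// Hypothetical: open a channel for a new timestamp bucket, park the receiver
/// in the shared slot, and hand the sender back to the operator.
fn open_bucket_channel<T>(slot: &ReceiverSlot<T>) -> Sender<T> {
    let (tx, rx) = channel();
    *slot.write().unwrap() = Some(rx);
    tx
}

/// Hypothetical: what a plan input node would do when it is built.
/// `take` leaves `None` behind, so the receiver can be claimed only once.
fn take_receiver<T>(slot: &ReceiverSlot<T>) -> Option<Receiver<T>> {
    slot.write().unwrap().take()
}

fn main() {
    let slot: ReceiverSlot<Vec<i64>> = Arc::new(RwLock::new(None));

    let tx = open_bucket_channel(&slot);
    let rx = take_receiver(&slot).expect("receiver was parked");
    assert!(take_receiver(&slot).is_none()); // a second take finds the slot empty

    tx.send(vec![1, 2, 3]).unwrap();
    drop(tx); // dropping the sender ends the input stream for this bucket
    let received: Vec<Vec<i64>> = rx.iter().collect();
    assert_eq!(received, vec![vec![1, 2, 3]]);
}
```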

Import

use arroyo_worker::arrow::instant_join::{InstantJoin, InstantJoinConstructor};

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| left_batch | RecordBatch | Yes | Record batch from the left input stream, routed via process_batch_index (index 0) |
| right_batch | RecordBatch | Yes | Record batch from the right input stream, routed via process_batch_index (index 1) |
| watermark | Watermark | Yes | Event-time watermark that triggers draining of completed timestamp buckets |
| checkpoint | CheckpointBarrier | Yes | Triggers a flush of the left and right state tables |
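Below is a minimal sketch of the input side. It follows the routing in the table above: input index 0 maps to the left stream and index 1 to the right. Each batch is then split into groups of rows that share a timestamp. The names `Side`, `side_for_index` and `partition_by_timestamp` are hypothetical, and a plain slice of timestamps stands in for a RecordBatch's timestamp column.

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
enum Side {
    Left,
    Right,
}

/// Mirrors the documented routing: index 0 is the left stream, index 1 the right.
/// Any other index is rejected, since the join has exactly two inputs.
fn side_for_index(index: usize) -> Result<Side, String> {
    match index {
        0 => Ok(Side::Left),
        1 => Ok(Side::Right),
        other => Err(format!("instant join has two inputs, got index {other}")),
    }
}

/// Hypothetical helper: group the row indices of one batch by their timestamp,
/// returning the groups in ascending timestamp order.
fn partition_by_timestamp(timestamps: &[u64]) -> BTreeMap<u64, Vec<usize>> {
    let mut groups: BTreeMap<u64, Vec<usize>> = BTreeMap::new();
    for (row, ts) in timestamps.iter().enumerate() {
        groups.entry(*ts).or_default().push(row);
    }
    groups
}

fn main() {
    assert_eq!(side_for_index(0), Ok(Side::Left));
    assert_eq!(side_for_index(1), Ok(Side::Right));
    assert!(side_for_index(2).is_err());

    // Rows 0 and 2 share timestamp 30, so they land in the same bucket.
    let groups = partition_by_timestamp(&[30, 10, 30, 20]);
    let expected: BTreeMap<u64, Vec<usize>> =
        [(10, vec![1]), (20, vec![3]), (30, vec![0, 2])].into_iter().collect();
    assert_eq!(groups, expected);
}
```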

Outputs

| Name | Type | Description |
|---|---|---|
| joined_batch | RecordBatch | Result of executing the DataFusion join plan on matching left/right records at each timestamp |
| watermark | Watermark::EventTime | Watermark forwarded after all completed timestamp buckets have been drained |
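The checkpoint input, together with the replay-on-startup behaviour from the Description, can be modelled with a small time-keyed table. This is a hypothetical sketch with the following simplifications and assumptions:

- `ExpiringTimeTable`, `flush` and `replay` are illustrative names, not Arroyo's state API.
- Rows are strings rather than record batches.
- Rows expire once their timestamp falls below the watermark, because their bucket has already been emitted by then.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for an expiring, time-keyed state table ("left" or "right"):
/// rows are stored under their event timestamp.
#[derive(Clone, Debug, Default, PartialEq)]
struct ExpiringTimeTable {
    rows: BTreeMap<u64, Vec<String>>,
}

impl ExpiringTimeTable {
    fn insert(&mut self, ts: u64, row: &str) {
        self.rows.entry(ts).or_default().push(row.to_string());
    }

    /// On a checkpoint barrier: drop timestamps below `expire_before`
    /// (assumed to be the current watermark), then return the snapshot to persist.
    fn flush(&mut self, expire_before: u64) -> ExpiringTimeTable {
        // split_off returns the entries at or after the key; keep only those.
        self.rows = self.rows.split_off(&expire_before);
        self.clone()
    }

    /// On startup: replay every buffered row in timestamp order into `sink`,
    /// which would re-add it to the operator's per-timestamp buckets.
    fn replay(&self, mut sink: impl FnMut(u64, &str)) {
        for (ts, rows) in &self.rows {
            for row in rows {
                sink(*ts, row);
            }
        }
    }
}

fn main() {
    let mut left = ExpiringTimeTable::default();
    left.insert(10, "a");
    left.insert(30, "b");
    left.insert(30, "c");

    // Checkpoint with the watermark at 20: the bucket at 10 has already been
    // emitted, so only the rows at 30 are persisted.
    let snapshot = left.flush(20);

    // Simulated restart: replay the persisted rows.
    let mut replayed = Vec::new();
    snapshot.replay(|ts, row| replayed.push((ts, row.to_string())));
    assert_eq!(replayed, vec![(30, "b".to_string()), (30, "c".to_string())]);
}
```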

Usage Examples

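// Construction via OperatorConstructor (used internally by the engine).
// `join_operator_config` is the api::JoinOperator produced by the planner
// (serialized DataFusion join plan plus left and right schemas), and
// `registry` is the Arc<Registry> expected by with_config.
let constructor = InstantJoinConstructor;
let operator = constructor.with_config(join_operator_config, registry)?;

// The engine then wires the operator into the dataflow graph.
// process_batch_index routes left (index 0) and right (index 1) inputs.
// On each watermark, join results for completed timestamps are emitted.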
