Implementation:ArroyoSystems Arroyo Instant Join
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Join_Operations, Arrow_Processing |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
InstantJoin is a streaming join operator that matches records from two input streams at the same event-time instant, executing a DataFusion physical join plan per timestamp bucket.
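The sketch below is a rough illustration of per-instant bucketing. It groups rows from each side by their exact event time in an ordered map. It is a simplified, hypothetical model: `Row`, `Bucket`, `Side`, and `partition_into_buckets` are illustrative names, timestamps are plain `u64` milliseconds rather than `SystemTime`, and rows stand in for Arrow `RecordBatch` columns.

```rust
use std::collections::BTreeMap;

/// Which join input a batch arrived on (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Side {
    Left,
    Right,
}

/// Simplified row: event time in milliseconds plus a join key (hypothetical).
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub ts: u64,
    pub key: String,
}

/// Rows buffered for one event-time instant, kept separately per side.
#[derive(Debug, Default)]
pub struct Bucket {
    pub left: Vec<Row>,
    pub right: Vec<Row>,
}

/// Appends each row of an incoming batch to the bucket for its exact timestamp,
/// creating the bucket on first sight. The BTreeMap keeps buckets in time order.
pub fn partition_into_buckets(buckets: &mut BTreeMap<u64, Bucket>, side: Side, batch: Vec<Row>) {
    for row in batch {
        let bucket = buckets.entry(row.ts).or_default();
        match side {
            Side::Left => bucket.left.push(row),
            Side::Right => bucket.right.push(row),
        }
    }
}
```

Rows that share a timestamp land in the same bucket regardless of which side they came from. This mirrors the idea of one join execution per distinct timestamp.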
Description
The InstantJoin operator implements the ArrowOperator trait to perform streaming joins where left and right records are matched based on identical event timestamps. Incoming record batches from both sides are partitioned by their timestamp values, and for each distinct timestamp a dedicated DataFusion ExecutionPlan instance is created and executed. Results are materialized and emitted when a watermark advances past the timestamp bucket. State is persisted to expiring time-keyed tables ("left" and "right") for checkpoint recovery, and on startup the operator replays any buffered batches from state.
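A hand-written inner hash join over the rows that share one event time gives a rough approximation of the per-timestamp join. This is only a stand-in: the real operator executes a DataFusion `ExecutionPlan` per timestamp. Here, `LeftRow`, `RightRow`, `Joined`, and `join_instant` are hypothetical names, and the join key is assumed to be a single `u32` column.

```rust
use std::collections::HashMap;

/// Hypothetical left-side row for one timestamp bucket.
#[derive(Debug, Clone, PartialEq)]
pub struct LeftRow {
    pub key: u32,
    pub left_val: String,
}

/// Hypothetical right-side row for the same bucket.
#[derive(Debug, Clone, PartialEq)]
pub struct RightRow {
    pub key: u32,
    pub right_val: String,
}

/// One joined output row, stamped with the bucket's event time.
#[derive(Debug, Clone, PartialEq)]
pub struct Joined {
    pub ts: u64,
    pub key: u32,
    pub left_val: String,
    pub right_val: String,
}

/// Inner equi-join of the rows that share event time `ts`:
/// build a hash table on the right side, then probe it with each left row.
pub fn join_instant(ts: u64, left: &[LeftRow], right: &[RightRow]) -> Vec<Joined> {
    let mut by_key: HashMap<u32, Vec<&RightRow>> = HashMap::new();
    for r in right {
        by_key.entry(r.key).or_default().push(r);
    }
    let mut out = Vec::new();
    for l in left {
        if let Some(matches) = by_key.get(&l.key) {
            for r in matches {
                out.push(Joined {
                    ts,
                    key: l.key,
                    left_val: l.left_val.clone(),
                    right_val: r.right_val.clone(),
                });
            }
        }
    }
    out
}
```

Each bucket is joined in isolation, so rows at different timestamps can never match. This is the defining property of an instant join.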
Usage
Used internally by the Arroyo query planner when a SQL query contains a join between two streams without a window specification or TTL, meaning records must be matched at the exact same event time. The operator is constructed via InstantJoinConstructor from an api::JoinOperator configuration containing a serialized DataFusion join plan and left/right schemas.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-worker/src/arrow/instant_join.rs
Signature
pub struct InstantJoin {
    left_input_schema: ArroyoSchemaRef,
    right_input_schema: ArroyoSchemaRef,
    execs: BTreeMap<SystemTime, InstantComputeHolder>,
    futures: Arc<Mutex<FuturesUnordered<NextBatchFuture<SystemTime>>>>,
    left_receiver: Arc<RwLock<Option<UnboundedReceiver<RecordBatch>>>>,
    right_receiver: Arc<RwLock<Option<UnboundedReceiver<RecordBatch>>>>,
    join_exec: Arc<dyn ExecutionPlan>,
}
pub struct InstantJoinConstructor;
impl OperatorConstructor for InstantJoinConstructor {
    type ConfigT = api::JoinOperator;
    fn with_config(
        &self,
        config: Self::ConfigT,
        registry: Arc<Registry>,
    ) -> anyhow::Result<ConstructedOperator>;
}
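The self-contained mock below shows how the associated `ConfigT` type ties a constructor to its configuration. None of these types are Arroyo's. `JoinConfig`, `Operator`, `MockInstantJoin`, and this `OperatorConstructor` trait are hypothetical simplifications. The mock also omits the `registry` parameter and the `ConstructedOperator` return type.

```rust
/// Hypothetical stand-in for a join configuration (not api::JoinOperator).
#[derive(Debug, Clone)]
pub struct JoinConfig {
    pub name: String,
    /// Opaque serialized plan bytes; only checked for presence here.
    pub join_plan: Vec<u8>,
}

/// Minimal operator interface for the mock.
pub trait Operator {
    fn name(&self) -> &str;
}

/// Same shape as the real trait: an associated config type plus a fallible constructor.
pub trait OperatorConstructor {
    type ConfigT;
    fn with_config(&self, config: Self::ConfigT) -> Result<Box<dyn Operator>, String>;
}

pub struct MockInstantJoin {
    name: String,
}

impl Operator for MockInstantJoin {
    fn name(&self) -> &str {
        &self.name
    }
}

pub struct MockInstantJoinConstructor;

impl OperatorConstructor for MockInstantJoinConstructor {
    type ConfigT = JoinConfig;

    fn with_config(&self, config: Self::ConfigT) -> Result<Box<dyn Operator>, String> {
        // Reject configs without a plan, since the operator cannot run without one.
        if config.join_plan.is_empty() {
            return Err("missing join plan".to_string());
        }
        Ok(Box::new(MockInstantJoin { name: config.name }))
    }
}
```

Because `ConfigT` is an associated type, the engine can select the right configuration type per constructor at compile time instead of passing an untyped blob.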
Import
use arroyo_worker::arrow::instant_join::{InstantJoin, InstantJoinConstructor};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| left_batch | RecordBatch | Yes | Record batch from the left input stream, routed via process_batch_index (index 0) |
| right_batch | RecordBatch | Yes | Record batch from the right input stream, routed via process_batch_index (index 1) |
| watermark | Watermark | Yes | Event-time watermark that triggers draining of completed timestamp buckets |
| checkpoint | CheckpointBarrier | Yes | Triggers flush of left and right state tables |
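Routing by input index can be sketched as a small dispatch function. This sketch assumes exactly two inputs, numbered as in the table (0 for left, 1 for right). `Side`, `side_for_index`, and `SideBuffers` are hypothetical names, and batches are modeled as `Vec<String>` instead of `RecordBatch`.

```rust
/// Which side of the join an input index feeds (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Side {
    Left,
    Right,
}

/// Maps an input index onto a join side: 0 = left, 1 = right.
pub fn side_for_index(index: usize) -> Result<Side, String> {
    match index {
        0 => Ok(Side::Left),
        1 => Ok(Side::Right),
        other => Err(format!("unexpected input index {other}")),
    }
}

/// Per-side buffers that incoming batches are appended to.
#[derive(Debug, Default)]
pub struct SideBuffers {
    pub left: Vec<String>,
    pub right: Vec<String>,
}

impl SideBuffers {
    /// Appends a batch to the buffer selected by its input index.
    pub fn route(&mut self, index: usize, batch: Vec<String>) -> Result<(), String> {
        match side_for_index(index)? {
            Side::Left => self.left.extend(batch),
            Side::Right => self.right.extend(batch),
        }
        Ok(())
    }
}
```

The sketch reports an unknown index as an error rather than panicking.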
Outputs
| Name | Type | Description |
|---|---|---|
| joined_batch | RecordBatch | Result of executing the DataFusion join plan on matching left/right records at each timestamp |
| watermark | Watermark::EventTime | Forwarded watermark after draining all completed timestamp buckets |
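`BTreeMap::split_off` gives a compact way to sketch the output ordering in the table above, where completed buckets are emitted first and the watermark is forwarded afterwards. In this hypothetical model, each bucket is reduced to its left and right row counts instead of being joined, and `Output` and `handle_watermark` are illustrative names. A bucket is assumed to be complete once its timestamp is strictly below the watermark.

```rust
use std::collections::BTreeMap;

/// What the operator emits downstream (hypothetical simplification).
#[derive(Debug, Clone, PartialEq)]
pub enum Output {
    /// A drained bucket, summarized by its per-side row counts.
    Completed { ts: u64, left: usize, right: usize },
    /// The watermark, forwarded after all completed buckets.
    Watermark(u64),
}

/// Drains buckets with timestamp < `watermark` in ascending time order,
/// emits one `Completed` per bucket, then forwards the watermark.
/// `buckets` maps event time -> (left row count, right row count).
pub fn handle_watermark(buckets: &mut BTreeMap<u64, (usize, usize)>, watermark: u64) -> Vec<Output> {
    // split_off returns all keys >= watermark; the original map keeps keys < watermark.
    let pending = buckets.split_off(&watermark);
    // Swap so `buckets` keeps only pending buckets and `completed` holds the drained ones.
    let completed = std::mem::replace(buckets, pending);
    let mut out: Vec<Output> = completed
        .into_iter()
        .map(|(ts, (left, right))| Output::Completed { ts, left, right })
        .collect();
    out.push(Output::Watermark(watermark));
    out
}
```

With the buckets kept in a sorted map, draining is a single split at the watermark key rather than a scan over every pending timestamp.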
Usage Examples
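// Construction via OperatorConstructor (used internally by the engine).
// Assumes the OperatorConstructor trait and InstantJoinConstructor are in scope,
// join_operator_config is an api::JoinOperator (serialized join plan plus left/right schemas),
// registry is an Arc<Registry>, and this runs inside a fn returning anyhow::Result.
let constructor = InstantJoinConstructor;
let operator = constructor.with_config(join_operator_config, registry)?;
// The operator is then wired into the dataflow graph by the engine.
// process_batch_index routes left (index 0) and right (index 1) inputs.
// On each watermark, results for completed timestamp buckets are collected and emitted.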