Implementation:ArroyoSystems Arroyo Lookup Join
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Join_Operations, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
LookupJoin is a streaming operator that enriches each incoming record by performing a point lookup against an external data source (via a LookupConnector), with optional in-memory caching using mini_moka.
Description
The LookupJoin operator implements the ArrowOperator trait to perform lookup joins where the left (streaming) side is joined against an external data source accessed through a LookupConnector. For each incoming batch, the operator:
- Evaluates key_exprs (DataFusion PhysicalExpr) to extract lookup keys from the batch
- Converts keys to rows via a RowConverter and deduplicates them into a HashMap
- Checks the optional Cache<OwnedRow, OwnedRow> (a mini_moka concurrent cache with configurable max capacity and TTL) for cached results
- For uncached keys, calls connector.lookup(&cols) to fetch matching rows from the external source
- Caches the results and constructs the output by combining left-side non-key columns with the right-side lookup results
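The per-batch flow above can be sketched with standard-library types only. This is a minimal illustration, not the operator's code: the `Lookup` trait, `MapConnector`, and `enrich_batch` are hypothetical names, string keys stand in for `OwnedRow`, a plain `HashMap` stands in for the mini_moka cache, and caching of missing results is an assumption of the sketch.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a lookup connector: one optional value per key.
trait Lookup {
    fn lookup(&mut self, keys: &[String]) -> Vec<Option<String>>;
}

/// Hypothetical in-memory connector that counts how often it is called.
struct MapConnector {
    table: HashMap<String, String>,
    calls: usize,
    keys_requested: usize,
}

impl Lookup for MapConnector {
    fn lookup(&mut self, keys: &[String]) -> Vec<Option<String>> {
        self.calls += 1;
        self.keys_requested += keys.len();
        keys.iter().map(|k| self.table.get(k).cloned()).collect()
    }
}

/// Enriches one batch of keys, calling the connector at most once per batch.
fn enrich_batch(
    keys: &[String],
    cache: &mut HashMap<String, Option<String>>,
    connector: &mut dyn Lookup,
) -> Vec<Option<String>> {
    // Deduplicate: map each distinct key to the row indices where it occurs.
    let mut positions: HashMap<&str, Vec<usize>> = HashMap::new();
    for (i, k) in keys.iter().enumerate() {
        positions.entry(k.as_str()).or_default().push(i);
    }

    let mut out: Vec<Option<String>> = vec![None; keys.len()];
    let mut misses: Vec<String> = Vec::new();

    // Serve cached keys directly and collect the rest as misses.
    for (k, idxs) in &positions {
        match cache.get(*k) {
            Some(v) => {
                for &i in idxs {
                    out[i] = v.clone();
                }
            }
            None => misses.push((*k).to_string()),
        }
    }

    // Fetch all uncached keys in one connector call, then cache the results
    // (including missing ones, which is an assumption of this sketch).
    if !misses.is_empty() {
        let results = connector.lookup(&misses);
        for (k, v) in misses.iter().zip(results) {
            for &i in &positions[k.as_str()] {
                out[i] = v.clone();
            }
            cache.insert(k.clone(), v);
        }
    }
    out
}
```

In this sketch a batch with keys `u1, u2, u1` sends only two distinct keys to the connector, and a second identical batch is served entirely from the cache.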
The operator supports two join types via LookupJoinType:
- Left -- All input rows are emitted; missing lookups produce NULLs
- Inner -- Only rows with non-null lookup results are emitted (filtered via a BooleanArray mask)
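The difference between the two join types comes down to how the row mask is built. The sketch below illustrates this with plain vectors instead of Arrow arrays; `JoinKind` and `apply_join` are hypothetical names, and the exclusion of metadata fields from the null check is not modeled here.

```rust
/// Hypothetical mirror of the two join types described above.
#[derive(Clone, Copy)]
enum JoinKind {
    Left,
    Inner,
}

/// Pairs each left-side value with its lookup result and filters by a boolean mask.
fn apply_join(
    left: &[i64],
    right: &[Option<String>],
    kind: JoinKind,
) -> Vec<(i64, Option<String>)> {
    // mask[i] is true when row i is kept in the output.
    let mask: Vec<bool> = match kind {
        // Left join: keep every row; missing lookups stay as None (NULL).
        JoinKind::Left => vec![true; left.len()],
        // Inner join: keep only rows whose lookup produced a value.
        JoinKind::Inner => right.iter().map(|r| r.is_some()).collect(),
    };

    left.iter()
        .zip(right)
        .zip(&mask)
        .filter(|(_, keep)| **keep)
        .map(|((l, r), _)| (*l, r.clone()))
        .collect()
}
```

With three input rows of which the second has no match, `JoinKind::Left` yields three rows (the second carrying `None`), while `JoinKind::Inner` yields only the first and third.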
The cache is configured with a byte-weighted capacity (max_capacity_bytes, default 8 MB) and an optional TTL. Metadata fields declared in the connector configuration are excluded from the null check that drives inner-join filtering.
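To make "byte-weighted capacity" and "TTL" concrete, here is a small standard-library sketch of a cache whose limit is measured in bytes of key plus value rather than in entry count. It is not mini_moka and does not reproduce its eviction policy: `ByteCache` is a hypothetical type, eviction is oldest-insertion-first, and the current time is passed in explicitly so the behavior is deterministic.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

/// Hypothetical byte-weighted cache with an optional TTL (illustration only).
struct ByteCache {
    max_bytes: usize,
    ttl: Option<Duration>,
    used_bytes: usize,
    entries: HashMap<Vec<u8>, (Vec<u8>, Instant)>,
    order: VecDeque<Vec<u8>>, // insertion order, oldest first
}

impl ByteCache {
    fn new(max_bytes: usize, ttl: Option<Duration>) -> Self {
        Self {
            max_bytes,
            ttl,
            used_bytes: 0,
            entries: HashMap::new(),
            order: VecDeque::new(),
        }
    }

    /// An entry's weight is the byte length of its key plus its value.
    fn weight(key: &[u8], value: &[u8]) -> usize {
        key.len() + value.len()
    }

    fn insert(&mut self, key: Vec<u8>, value: Vec<u8>, now: Instant) {
        self.remove(&key); // replace any existing entry for this key
        let w = Self::weight(&key, &value);
        if w > self.max_bytes {
            return; // too large to ever fit
        }
        // Evict the oldest entries until the new one fits.
        while self.used_bytes + w > self.max_bytes {
            match self.order.front().cloned() {
                Some(oldest) => self.remove(&oldest),
                None => break,
            }
        }
        self.used_bytes += w;
        self.order.push_back(key.clone());
        self.entries.insert(key, (value, now));
    }

    fn get(&mut self, key: &[u8], now: Instant) -> Option<Vec<u8>> {
        // An entry expires once it has been stored for at least the TTL.
        let expired = match (self.entries.get(key), self.ttl) {
            (Some((_, at)), Some(ttl)) => now.duration_since(*at) >= ttl,
            _ => false,
        };
        if expired {
            self.remove(key);
            return None;
        }
        self.entries.get(key).map(|(v, _)| v.clone())
    }

    fn remove(&mut self, key: &[u8]) {
        if let Some((v, _)) = self.entries.remove(key) {
            self.used_bytes -= Self::weight(key, &v);
            self.order.retain(|k| k.as_slice() != key);
        }
    }
}
```

With a 10-byte limit, two 5-byte entries fill the cache and a third insert evicts the oldest one. A byte-weighted limit keeps memory use bounded even when lookup rows vary widely in size, which an entry-count limit would not.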
Usage
Used when a SQL query joins a streaming table against a lookup source (e.g., a database table or REST API). Constructed via LookupJoinConstructor from an api::LookupJoinOperator configuration.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-worker/src/arrow/lookup_join.rs
Signature
pub struct LookupJoin {
    connector: Box<dyn LookupConnector + Send>,
    key_exprs: Vec<Arc<dyn PhysicalExpr>>,
    cache: Option<Cache<OwnedRow, OwnedRow>>,
    key_row_converter: RowConverter,
    result_row_converter: RowConverter,
    join_type: LookupJoinType,
    lookup_schema: Arc<Schema>,
    metadata_fields: Vec<MetadataField>,
}

pub(crate) enum LookupJoinType {
    Left,
    Inner,
}

pub struct LookupJoinConstructor;

impl OperatorConstructor for LookupJoinConstructor {
    type ConfigT = api::LookupJoinOperator;

    fn with_config(
        &self,
        config: Self::ConfigT,
        registry: Arc<Registry>,
    ) -> anyhow::Result<ConstructedOperator>;
}
Import
use arroyo_worker::arrow::lookup_join::{LookupJoin, LookupJoinConstructor};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| batch | RecordBatch | Yes | Input record batch from the streaming side with key columns for lookup |
Outputs
| Name | Type | Description |
|---|---|---|
| enriched_batch | RecordBatch | Input columns (minus keys and timestamp) joined with lookup results, plus the original timestamp column |
Usage Examples
// A lookup join is created from a SQL query such as:
//   SELECT s.*, d.name FROM stream s
//   LEFT JOIN dimension_table d ON s.user_id = d.id

// `lookup_join_config` (an api::LookupJoinOperator) and `registry`
// (an Arc<Registry>) are assumed to be in scope.
let constructor = LookupJoinConstructor;
let operator = constructor.with_config(lookup_join_config, registry)?;

// For each batch, the operator calls connector.lookup() with the unique
// uncached keys and caches results according to the configured TTL and capacity.