Implementation:ArroyoSystems Arroyo Lookup Join

From Leeroopedia


Knowledge Sources
Domains: Streaming, Join_Operations, Connectors
Last Updated: 2026-02-08 08:00 GMT

Overview

LookupJoin is a streaming operator that enriches each incoming record by performing a point lookup against an external data source (via a LookupConnector), with optional in-memory caching using mini_moka.

Description

The LookupJoin operator implements the ArrowOperator trait to perform lookup joins where the left (streaming) side is joined against an external data source accessed through a LookupConnector. For each incoming batch, the operator:

  1. Evaluates key_exprs (DataFusion PhysicalExpr) to extract lookup keys from the batch
  2. Converts keys to rows via a RowConverter and deduplicates them into a HashMap
  3. Checks the optional Cache<OwnedRow, OwnedRow> (a mini_moka concurrent cache with configurable max capacity and TTL) for cached results
  4. For uncached keys, calls connector.lookup(&cols) to fetch matching rows from the external source
  5. Caches the results and constructs the output by combining left-side non-key columns with the right-side lookup results
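The five steps above can be illustrated with a simplified, std-only Rust sketch. It is an analogue under stated assumptions, not Arroyo's code. Keys are `String`s rather than `OwnedRow`s, a plain `HashMap` stands in for the mini_moka cache, and `KeyLookup`, `InMemorySource`, and `process_batch` are hypothetical names. The sketch uses left-join semantics and caches only hits, because the text does not say whether misses are cached.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a lookup connector: given unique keys, return
/// the rows that exist (keys with no match are simply absent from the map).
trait KeyLookup {
    fn lookup(&mut self, keys: &[String]) -> HashMap<String, String>;
}

/// Toy "external source" backed by a map; counts how many keys it was asked for.
struct InMemorySource {
    table: HashMap<String, String>,
    keys_requested: usize,
}

impl KeyLookup for InMemorySource {
    fn lookup(&mut self, keys: &[String]) -> HashMap<String, String> {
        self.keys_requested += keys.len();
        keys.iter()
            .filter_map(|k| self.table.get(k).map(|v| (k.clone(), v.clone())))
            .collect()
    }
}

/// Enrich a batch of `(key, payload)` rows into `(payload, lookup_result)` rows.
fn process_batch(
    batch: &[(String, i64)],
    cache: &mut HashMap<String, String>,
    source: &mut impl KeyLookup,
) -> Vec<(i64, Option<String>)> {
    // Steps 1-2: extract each row's key and deduplicate into key -> row indices.
    let mut unique: HashMap<&str, Vec<usize>> = HashMap::new();
    for (i, (key, _)) in batch.iter().enumerate() {
        unique.entry(key.as_str()).or_default().push(i);
    }

    // Step 3: keep only the keys the cache cannot answer.
    let misses: Vec<String> = unique
        .keys()
        .filter(|k| !cache.contains_key(**k))
        .map(|k| k.to_string())
        .collect();

    // Steps 4-5: one lookup for all uncached keys; cache what comes back (hits only).
    if !misses.is_empty() {
        cache.extend(source.lookup(&misses));
    }

    // Step 5: scatter each key's result to every row that carries that key.
    let mut results: Vec<Option<String>> = vec![None; batch.len()];
    for (key, rows) in &unique {
        let hit = cache.get(*key);
        for &i in rows {
            results[i] = hit.cloned();
        }
    }

    // Combine the left-side payload with the lookup result (left-join semantics).
    batch
        .iter()
        .zip(results)
        .map(|((_, payload), result)| (*payload, result))
        .collect()
}

fn main() {
    let mut source = InMemorySource {
        table: HashMap::from([("u1".to_string(), "Alice".to_string())]),
        keys_requested: 0,
    };
    let mut cache = HashMap::new();
    let batch = vec![
        ("u1".to_string(), 10),
        ("u2".to_string(), 20),
        ("u1".to_string(), 30),
    ];
    let out = process_batch(&batch, &mut cache, &mut source);
    println!("{out:?}"); // u2 has no match, so its result is None
    println!("keys requested: {}", source.keys_requested);
    let _ = process_batch(&batch, &mut cache, &mut source); // u1 now served from cache
    println!("keys requested: {}", source.keys_requested);
}
```

Three input rows carry only two distinct keys, so the first call asks the source for two keys. The second call asks only for `u2`, the one key that is still uncached.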

The operator supports two join types via LookupJoinType:

  • Left -- All input rows are emitted; missing lookups produce NULLs
  • Inner -- Only rows with non-null lookup results are emitted (filtered via a BooleanArray mask)
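A minimal sketch of how the two join types differ once lookup results are known. `JoinType` and `apply_join` are hypothetical names, and a `Vec<bool>` plays the role of the `BooleanArray` mask. For simplicity, a result counts as null only when the whole lookup result is `None`; the real operator presumably checks the lookup columns themselves, excluding metadata fields.

```rust
/// Mirrors the two variants of `LookupJoinType`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinType {
    Left,
    Inner,
}

/// `results[i]` is the lookup result for left row `i` (`None` = no match).
fn apply_join<L: Clone, R: Clone>(
    left: &[L],
    results: &[Option<R>],
    join_type: JoinType,
) -> Vec<(L, Option<R>)> {
    // Keep-mask, playing the role of the BooleanArray filter.
    let mask: Vec<bool> = match join_type {
        // Left: every input row survives; misses carry None (NULL lookup columns).
        JoinType::Left => vec![true; left.len()],
        // Inner: only rows whose lookup returned something survive.
        JoinType::Inner => results.iter().map(Option::is_some).collect(),
    };
    left.iter()
        .zip(results)
        .zip(mask)
        .filter(|(_, keep)| *keep)
        .map(|((l, r), _)| (l.clone(), r.clone()))
        .collect()
}

fn main() {
    let left = vec!["a", "b", "c"];
    let results = vec![Some(1), None, Some(3)];
    println!("{:?}", apply_join(&left, &results, JoinType::Left));
    println!("{:?}", apply_join(&left, &results, JoinType::Inner));
}
```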

The cache's capacity is weighted by bytes (max_capacity_bytes, default 8 MB), and entries can optionally expire after a TTL. For inner joins, metadata fields from the connector configuration are excluded from the null check that decides whether a row is kept.
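As a rough illustration of a byte-weighted capacity combined with a TTL, the sketch below keeps a running byte total, evicts entries until a new one fits, and treats entries older than the TTL as missing. Everything here is an assumption for illustration. `ByteCache` is a hypothetical std-only type, an entry's weight is taken to be its key bytes plus value bytes, eviction is first-in-first-out (mini_moka uses its own policy), and the 8 MB default is read as 8 * 1024 * 1024 bytes.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

/// Simplified byte-weighted cache with optional TTL (a sketch, not mini_moka).
/// Eviction is FIFO for brevity; mini_moka uses its own admission/eviction policy.
struct ByteCache {
    max_bytes: usize,
    ttl: Option<Duration>,
    used_bytes: usize,
    entries: HashMap<Vec<u8>, (Vec<u8>, Instant)>, // key -> (value, inserted_at)
    order: VecDeque<Vec<u8>>,                      // keys, oldest first
}

impl ByteCache {
    fn new(max_bytes: Option<usize>, ttl: Option<Duration>) -> Self {
        ByteCache {
            // Default of 8 MB, read here as 8 * 1024 * 1024 bytes (assumption).
            max_bytes: max_bytes.unwrap_or(8 * 1024 * 1024),
            ttl,
            used_bytes: 0,
            entries: HashMap::new(),
            order: VecDeque::new(),
        }
    }

    /// Hypothetical weigher: an entry costs its key bytes plus its value bytes.
    fn weight(key: &[u8], value: &[u8]) -> usize {
        key.len() + value.len()
    }

    fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
        self.remove(&key); // replacing a key must not double-count its bytes
        let w = Self::weight(&key, &value);
        if w > self.max_bytes {
            return; // an entry larger than the whole budget is never cached
        }
        // Evict the oldest entries until the new one fits.
        while self.used_bytes + w > self.max_bytes {
            let Some(oldest) = self.order.pop_front() else { break };
            if let Some((old_value, _)) = self.entries.remove(&oldest) {
                self.used_bytes -= Self::weight(&oldest, &old_value);
            }
        }
        self.used_bytes += w;
        self.order.push_back(key.clone());
        self.entries.insert(key, (value, Instant::now()));
    }

    fn get(&mut self, key: &[u8]) -> Option<&[u8]> {
        let expired = match (self.entries.get(key), self.ttl) {
            (None, _) => return None,
            (Some((_, inserted_at)), Some(ttl)) => inserted_at.elapsed() > ttl,
            (Some(_), None) => false,
        };
        if expired {
            self.remove(key); // lazily drop entries older than the TTL
            return None;
        }
        self.entries.get(key).map(|(value, _)| value.as_slice())
    }

    fn remove(&mut self, key: &[u8]) {
        if let Some((value, _)) = self.entries.remove(key) {
            self.used_bytes -= Self::weight(key, &value);
            self.order.retain(|k| k.as_slice() != key);
        }
    }
}

fn main() {
    let mut cache = ByteCache::new(Some(10), Some(Duration::from_millis(50)));
    cache.insert(b"k1".to_vec(), b"aaa".to_vec()); // weight 5
    cache.insert(b"k2".to_vec(), b"bbb".to_vec()); // weight 5, budget now full
    cache.insert(b"k3".to_vec(), b"c".to_vec()); // weight 3: evicts k1 to fit
    println!("k1 cached: {}", cache.get(b"k1").is_some());
    println!("bytes used: {}", cache.used_bytes);
}
```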

Usage

Used when a SQL query joins a streaming table against a lookup source (e.g., a database table or REST API). Constructed via LookupJoinConstructor from an api::LookupJoinOperator configuration.
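The constructor pattern can be sketched as a trait with an associated config type whose `with_config` validates the configuration and fills in defaults. This is a simplified analogue with hypothetical names (`Constructor`, `LookupJoinConfig`, `LookupJoinSketch`, `LookupJoinSketchConstructor`). The real `OperatorConstructor::with_config` also receives an `Arc<Registry>` and returns a `ConstructedOperator`, and the real fields of `api::LookupJoinOperator` are not reproduced here.

```rust
use std::time::Duration;

/// Simplified analogue of `OperatorConstructor` (no registry, no ConstructedOperator).
trait Constructor {
    type ConfigT;
    type Output;
    fn with_config(&self, config: Self::ConfigT) -> Result<Self::Output, String>;
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinType {
    Left,
    Inner,
}

/// Hypothetical, trimmed-down stand-in for `api::LookupJoinOperator`.
struct LookupJoinConfig {
    join_type: &'static str, // illustrative only, e.g. "left" or "inner"
    max_capacity_bytes: Option<u64>,
    ttl: Option<Duration>,
}

/// Hypothetical operator state produced from the config.
#[derive(Debug)]
struct LookupJoinSketch {
    join_type: JoinType,
    max_capacity_bytes: u64,
    ttl: Option<Duration>,
}

struct LookupJoinSketchConstructor;

impl Constructor for LookupJoinSketchConstructor {
    type ConfigT = LookupJoinConfig;
    type Output = LookupJoinSketch;

    fn with_config(&self, config: LookupJoinConfig) -> Result<LookupJoinSketch, String> {
        // Only the two supported join types are accepted.
        let join_type = match config.join_type {
            "left" => JoinType::Left,
            "inner" => JoinType::Inner,
            other => return Err(format!("unsupported lookup join type: {other}")),
        };
        Ok(LookupJoinSketch {
            join_type,
            // Fall back to an 8 MB default (read as 8 * 1024 * 1024, an assumption).
            max_capacity_bytes: config.max_capacity_bytes.unwrap_or(8 * 1024 * 1024),
            ttl: config.ttl,
        })
    }
}

fn main() {
    let config = LookupJoinConfig {
        join_type: "left",
        max_capacity_bytes: None,
        ttl: Some(Duration::from_secs(30)),
    };
    match LookupJoinSketchConstructor.with_config(config) {
        Ok(op) => println!("{op:?}"),
        Err(e) => eprintln!("{e}"),
    }
}
```

Rejecting anything other than left or inner mirrors the two `LookupJoinType` variants; the string-typed `join_type` field is purely illustrative.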

Code Reference

Source Location

Signature

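pub struct LookupJoin {
    // External source that answers point lookups for a set of keys
    connector: Box<dyn LookupConnector + Send>,
    // Expressions evaluated against each input batch to produce the lookup keys
    key_exprs: Vec<Arc<dyn PhysicalExpr>>,
    // Optional mini_moka cache: key row -> lookup result row
    cache: Option<Cache<OwnedRow, OwnedRow>>,
    // Converts key columns into rows for deduplication and cache keys
    key_row_converter: RowConverter,
    // Converts lookup result columns to and from rows (the cached value format)
    result_row_converter: RowConverter,
    // Left or Inner
    join_type: LookupJoinType,
    // Schema of the lookup (right) side
    lookup_schema: Arc<Schema>,
    // Connector metadata fields, excluded from the inner-join null check
    metadata_fields: Vec<MetadataField>,
}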

pub(crate) enum LookupJoinType {
    Left,
    Inner,
}

pub struct LookupJoinConstructor;
impl OperatorConstructor for LookupJoinConstructor {
    type ConfigT = api::LookupJoinOperator;
    fn with_config(
        &self,
        config: Self::ConfigT,
        registry: Arc<Registry>,
    ) -> anyhow::Result<ConstructedOperator>; // body omitted in this listing
}

Import

use arroyo_worker::arrow::lookup_join::{LookupJoin, LookupJoinConstructor};

I/O Contract

Inputs

| Name  | Type        | Required | Description |
|-------|-------------|----------|-------------|
| batch | RecordBatch | Yes      | Input record batch from the streaming side with key columns for lookup |

Outputs

| Name           | Type        | Description |
|----------------|-------------|-------------|
| enriched_batch | RecordBatch | Input columns (minus keys and timestamp) joined with lookup results, plus the original timestamp column |
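To make the output layout concrete, the sketch below computes output column names following the description of the enriched batch: the input columns other than the keys and the timestamp, then the lookup columns, then the timestamp. The function name `output_columns` and the `_timestamp` column name are illustrative assumptions, and the sketch treats keys as plain input columns rather than arbitrary expressions.

```rust
/// Output column order (assumed): non-key, non-timestamp input columns,
/// then lookup result columns, then the original timestamp column last.
fn output_columns(
    input: &[&str],
    key_columns: &[&str],
    timestamp: &str,
    lookup: &[&str],
) -> Vec<String> {
    input
        .iter()
        // Drop key columns and the timestamp from the left side.
        .filter(|c| !key_columns.contains(*c) && **c != timestamp)
        // Append the lookup (right-side) columns.
        .chain(lookup.iter())
        .map(|c| c.to_string())
        // Re-append the timestamp at the end.
        .chain(std::iter::once(timestamp.to_string()))
        .collect()
}

fn main() {
    let cols = output_columns(
        &["user_id", "amount", "_timestamp"],
        &["user_id"],
        "_timestamp",
        &["name"],
    );
    println!("{cols:?}");
}
```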

Usage Examples

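// A lookup join is planned from a SQL query like:
// SELECT s.*, d.name FROM stream s
// LEFT JOIN dimension_table d ON s.user_id = d.id

// Assumes `lookup_join_config: api::LookupJoinOperator` and
// `registry: Arc<Registry>` are in scope, and that the
// `OperatorConstructor` trait is imported so `with_config` resolves.
let constructor = LookupJoinConstructor;
let operator = constructor.with_config(lookup_join_config, registry)?;

// For each batch, the operator calls connector.lookup() with the
// deduplicated keys that missed the cache, then caches the results
// according to the configured TTL and capacity.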
