Implementation:ArroyoSystems Arroyo Join Planner


Overview

Join Planner implements the JoinRewriter, a DataFusion TreeNodeRewriter that transforms standard SQL join nodes into Arroyo-compatible join extensions. It supports windowed joins, instant (non-windowed inner) joins, and lookup joins against external data sources.

Description

The module defines:

  • JoinRewriter: The main rewriter struct that processes LogicalPlan::Join nodes in the f_up pass. It performs:
    • Lookup join detection: Checks if the right side contains a LookupSource and routes to maybe_plan_lookup_join()
    • Window validation: Ensures both sides have matching window types (or both are unwindowed for inner joins)
    • Key calculation: Creates KeyCalculationExtension projections for both left and right join inputs
    • Timestamp projection: Adds a post-join projection that computes the maximum of left and right timestamps using a CASE expression with COALESCE for null handling
    • Join extension creation: Wraps the rewritten join in a JoinExtension with is_instant flag and optional TTL for updating joins
  • maybe_plan_lookup_join: Handles joins whose right side is a lookup table. It validates that the join is an inner or left join and that the join conditions are equijoins on the lookup table's primary keys, then creates a LookupJoin extension node.
  • FindLookupExtension: A TreeNodeVisitor that traverses the right side of a join to find LookupSource nodes, extracting the connector table, optional filter, and alias.
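The timestamp projection step above can be illustrated with a small standalone model. This is a sketch only: it uses `Option<i64>` as a hypothetical stand-in for a nullable timestamp column and the invented name `joined_timestamp`; the planner builds DataFusion expressions rather than evaluating values like this, and the exact shape of its CASE expression is not shown in this page.

```rust
/// Hypothetical stand-in for a nullable event-time column
/// (for example, nanoseconds since the epoch).
type Timestamp = Option<i64>;

/// Picks the output timestamp for a joined row.
///
/// Models the behaviour described above: take the larger of the two
/// timestamps, and when one side is NULL (as happens for the unmatched
/// side of an outer join) fall back to whichever side is present,
/// which is what a COALESCE over both columns would yield.
fn joined_timestamp(left: Timestamp, right: Timestamp) -> Timestamp {
    match (left, right) {
        // Both sides present: plain maximum.
        (Some(l), Some(r)) => Some(l.max(r)),
        // At least one side is NULL: first non-NULL value, if any.
        (l, r) => l.or(r),
    }
}
```

Calling `joined_timestamp(Some(5), None)` yields `Some(5)`, which is why the null handling matters for left and right joins where one input contributes no row.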

Key constraints enforced:

  • Non-inner joins require windows on both sides
  • Mixed windowing between sides is not supported
  • Session windows in joins are not supported
  • Updating (non-windowed) joins require equijoin conditions
  • Lookup sources must be on the right side
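The windowing and equijoin constraints listed above can be read as a small decision table. The following sketch encodes that table with invented types (`WindowKind`, `JoinKind`, `PlannedJoin`) and an invented function name `classify_join`; it is not the signature or the error text of the real `check_join_windowing` or `check_updating`, only an illustration of the rules as stated on this page.

```rust
use std::time::Duration;

/// Hypothetical window description for one join input.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum WindowKind {
    Tumbling { width: Duration },
    Sliding { width: Duration, slide: Duration },
    Session { gap: Duration },
}

#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
}

/// Outcome of the checks: which flavour of join would be planned.
#[derive(Debug, PartialEq)]
enum PlannedJoin {
    Windowed,
    Updating,
}

/// Applies the listed constraints to a pair of (optional) input windows.
fn classify_join(
    left: Option<&WindowKind>,
    right: Option<&WindowKind>,
    kind: JoinKind,
    has_equijoin_keys: bool,
) -> Result<PlannedJoin, String> {
    match (left, right) {
        // Neither side is windowed: only inner equijoins are accepted.
        (None, None) if kind != JoinKind::Inner => {
            Err("non-inner joins require windows on both sides".to_string())
        }
        (None, None) if !has_equijoin_keys => {
            Err("updating joins require equijoin conditions".to_string())
        }
        (None, None) => Ok(PlannedJoin::Updating),
        // Both sides are windowed: the windows must match and not be sessions.
        (Some(l), Some(r)) if l != r => {
            Err("mixed windowing between sides is not supported".to_string())
        }
        (Some(WindowKind::Session { .. }), Some(_)) => {
            Err("session windows in joins are not supported".to_string())
        }
        (Some(_), Some(_)) => Ok(PlannedJoin::Windowed),
        // Exactly one side is windowed.
        _ => Err("mixed windowing between sides is not supported".to_string()),
    }
}
```

Returning an enum instead of a boolean is a choice made for this sketch so that each accepted case is named explicitly.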

Usage

The JoinRewriter is invoked by ArroyoRewriter when a LogicalPlan::Join node is encountered during the f_up traversal.
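To make the f_up ordering concrete, here is a toy bottom-up rewrite over a simplified plan tree. The `Plan` enum and the functions `rewrite_up` and `rewrite_joins` are hypothetical and exist only for this sketch; the real planner relies on DataFusion's TreeNodeRewriter machinery over LogicalPlan. The point shown is that children are rewritten before their parent, so by the time a join node is handled its inputs have already been transformed.

```rust
/// Simplified, hypothetical plan tree (not DataFusion's LogicalPlan).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Join(Box<Plan>, Box<Plan>),
    JoinExtension(Box<Plan>, Box<Plan>),
}

/// Post-order traversal: rewrite the children first, then hand the
/// rebuilt node to `f_up`.
fn rewrite_up(plan: Plan, f_up: &mut dyn FnMut(Plan) -> Plan) -> Plan {
    let rebuilt = match plan {
        Plan::Join(l, r) => Plan::Join(
            Box::new(rewrite_up(*l, f_up)),
            Box::new(rewrite_up(*r, f_up)),
        ),
        Plan::JoinExtension(l, r) => Plan::JoinExtension(
            Box::new(rewrite_up(*l, f_up)),
            Box::new(rewrite_up(*r, f_up)),
        ),
        leaf => leaf,
    };
    f_up(rebuilt)
}

/// Replaces every plain join with the extension variant, leaving
/// all other nodes untouched.
fn rewrite_joins(plan: Plan) -> Plan {
    rewrite_up(plan, &mut |node| match node {
        Plan::Join(l, r) => Plan::JoinExtension(l, r),
        other => other,
    })
}
```

For a nested join, the inner join is converted first and the outer join then wraps an already rewritten input.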

Code Reference

Source Location

crates/arroyo-planner/src/plan/join.rs

Signature

pub(crate) struct JoinRewriter<'a> {
    pub schema_provider: &'a ArroyoSchemaProvider,
}

impl JoinRewriter<'_> {
    fn check_join_windowing(join: &Join) -> Result<bool>
    fn check_updating(left: &LogicalPlan, right: &LogicalPlan) -> Result<()>
    fn create_join_key_plan(
        input: Arc<LogicalPlan>,
        join_expressions: Vec<Expr>,
        name: &'static str,
    ) -> Result<LogicalPlan>
    fn post_join_timestamp_projection(&mut self, input: LogicalPlan) -> Result<LogicalPlan>
}

impl TreeNodeRewriter for JoinRewriter<'_> {
    type Node = LogicalPlan;
    fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>>
}

fn maybe_plan_lookup_join(join: &Join) -> Result<Option<LogicalPlan>>

Import

use crate::plan::join::JoinRewriter;

I/O Contract

Inputs

Name             Type                    Description
join             LogicalPlan::Join       DataFusion join node with left, right, on conditions, and join type
schema_provider  &ArroyoSchemaProvider   Provider with planning options (TTL for updating joins)

Outputs

Name           Type                     Description
JoinExtension  LogicalPlan::Extension   Rewritten join wrapped in Arroyo's JoinExtension with windowing and TTL
LookupJoin     LogicalPlan::Extension   Lookup join extension for external data source joins

Usage Examples

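-- Windowed inner join (rewritten by JoinRewriter)
-- Assumes stream_a and stream_b each expose a `window` column produced by the same window definition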
SELECT a.*, b.*
FROM stream_a a
JOIN stream_b b ON a.key = b.key
WHERE a.window = b.window;

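-- Lookup join against an external table
-- Assumes `customers` is declared as a lookup table with primary key `id`; it must be on the right side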
SELECT o.*, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id;
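
The checks that the Description attributes to maybe_plan_lookup_join can be sketched in isolation. Everything named here (`JoinKind`, `LookupTable`, `EquiCondition`, `validate_lookup_join`) is hypothetical and works on plain strings rather than DataFusion expressions. The sketch also assumes that each equijoin condition must reference a primary-key column; whether every primary-key column must be covered is not stated on this page and is not checked.

```rust
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
}

/// Hypothetical description of the lookup table on the right side.
struct LookupTable {
    primary_keys: Vec<String>,
}

/// One `stream_expr = lookup_column` pair from the ON clause.
struct EquiCondition {
    stream_expr: String,
    lookup_column: String,
}

/// Validates a candidate lookup join against the stated rules.
fn validate_lookup_join(
    kind: JoinKind,
    table: &LookupTable,
    on: &[EquiCondition],
    has_residual_filter: bool,
) -> Result<(), String> {
    // Only inner and left joins are accepted.
    if !matches!(kind, JoinKind::Inner | JoinKind::Left) {
        return Err(format!("{kind:?} joins are not supported for lookup tables"));
    }
    // The join must consist purely of equality conditions.
    if has_residual_filter || on.is_empty() {
        return Err("lookup joins require equijoin conditions".to_string());
    }
    // Each condition must target a primary-key column of the lookup table.
    for cond in on {
        if !table.primary_keys.iter().any(|pk| pk == &cond.lookup_column) {
            return Err(format!(
                "`{} = {}`: `{}` is not a primary key of the lookup table",
                cond.stream_expr, cond.lookup_column, cond.lookup_column
            ));
        }
    }
    Ok(())
}
```

Under these assumptions, the `orders`/`customers` query above passes because `o.customer_id` is compared with the primary key `id`, while a join on `c.name` would be rejected.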
