Implementation:ArroyoSystems Arroyo Join Planner
Overview
Join Planner implements the JoinRewriter, a DataFusion TreeNodeRewriter that transforms standard SQL join nodes into Arroyo-compatible join extensions. It supports windowed joins, updating (non-windowed) inner joins, and lookup joins against external data sources.
Description
The module defines:
JoinRewriter: The main rewriter struct, which processes LogicalPlan::Join nodes in the f_up pass. It performs:
- Lookup join detection: Checks whether the right side contains a LookupSource and, if so, routes to maybe_plan_lookup_join()
- Window validation: Ensures both sides have matching window types (or, for inner joins, that both are unwindowed)
- Key calculation: Creates KeyCalculationExtension projections for both the left and right join inputs
- Timestamp projection: Adds a post-join projection that computes the maximum of the left and right timestamps, using a CASE expression with COALESCE for null handling
- Join extension creation: Wraps the rewritten join in a JoinExtension with an is_instant flag and an optional TTL for updating joins
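The null handling in the timestamp projection can be illustrated with a small sketch. It evaluates values directly instead of building a DataFusion expression, treats timestamps as nullable i64 values, and uses a hypothetical function name, post_join_timestamp. One SQL shape with these semantics is CASE WHEN l > r THEN l ELSE COALESCE(r, l) END; the exact expression the planner emits may differ.

```rust
/// Hypothetical evaluation of the post-join timestamp: the later of the left
/// and right event timestamps, where either side may be NULL (None).
/// Same semantics as: CASE WHEN l > r THEN l ELSE COALESCE(r, l) END
pub fn post_join_timestamp(left: Option<i64>, right: Option<i64>) -> Option<i64> {
    match (left, right) {
        // Both sides present: `l > r` is TRUE or FALSE, so take the larger.
        (Some(l), Some(r)) => Some(if l > r { l } else { r }),
        // One side NULL: the comparison yields NULL, so the ELSE branch's
        // COALESCE(r, l) returns whichever side is present (or NULL if neither).
        (l, r) => r.or(l),
    }
}
```

Without the COALESCE fallback, a row from one side of an outer join with no match on the other would end up with a NULL timestamp.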
maybe_plan_lookup_join: Handles joins whose right side is a lookup table. It validates that the join is an inner or left join and that its conditions are equijoins on the table's primary keys, then creates a LookupJoin extension node.
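A minimal sketch of these lookup-join checks, assuming simplified inputs: the ON clause as (left expression, right column) pairs, any extra non-equijoin filter as an optional string, and the lookup table's primary-key columns. The function validate_lookup_join, its JoinType enum, and its error messages are hypothetical, and requiring the right-side join columns to equal the primary key exactly is an assumption about how "equijoin on primary keys" is enforced.

```rust
use std::collections::HashSet;

/// Hypothetical subset of SQL join types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

/// Sketch of lookup-join validation (not Arroyo's code).
/// `on`: (left_expr, right_column) pairs from the equijoin condition.
/// `filter`: any remaining non-equijoin condition.
/// `primary_keys`: the lookup table's primary-key columns.
pub fn validate_lookup_join(
    join_type: JoinType,
    on: &[(&str, &str)],
    filter: Option<&str>,
    primary_keys: &[&str],
) -> Result<(), String> {
    // Only inner and left joins are allowed against a lookup table.
    if !matches!(join_type, JoinType::Inner | JoinType::Left) {
        return Err(format!("lookup joins must be INNER or LEFT, got {:?}", join_type));
    }
    // Conditions must be pure equijoins.
    if on.is_empty() || filter.is_some() {
        return Err("lookup join conditions must be equijoins".to_string());
    }
    // Assumption: the right-side join columns must be exactly the primary key.
    let pk: HashSet<&str> = primary_keys.iter().copied().collect();
    let joined: HashSet<&str> = on.iter().map(|(_, r)| *r).collect();
    if joined != pk {
        return Err(format!("lookup join must be on the primary key {:?}", primary_keys));
    }
    Ok(())
}
```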
FindLookupExtension: A TreeNodeVisitor that traverses the right side of a join to find LookupSource nodes, extracting the connector table, optional filter, and alias.
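The visitor's job can be sketched over a toy plan type. The Plan enum, LookupInfo struct, and find_lookup function are hypothetical stand-ins for DataFusion's LogicalPlan and the real visitor. The sketch assumes that the outermost alias and filter above the LookupSource are the ones to record, and that reaching an ordinary table scan means the input has no lookup source.

```rust
/// Toy logical plan with just enough node kinds to show the traversal.
#[derive(Debug, Clone)]
pub enum Plan {
    SubqueryAlias { alias: String, input: Box<Plan> },
    Filter { predicate: String, input: Box<Plan> },
    Projection { input: Box<Plan> },
    LookupSource { table: String },
    TableScan,
}

/// Information extracted for a lookup join (hypothetical shape).
#[derive(Debug, PartialEq)]
pub struct LookupInfo {
    pub table: String,
    pub filter: Option<String>,
    pub alias: Option<String>,
}

/// Walk the right-hand input from the top, keeping the outermost alias and
/// filter seen on the way down, until a LookupSource or a plain scan is reached.
pub fn find_lookup(plan: &Plan) -> Option<LookupInfo> {
    let mut alias: Option<String> = None;
    let mut filter: Option<String> = None;
    let mut node = plan;
    loop {
        match node {
            Plan::SubqueryAlias { alias: a, input } => {
                alias.get_or_insert_with(|| a.clone());
                node = &**input;
            }
            Plan::Filter { predicate, input } => {
                filter.get_or_insert_with(|| predicate.clone());
                node = &**input;
            }
            Plan::Projection { input } => node = &**input,
            Plan::LookupSource { table } => {
                return Some(LookupInfo { table: table.clone(), filter, alias });
            }
            // An ordinary scan: this input is not a lookup source.
            Plan::TableScan => return None,
        }
    }
}
```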
Key constraints enforced:
- Non-inner joins require windows on both sides
- Mixed windowing between sides is not supported
- Session windows in joins are not supported
- Updating (non-windowed) joins require equijoin conditions
- Lookup sources must be on the right side
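The windowing constraints above amount to a small decision table over the two inputs' windows. The sketch below encodes it with hypothetical Window, JoinType, and JoinKind types. It is not the planner's check_join_windowing, which inspects DataFusion plans and returns Result<bool>, but it applies the rules listed above (the right-side-only rule for lookup sources is handled separately).

```rust
use std::time::Duration;

/// Hypothetical window descriptors for each join input.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Window {
    Tumbling { width: Duration },
    Sliding { width: Duration, slide: Duration },
    Session { gap: Duration },
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

/// Which kind of join the inputs allow (hypothetical).
#[derive(Debug, PartialEq, Eq)]
pub enum JoinKind {
    Windowed,
    Updating,
}

pub fn classify_join(
    join_type: JoinType,
    left: Option<Window>,
    right: Option<Window>,
    has_equijoin_keys: bool,
) -> Result<JoinKind, String> {
    match (left, right) {
        // Neither side windowed: only inner equijoins are allowed.
        (None, None) => {
            if join_type != JoinType::Inner {
                return Err("non-inner joins require windows on both sides".into());
            }
            if !has_equijoin_keys {
                return Err("updating joins require equijoin conditions".into());
            }
            Ok(JoinKind::Updating)
        }
        // Exactly one side windowed.
        (Some(_), None) | (None, Some(_)) => {
            Err("mixed windowing between sides is not supported".into())
        }
        // Both sides windowed: windows must match and must not be sessions.
        (Some(l), Some(r)) => {
            if l != r {
                return Err("both sides must use the same window".into());
            }
            if matches!(l, Window::Session { .. }) {
                return Err("session windows in joins are not supported".into());
            }
            Ok(JoinKind::Windowed)
        }
    }
}
```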
Usage
The JoinRewriter is invoked by ArroyoRewriter when a LogicalPlan::Join node is encountered during the f_up traversal.
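Because the rewrite happens in f_up, children are rewritten before their parents, so a join's inputs have already been processed by the time the join itself is handled. A toy post-order rewriter shows that ordering. The Node type, rewrite_bottom_up, and f_up here are hypothetical, with JoinExtension as a plain stand-in for Arroyo's extension node. In DataFusion, the TreeNode rewrite traversal calls a TreeNodeRewriter's f_down on the way down and f_up on the way back up.

```rust
/// Toy plan tree (hypothetical).
#[derive(Debug, Clone, PartialEq)]
pub enum Node {
    Scan(String),
    Projection(Box<Node>),
    Join(Box<Node>, Box<Node>),
    // Stand-in for the extension node produced by the rewrite.
    JoinExtension(Box<Node>, Box<Node>),
}

fn label(n: &Node) -> String {
    match n {
        Node::Scan(t) => format!("Scan({t})"),
        Node::Projection(_) => "Projection".to_string(),
        Node::Join(..) => "Join".to_string(),
        Node::JoinExtension(..) => "JoinExtension".to_string(),
    }
}

/// Mirrors an f_up hook: called on a node after its children are rewritten.
fn f_up(node: Node, visited: &mut Vec<String>) -> Node {
    visited.push(label(&node));
    match node {
        Node::Join(l, r) => Node::JoinExtension(l, r),
        other => other,
    }
}

/// Post-order traversal: rewrite children first, then call f_up on the parent.
pub fn rewrite_bottom_up(node: Node, visited: &mut Vec<String>) -> Node {
    let node = match node {
        Node::Projection(input) => Node::Projection(Box::new(rewrite_bottom_up(*input, visited))),
        Node::Join(l, r) => Node::Join(
            Box::new(rewrite_bottom_up(*l, visited)),
            Box::new(rewrite_bottom_up(*r, visited)),
        ),
        Node::JoinExtension(l, r) => Node::JoinExtension(
            Box::new(rewrite_bottom_up(*l, visited)),
            Box::new(rewrite_bottom_up(*r, visited)),
        ),
        leaf => leaf,
    };
    f_up(node, visited)
}
```

For a Projection over a Join of two scans, the visit order is both scans, then the join, then the projection, and only the join is replaced.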
Code Reference
Source Location
crates/arroyo-planner/src/plan/join.rs
Signature
// Signatures only; method bodies are omitted.
pub(crate) struct JoinRewriter<'a> {
    pub schema_provider: &'a ArroyoSchemaProvider,
}

impl JoinRewriter<'_> {
    fn check_join_windowing(join: &Join) -> Result<bool>
    fn check_updating(left: &LogicalPlan, right: &LogicalPlan) -> Result<()>
    fn create_join_key_plan(
        input: Arc<LogicalPlan>,
        join_expressions: Vec<Expr>,
        name: &'static str,
    ) -> Result<LogicalPlan>
    fn post_join_timestamp_projection(&mut self, input: LogicalPlan) -> Result<LogicalPlan>
}

impl TreeNodeRewriter for JoinRewriter<'_> {
    type Node = LogicalPlan;
    fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>>
}

fn maybe_plan_lookup_join(join: &Join) -> Result<Option<LogicalPlan>>
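Keying both inputs, as create_join_key_plan does for each side, is what lets an equijoin pair rows: each side evaluates its join-key expressions, and rows with equal key values match. The sketch below is a batch, in-memory illustration only. The names key_rows and equi_join are hypothetical, integer column indices stand in for key expressions, and Arroyo's streaming join operators are not reproduced here.

```rust
use std::collections::HashMap;

pub type Row = Vec<i64>;

/// Stand-in for a key-calculation projection: evaluate the join keys
/// (here, column indices) for each row and pair them with the row.
pub fn key_rows(rows: &[Row], key_cols: &[usize]) -> Vec<(Vec<i64>, Row)> {
    rows.iter()
        .map(|row| (key_cols.iter().map(|&i| row[i]).collect(), row.clone()))
        .collect()
}

/// Inner equijoin over pre-keyed inputs: index the right side by key,
/// then emit a (left, right) pair for every left row whose key matches.
pub fn equi_join(left: &[(Vec<i64>, Row)], right: &[(Vec<i64>, Row)]) -> Vec<(Row, Row)> {
    let mut index: HashMap<&Vec<i64>, Vec<&Row>> = HashMap::new();
    for (k, row) in right {
        index.entry(k).or_default().push(row);
    }
    let mut out = Vec::new();
    for (k, lrow) in left {
        if let Some(matches) = index.get(k) {
            for rrow in matches {
                out.push((lrow.clone(), (*rrow).clone()));
            }
        }
    }
    out
}
```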
Import
use crate::plan::join::JoinRewriter;
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| join | LogicalPlan::Join | DataFusion join node with left and right inputs, on conditions, and join type |
| schema_provider | &ArroyoSchemaProvider | Provider with planning options (TTL for updating joins) |
Outputs
| Name | Type | Description |
|---|---|---|
| JoinExtension | LogicalPlan::Extension | Rewritten join wrapped in Arroyo's JoinExtension with windowing and TTL |
| LookupJoin | LogicalPlan::Extension | Lookup join extension for joins against external data sources |
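How the two outputs and the TTL option fit together can be sketched as a small enum. JoinOutput, its windowed field, and plan_output are hypothetical. Following the description of an optional TTL for updating joins, the sketch assumes the configured TTL is attached only when the join is not windowed.

```rust
use std::time::Duration;

/// Hypothetical summary of what the rewriter produces for a join.
#[derive(Debug, PartialEq)]
pub enum JoinOutput {
    /// Stand-in for JoinExtension: TTL is set only for updating joins.
    Join { windowed: bool, ttl: Option<Duration> },
    /// Stand-in for LookupJoin against an external table.
    LookupJoin { table: String },
}

/// Choose the output node. `lookup_table` is Some when the right side is a
/// lookup source; `configured_ttl` stands in for the planning-options TTL.
pub fn plan_output(lookup_table: Option<&str>, windowed: bool, configured_ttl: Duration) -> JoinOutput {
    match lookup_table {
        Some(t) => JoinOutput::LookupJoin { table: t.to_string() },
        None => JoinOutput::Join {
            windowed,
            // Assumption: windowed joins bound their state by window, so no TTL.
            ttl: if windowed { None } else { Some(configured_ttl) },
        },
    }
}
```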
Usage Examples
-- Windowed inner join (rewritten by JoinRewriter)
SELECT a.*, b.*
FROM stream_a a
JOIN stream_b b ON a.key = b.key
WHERE a.window = b.window;
-- Lookup join against an external table
SELECT o.*, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id;
Related Pages
- ArroyoSystems_Arroyo_Plan_Rewriter - The ArroyoRewriter that delegates to JoinRewriter
- ArroyoSystems_Arroyo_Planner_Extensions - JoinExtension node in the extension registry
- ArroyoSystems_Arroyo_Table_Catalog - ConnectorTable with primary key information for lookup joins