Implementation:ArroyoSystems Arroyo Plan Rewriter

From Leeroopedia
Revision as of 14:27, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/ArroyoSystems_Arroyo_Plan_Rewriter.md)


Overview

The Plan Rewriter module contains ArroyoRewriter, the top-level DataFusion TreeNodeRewriter that transforms a standard DataFusion logical plan into an Arroyo-compatible streaming logical plan. It also contains WindowDetectingVisitor, which determines the window type used in a plan subtree.

Description

The module defines:

  • ArroyoRewriter: Implements TreeNodeRewriter and dispatches each logical plan node type to the appropriate handler:
    • Projection: Adds missing _timestamp and _updating fields, rewrites row-time references via RowTimeRewriter, and handles async UDFs via AsyncUdfRewriter.
    • Aggregate: Delegates to AggregateRewriter for windowed/non-windowed aggregate handling.
    • Join: Delegates to JoinRewriter for join rewriting.
    • TableScan: Delegates to SourceRewriter for source table rewrites.
    • Filter: Removes IS NOT NULL predicates on window fields via TimeWindowNullCheckRemover.
    • Window: Delegates to WindowFunctionRewriter.
    • Union: Wraps non-transparent inputs in RemoteTableExtension for materialization.
    • SubqueryAlias: Recreates from children to handle schema changes.
    • Unsupported nodes (Sort, Repartition, Limit, Explain, Analyze, Copy, Describe, RecursiveQuery) are rejected with an error message that names the unsupported operation.
  • WindowDetectingVisitor: A TreeNodeVisitor that traverses a plan subtree to determine the window type. It tracks window-related fields through projections, subquery aliases, and aggregates, and validates that:
    • Window expressions within a single plan agree on window type
    • Windows are only created within aggregate GROUP BY clauses
    • Join nodes have consistent windowing on both sides
  • extract_column: A helper that extracts the underlying Column from an expression, unwrapping aliases.
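The per-node dispatch described above can be sketched with a toy model. This is a minimal illustration, not the Arroyo implementation: PlanNode, ToyRewriter, and rewrite_node are hypothetical stand-ins for DataFusion's LogicalPlan and the real handlers, and only the "add a missing _timestamp field" and "reject unsupported nodes" behaviours are modelled.

```rust
// Toy stand-ins for a few LogicalPlan variants. These names are hypothetical
// and exist only for illustration; they are not DataFusion or Arroyo types.
#[derive(Debug, Clone, PartialEq)]
enum PlanNode {
    Projection { fields: Vec<String> },
    Filter { predicate: String },
    Sort,
    Limit,
}

// Hypothetical rewriter: one match arm per node type.
struct ToyRewriter;

impl ToyRewriter {
    fn rewrite_node(&mut self, node: PlanNode) -> Result<PlanNode, String> {
        match node {
            PlanNode::Projection { mut fields } => {
                // Mirrors the "adds missing _timestamp" step for projections.
                if !fields.iter().any(|f| f == "_timestamp") {
                    fields.push("_timestamp".to_string());
                }
                Ok(PlanNode::Projection { fields })
            }
            // The real Filter handler strips IS NOT NULL checks on window
            // fields; this sketch simply passes the node through unchanged.
            other @ PlanNode::Filter { .. } => Ok(other),
            // Unsupported node types are rejected with a descriptive error.
            PlanNode::Sort => Err("Sort is not supported in this sketch".to_string()),
            PlanNode::Limit => Err("Limit is not supported in this sketch".to_string()),
        }
    }
}
```

Keeping the dispatch in a single match makes it easy to see which node types are rewritten, which pass through, and which are rejected.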

Usage

ArroyoRewriter is the main entry point for plan transformation, applied after DataFusion's standard optimization passes.

Code Reference

Source Location

crates/arroyo-planner/src/plan/mod.rs

Signature

pub struct ArroyoRewriter<'a> {
    pub(crate) schema_provider: &'a ArroyoSchemaProvider,
}

impl TreeNodeRewriter for ArroyoRewriter<'_> {
    type Node = LogicalPlan;
    fn f_up(&mut self, mut node: Self::Node) -> Result<Transformed<Self::Node>>
}

#[derive(Debug, Default)]
struct WindowDetectingVisitor {
    window: Option<WindowType>,
    fields: HashSet<DFField>,
}

impl WindowDetectingVisitor {
    fn get_window(logical_plan: &LogicalPlan) -> Result<Option<WindowType>>
}
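
The two pieces of state in WindowDetectingVisitor (the window seen so far and the set of fields that carry it) suggest the following kind of bookkeeping. This is a rough sketch under stated assumptions: ToyWindow, ToyExpr, ToyWindowDetector, observe, visit_projection, and toy_extract_column are hypothetical names, and the real visitor works on DataFusion expressions and fields rather than strings.

```rust
use std::collections::HashSet;

// Hypothetical window descriptions; not Arroyo's WindowType.
#[derive(Debug, Clone, PartialEq)]
enum ToyWindow {
    Tumbling { width_secs: u64 },
    Sliding { width_secs: u64, slide_secs: u64 },
}

// Hypothetical expression model with just enough shape to show alias unwrapping.
#[derive(Debug)]
enum ToyExpr {
    Column(String),
    Alias(Box<ToyExpr>, String),
    Window(ToyWindow),
}

// Unwraps any number of aliases to reach the underlying column name, if any.
fn toy_extract_column(expr: &ToyExpr) -> Option<&str> {
    match expr {
        ToyExpr::Column(name) => Some(name.as_str()),
        ToyExpr::Alias(inner, _) => toy_extract_column(inner),
        ToyExpr::Window(_) => None,
    }
}

#[derive(Debug, Default)]
struct ToyWindowDetector {
    window: Option<ToyWindow>,
    fields: HashSet<String>,
}

impl ToyWindowDetector {
    // Records a window, failing if a different window was already seen.
    fn observe(&mut self, found: ToyWindow) -> Result<(), String> {
        if let Some(existing) = &self.window {
            if *existing != found {
                return Err(format!("window mismatch: {:?} vs {:?}", existing, found));
            }
        }
        self.window = Some(found);
        Ok(())
    }

    // Carries window fields through a projection given as (output name, expr)
    // pairs: an output is a window field if it is a (possibly aliased)
    // reference to a field already known to carry the window.
    fn visit_projection(&mut self, exprs: &[(String, ToyExpr)]) {
        let mut next = HashSet::new();
        for (output, expr) in exprs {
            if let Some(col) = toy_extract_column(expr) {
                if self.fields.contains(col) {
                    next.insert(output.clone());
                }
            }
        }
        self.fields = next;
    }
}
```

In this sketch a second, different window is an error rather than an override, which corresponds to the rule that window expressions within a single plan must agree on window type.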

Import

use crate::plan::ArroyoRewriter;

I/O Contract

Inputs

Name              Type                     Description
node              LogicalPlan              Any DataFusion logical plan node to be rewritten
schema_provider   &ArroyoSchemaProvider    Schema provider with table definitions and planning configuration

Outputs

Name          Type                       Description
transformed   Transformed<LogicalPlan>   The rewritten plan node, potentially wrapped in Arroyo extensions

Usage Examples

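// Assumes DataFusion's TreeNode trait is in scope, since it provides `rewrite`:
// use datafusion::common::tree_node::TreeNode;
let mut rewriter = ArroyoRewriter {
    schema_provider: &schema_provider,
};

// `rewrite` returns Transformed<LogicalPlan>; the rewritten plan is in `rewritten.data`.
let rewritten = logical_plan.rewrite(&mut rewriter)?;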
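
Because ArroyoRewriter implements f_up, each node is handled only after its inputs have been rewritten. The toy traversal below illustrates that bottom-up order. It is a sketch with hypothetical names (ToyPlan, rewrite_up), not DataFusion's TreeNode machinery.

```rust
// Hypothetical plan tree: a node name plus its input plans.
#[derive(Debug)]
struct ToyPlan {
    name: String,
    inputs: Vec<ToyPlan>,
}

// Post-order rewrite: rewrite every input first, then hand the rebuilt node
// to the `f_up` callback. Any error aborts the traversal.
fn rewrite_up<F>(plan: ToyPlan, f_up: &mut F) -> Result<ToyPlan, String>
where
    F: FnMut(ToyPlan) -> Result<ToyPlan, String>,
{
    let ToyPlan { name, inputs } = plan;
    let mut new_inputs = Vec::with_capacity(inputs.len());
    for child in inputs {
        new_inputs.push(rewrite_up(child, f_up)?);
    }
    f_up(ToyPlan { name, inputs: new_inputs })
}
```

With a projection over a scan, the callback sees the scan first and the projection second, so a projection handler can rely on its input already having its rewritten schema.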
