Implementation:ArroyoSystems Arroyo Sql Rewriters

From Leeroopedia


Overview

SQL Rewriters is a collection of DataFusion TreeNodeRewriter and TreeNodeVisitor implementations that handle source table rewrites, UNNEST operations, async UDF processing, row-time expression rewriting, time window null check removal, and sink input preparation.

Description

The module defines several rewriters:

  • SourceRewriter: Transforms TableScan nodes that refer to Arroyo connector tables into source extension nodes, adding a watermark node and a projection. It:
    • Creates a TableSourceExtension for the connector
    • Optionally wraps with DebeziumUnrollingExtension for CDC sources
    • Adds a WatermarkNode with the configured watermark expression
    • Projects columns including event time fields, virtual fields, and metadata fields
  • UnnestRewriter: Handles SQL UNNEST operations by transforming Unnest logical plan nodes. It preserves schema integrity and adds proper timestamp handling for unnested results.
  • AsyncUdfRewriter: Detects async UDF calls in projection expressions and wraps them in AsyncUDFExtension nodes with the appropriate ordering mode and timeout configuration.
  • RowTimeRewriter: Rewrites references to the special _timestamp column (row time) in expressions, handling the distinction between watermark-based and event-time-based timestamps.
  • TimeWindowNullCheckRemover: Removes IS NOT NULL predicates on window function columns in filters, as window functions can never produce null values and these predicates can cause planning issues.
  • SinkInputRewriter: Prepares the input plan for sink nodes by handling updating/non-updating stream requirements.
  • SourceMetadataVisitor: A visitor that collects connection IDs from all source and sink extensions in a plan for metadata tracking.
  • TimeWindowUdfChecker: A visitor that checks if an expression references time-window UDF calls.
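
The bottom-up rewrite that SourceRewriter performs can be illustrated with a minimal, self-contained sketch. Everything here is an assumption made for illustration: the toy `Plan` enum stands in for DataFusion's LogicalPlan, the `Source`, `Projection`, and `Watermark` variants stand in for Arroyo's extension nodes, the nesting order and the watermark expression string are guesses, and `rewrite_up` is a hypothetical helper mimicking how `f_up` is applied after the children have been visited.

```rust
// Toy stand-in for a logical plan. These types are hypothetical and are
// not DataFusion's or Arroyo's real types.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    TableScan { table: String },
    Filter { predicate: String, input: Box<Plan> },
    // Hypothetical analogues of the extension nodes described above.
    Source { table: String },
    Projection { columns: Vec<String>, input: Box<Plan> },
    Watermark { expr: String, input: Box<Plan> },
}

/// Rewrites the children first, then the node itself (bottom-up).
fn rewrite_up(plan: Plan, f: &dyn Fn(Plan) -> Plan) -> Plan {
    let with_new_children = match plan {
        Plan::Filter { predicate, input } => Plan::Filter {
            predicate,
            input: Box::new(rewrite_up(*input, f)),
        },
        Plan::Projection { columns, input } => Plan::Projection {
            columns,
            input: Box::new(rewrite_up(*input, f)),
        },
        Plan::Watermark { expr, input } => Plan::Watermark {
            expr,
            input: Box::new(rewrite_up(*input, f)),
        },
        // Leaves have no children to rewrite.
        leaf => leaf,
    };
    f(with_new_children)
}

/// Replaces a table scan with source -> projection -> watermark.
/// The column names and watermark expression are illustrative only.
fn rewrite_source(plan: Plan) -> Plan {
    match plan {
        Plan::TableScan { table } => Plan::Watermark {
            expr: "event_time - interval '1 second'".to_string(),
            input: Box::new(Plan::Projection {
                columns: vec!["value".to_string(), "_timestamp".to_string()],
                input: Box::new(Plan::Source { table }),
            }),
        },
        // Every other node is returned unchanged.
        other => other,
    }
}
```

Calling `rewrite_up(plan, &rewrite_source)` on a filter over a table scan leaves the filter in place and replaces only the scan beneath it, which is the reason for using a bottom-up traversal: parents never need to know that a child was expanded into several nodes.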

Usage

These rewriters are invoked by ArroyoRewriter and during sink/source processing in the planner. They form the preprocessing pipeline that transforms standard SQL plans into streaming-compatible plans.

Code Reference

Source Location

crates/arroyo-planner/src/rewriters.rs

Signature

// Rewrites connector TableScan nodes into source extension nodes.
pub(crate) struct SourceRewriter<'a> {
    pub schema_provider: &'a ArroyoSchemaProvider,
}

impl TreeNodeRewriter for SourceRewriter<'_> {
    type Node = LogicalPlan;
    fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> { /* ... */ }
}

// Rewrites Unnest logical plan nodes.
pub(crate) struct UnnestRewriter;

impl TreeNodeRewriter for UnnestRewriter {
    type Node = LogicalPlan;
    fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> { /* ... */ }
}

pub(crate) struct AsyncUdfRewriter<'a> { /* ... */ }

// Operates on expressions (Expr), not on plan nodes.
pub(crate) struct RowTimeRewriter;

impl TreeNodeRewriter for RowTimeRewriter {
    type Node = Expr;
    fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> { /* ... */ }
}

pub(crate) struct TimeWindowNullCheckRemover;
pub(crate) struct SinkInputRewriter;
pub(crate) struct SourceMetadataVisitor { /* ... */ }

Import

use crate::rewriters::{
    SourceRewriter, UnnestRewriter, AsyncUdfRewriter,
    RowTimeRewriter, TimeWindowNullCheckRemover, SinkInputRewriter,
    SourceMetadataVisitor,
};

I/O Contract

Inputs

Name             Type                   Description
node             LogicalPlan            Logical plan node to rewrite (TableScan, Unnest, Projection, Filter)
schema_provider  &ArroyoSchemaProvider  Schema provider with connector and UDF definitions

Outputs

Name            Type                      Description
transformed     Transformed<LogicalPlan>  Rewritten plan node with Arroyo extensions
connection_ids  HashSet                   Collected source/sink connection IDs (from SourceMetadataVisitor)
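
As a rough illustration of how a visitor such as SourceMetadataVisitor might produce the connection_ids output, the sketch below walks a toy plan tree and gathers IDs into a HashSet. The `Node` enum, the `ConnectionIdCollector` struct, and the choice of `i64` as the ID type are all assumptions for this example. The real visitor implements DataFusion's TreeNodeVisitor over LogicalPlan, which is not reproduced here.

```rust
use std::collections::HashSet;

// Hypothetical miniature plan tree; not Arroyo's real node types.
#[derive(Debug)]
enum Node {
    Source { connection_id: Option<i64> },
    Sink { connection_id: Option<i64>, input: Box<Node> },
    Join { left: Box<Node>, right: Box<Node> },
}

// Hypothetical collector; the i64 ID type is an assumption.
#[derive(Default)]
struct ConnectionIdCollector {
    connection_ids: HashSet<i64>,
}

impl ConnectionIdCollector {
    /// Visits every node and records the ID of each source and sink
    /// that has one. Duplicate IDs collapse because of the set.
    fn visit(&mut self, node: &Node) {
        match node {
            Node::Source { connection_id } => {
                if let Some(id) = connection_id {
                    self.connection_ids.insert(*id);
                }
            }
            Node::Sink { connection_id, input } => {
                if let Some(id) = connection_id {
                    self.connection_ids.insert(*id);
                }
                self.visit(input);
            }
            Node::Join { left, right } => {
                self.visit(left);
                self.visit(right);
            }
        }
    }
}
```

A visitor only reads the plan, so it takes `&Node` rather than consuming and rebuilding the tree the way a rewriter does.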

Usage Examples

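use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter};

// Assumes `provider`, `table_scan_plan`, and `expr` are already in scope.

// Rewriting a table scan into a source with watermark
let mut source_rewriter = SourceRewriter {
    schema_provider: &provider,
};
// f_up returns Transformed<LogicalPlan>; the rewritten plan is in `result.data`
let result = source_rewriter.f_up(table_scan_plan)?;

// Rewriting row-time expressions; `.data` unwraps the Transformed<Expr>
let rewritten = expr.rewrite(&mut RowTimeRewriter {})?.data;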
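
The expression-level rewriters follow the same pattern on a smaller tree. The sketch below shows the idea behind TimeWindowNullCheckRemover on a toy `Expr` enum. It is not the module's code: the enum, the `WINDOW_FNS` list, the helper names, and the choice to replace a removed predicate with a literal `true` are all assumptions for illustration.

```rust
// Hypothetical miniature expression tree; not DataFusion's Expr.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(bool),
    Call { name: String, args: Vec<Expr> },
    IsNotNull(Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

// Assumed names of time-window functions, for illustration only.
const WINDOW_FNS: [&str; 3] = ["tumble", "hop", "session"];

/// Returns true if the expression is a direct call to a window function.
fn is_window_call(expr: &Expr) -> bool {
    match expr {
        Expr::Call { name, .. } => WINDOW_FNS.iter().any(|w| *w == name.as_str()),
        _ => false,
    }
}

/// Replaces `IS NOT NULL` checks on window calls with a literal `true`,
/// leaving all other predicates untouched.
fn remove_window_null_checks(expr: Expr) -> Expr {
    match expr {
        Expr::And(left, right) => Expr::And(
            Box::new(remove_window_null_checks(*left)),
            Box::new(remove_window_null_checks(*right)),
        ),
        Expr::IsNotNull(inner) if is_window_call(&inner) => Expr::Literal(true),
        Expr::IsNotNull(inner) => {
            Expr::IsNotNull(Box::new(remove_window_null_checks(*inner)))
        }
        Expr::Call { name, args } => Expr::Call {
            name,
            args: args.into_iter().map(remove_window_null_checks).collect(),
        },
        // Columns and literals are returned unchanged.
        leaf => leaf,
    }
}
```

Substituting `true` rather than deleting the node keeps the surrounding `AND` structure valid, so no parent expression has to be restructured.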
