Implementation:ArroyoSystems Arroyo Sql Rewriters
Overview
SQL Rewriters is a collection of DataFusion TreeNodeRewriter and TreeNodeVisitor implementations that handle source table rewrites, UNNEST operations, async UDF processing, row-time expression rewriting, time window null check removal, and sink input preparation.
Description
The module defines several rewriters:
SourceRewriter: Transforms TableScan nodes for Arroyo connector tables into extension nodes with watermark nodes and projection. It:
- Creates a TableSourceExtension for the connector
- Optionally wraps with DebeziumUnrollingExtension for CDC sources
- Adds a WatermarkNode with the configured watermark expression
- Projects columns including event time fields, virtual fields, and metadata fields
UnnestRewriter: Handles SQL UNNEST operations by transforming Unnest logical plan nodes. It preserves schema integrity and adds proper timestamp handling for unnested results.
AsyncUdfRewriter: Detects async UDF calls in projection expressions and wraps them in AsyncUDFExtension nodes with the appropriate ordering mode and timeout configuration.
RowTimeRewriter: Rewrites references to the special _timestamp column (row time) in expressions, handling the distinction between watermark-based and event-time-based timestamps.
TimeWindowNullCheckRemover: Removes IS NOT NULL predicates on window function columns in filters, as window functions can never produce null values and these predicates can cause planning issues.
SinkInputRewriter: Prepares the input plan for sink nodes by handling updating/non-updating stream requirements.
SourceMetadataVisitor: A visitor that collects connection IDs from all source and sink extensions in a plan for metadata tracking.
TimeWindowUdfChecker: A visitor that checks if an expression references time-window UDF calls.
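The null-check removal described for TimeWindowNullCheckRemover can be illustrated with a minimal, self-contained sketch. It does not use DataFusion: the Expr enum and the remove_window_null_checks function below are hypothetical stand-ins, and replacing the predicate with a literal true (then folding it out of a conjunction) is an assumption about how such a rewrite might be done, not a description of the Arroyo source.

```rust
// Toy stand-in for DataFusion's `Expr`; only the variants this sketch needs.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    // A call to a time-window function, e.g. tumble(...) or hop(...).
    WindowCall(String),
    IsNotNull(Box<Expr>),
    And(Box<Expr>, Box<Expr>),
    Literal(bool),
}

// Bottom-up rewrite: children are rewritten first, then the node itself.
// `IS NOT NULL` over a window call becomes `true` (assumed strategy), and
// `true AND x` is folded to `x` so the filter stays small.
fn remove_window_null_checks(expr: Expr) -> Expr {
    match expr {
        Expr::IsNotNull(inner) => match remove_window_null_checks(*inner) {
            Expr::WindowCall(_) => Expr::Literal(true),
            other => Expr::IsNotNull(Box::new(other)),
        },
        Expr::And(left, right) => {
            let left = remove_window_null_checks(*left);
            let right = remove_window_null_checks(*right);
            match (left, right) {
                (Expr::Literal(true), other) | (other, Expr::Literal(true)) => other,
                (l, r) => Expr::And(Box::new(l), Box::new(r)),
            }
        }
        leaf => leaf,
    }
}

fn main() {
    // tumble(...) IS NOT NULL AND user_id IS NOT NULL
    let filter = Expr::And(
        Box::new(Expr::IsNotNull(Box::new(Expr::WindowCall("tumble".to_string())))),
        Box::new(Expr::IsNotNull(Box::new(Expr::Column("user_id".to_string())))),
    );
    let rewritten = remove_window_null_checks(filter);
    // Only the check on the ordinary column survives.
    assert_eq!(
        rewritten,
        Expr::IsNotNull(Box::new(Expr::Column("user_id".to_string())))
    );
    println!("{:?}", rewritten);
}
```

In the real module the same idea would be expressed through DataFusion's TreeNodeRewriter trait over Expr rather than a hand-written recursive function.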
Usage
These rewriters are invoked by ArroyoRewriter and during sink/source processing in the planner. They form the preprocessing pipeline that transforms standard SQL plans into streaming-compatible plans.
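As a rough picture of what "transforming a standard SQL plan into a streaming-compatible plan" means for SourceRewriter, the sketch below rewrites a toy plan tree. Everything in it is hypothetical: the Plan enum, the ConnectorTable struct, and the rewrite functions are simplified stand-ins, and the nesting order of the watermark and projection layers is assumed for illustration rather than taken from the Arroyo source.

```rust
// Toy stand-in for a logical plan; not DataFusion's `LogicalPlan`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    TableScan { table: String },
    TableSource { table: String },
    DebeziumUnrolling(Box<Plan>),
    Watermark { input: Box<Plan>, expr: String },
    Projection { input: Box<Plan>, columns: Vec<String> },
    Filter { input: Box<Plan>, predicate: String },
}

// Hypothetical description of a connector table known to the planner.
struct ConnectorTable {
    name: String,
    is_cdc: bool,
    watermark_expr: String,
    columns: Vec<String>,
}

// Builds the replacement subtree for one connector table scan.
fn rewrite_scan(table: &ConnectorTable) -> Plan {
    let mut plan = Plan::TableSource { table: table.name.clone() };
    if table.is_cdc {
        // CDC sources get an extra unrolling layer.
        plan = Plan::DebeziumUnrolling(Box::new(plan));
    }
    let plan = Plan::Watermark {
        input: Box::new(plan),
        expr: table.watermark_expr.clone(),
    };
    Plan::Projection { input: Box::new(plan), columns: table.columns.clone() }
}

// Walks the plan and replaces scans of known connector tables.
// Only `Filter` is recursed into, to keep the sketch short.
fn rewrite(plan: Plan, catalog: &[ConnectorTable]) -> Plan {
    match plan {
        Plan::Filter { input, predicate } => Plan::Filter {
            input: Box::new(rewrite(*input, catalog)),
            predicate,
        },
        Plan::TableScan { table } => {
            let found = catalog.iter().find(|t| t.name == table);
            match found {
                Some(t) => rewrite_scan(t),
                None => Plan::TableScan { table }, // not a connector table
            }
        }
        other => other,
    }
}

fn main() {
    let catalog = vec![ConnectorTable {
        name: "orders".to_string(),
        is_cdc: true,
        watermark_expr: "event_time - interval '1 second'".to_string(),
        columns: vec!["id".to_string(), "_timestamp".to_string()],
    }];
    let plan = Plan::Filter {
        input: Box::new(Plan::TableScan { table: "orders".to_string() }),
        predicate: "id > 0".to_string(),
    };
    println!("{:#?}", rewrite(plan, &catalog));
}
```

Scans of tables that are not in the catalog are left unchanged, which mirrors the idea that only Arroyo connector tables are rewritten.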
Code Reference
Source Location
crates/arroyo-planner/src/rewriters.rs
Signature
pub(crate) struct SourceRewriter<'a> {
    pub schema_provider: &'a ArroyoSchemaProvider,
}

impl TreeNodeRewriter for SourceRewriter<'_> {
    type Node = LogicalPlan;
    fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>>
}

pub(crate) struct UnnestRewriter;

impl TreeNodeRewriter for UnnestRewriter {
    type Node = LogicalPlan;
    fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>>
}

pub(crate) struct AsyncUdfRewriter<'a> { /* ... */ }

pub(crate) struct RowTimeRewriter;

impl TreeNodeRewriter for RowTimeRewriter {
    type Node = Expr;
    fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>>
}

pub(crate) struct TimeWindowNullCheckRemover;
pub(crate) struct SinkInputRewriter;
pub(crate) struct SourceMetadataVisitor { /* ... */ }
Import
use crate::rewriters::{
    SourceRewriter, UnnestRewriter, AsyncUdfRewriter,
    RowTimeRewriter, TimeWindowNullCheckRemover, SinkInputRewriter,
    SourceMetadataVisitor,
};
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| node | LogicalPlan | Logical plan node to rewrite (TableScan, Unnest, Projection, Filter) |
| schema_provider | &ArroyoSchemaProvider | Schema provider with connector and UDF definitions |
Outputs
| Name | Type | Description |
|---|---|---|
| transformed | Transformed<LogicalPlan> | Rewritten plan node with Arroyo extensions |
| connection_ids | HashSet | Collected source/sink connection IDs (from SourceMetadataVisitor) |
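The connection_ids output can be pictured with a small visitor sketch. It is a simplified stand-in, not the SourceMetadataVisitor itself: the Plan enum, the ConnectionIdCollector type, and the use of i64 as the ID type are all assumptions made for illustration.

```rust
use std::collections::HashSet;

// Toy plan tree; sources and sinks may or may not carry a connection ID.
#[derive(Debug)]
enum Plan {
    Source { connection_id: Option<i64> },
    Sink { connection_id: Option<i64>, input: Box<Plan> },
    Join { left: Box<Plan>, right: Box<Plan> },
}

// Hypothetical read-only visitor that accumulates IDs as it walks the plan.
#[derive(Default)]
struct ConnectionIdCollector {
    connection_ids: HashSet<i64>,
}

impl ConnectionIdCollector {
    fn visit(&mut self, plan: &Plan) {
        match plan {
            // `Option<i64>` is iterable, so `extend` adds zero or one ID.
            Plan::Source { connection_id } => self.connection_ids.extend(*connection_id),
            Plan::Sink { connection_id, input } => {
                self.connection_ids.extend(*connection_id);
                self.visit(input);
            }
            Plan::Join { left, right } => {
                self.visit(left);
                self.visit(right);
            }
        }
    }
}

fn main() {
    let plan = Plan::Sink {
        connection_id: Some(7),
        input: Box::new(Plan::Join {
            left: Box::new(Plan::Source { connection_id: Some(1) }),
            right: Box::new(Plan::Source { connection_id: None }),
        }),
    };
    let mut collector = ConnectionIdCollector::default();
    collector.visit(&plan);
    // The set holds each ID once, regardless of how often it appears.
    assert_eq!(collector.connection_ids, HashSet::from([1, 7]));
}
```

Using a set rather than a list means a connection referenced by several sources or sinks is recorded only once.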
Usage Examples
// Rewriting a table scan into a source with watermark
let mut source_rewriter = SourceRewriter {
    schema_provider: &provider,
};
let result = source_rewriter.f_up(table_scan_plan)?;

// Rewriting row-time expressions
let rewritten = expr.rewrite(&mut RowTimeRewriter {})?;
Related Pages
- ArroyoSystems_Arroyo_Plan_Rewriter - ArroyoRewriter that invokes these rewriters
- ArroyoSystems_Arroyo_Table_Catalog - ConnectorTable definitions used by SourceRewriter
- ArroyoSystems_Arroyo_Debezium_Extension - DebeziumUnrollingExtension created by SourceRewriter