Implementation:ArroyoSystems Arroyo Plan Rewriter
Overview
Plan Rewriter contains the ArroyoRewriter, the top-level DataFusion TreeNodeRewriter that transforms a standard DataFusion logical plan into an Arroyo-compatible streaming logical plan. It also includes the WindowDetectingVisitor for analyzing window types in plan subtrees.
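The bottom-up pattern described above can be illustrated with a small sketch. It does not use DataFusion: a hypothetical toy Plan enum stands in for LogicalPlan. The sketch rewrites each node's children first, then calls an f_up-style handler that dispatches on the node type. That handler turns scans into a stand-in streaming source, marks projections as carrying a timestamp, and rejects sorts. All type names, variants, and the error message are illustrative assumptions. DataFusion's real rewrite also calls f_down on the way down, which this sketch leaves out.

```rust
// Hypothetical toy plan type standing in for DataFusion's LogicalPlan.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    TableScan { table: String },
    Projection { input: Box<Plan>, has_timestamp: bool },
    Sort { input: Box<Plan> },
    // Hypothetical stand-in for an Arroyo extension node produced from a scan.
    StreamingSource { table: String },
}

// Post-order traversal: rewrite children first, then the node itself (like f_up).
fn rewrite_up(plan: Plan) -> Result<Plan, String> {
    let plan = match plan {
        Plan::Projection { input, has_timestamp } => Plan::Projection {
            input: Box::new(rewrite_up(*input)?),
            has_timestamp,
        },
        Plan::Sort { input } => Plan::Sort {
            input: Box::new(rewrite_up(*input)?),
        },
        leaf => leaf,
    };
    f_up(plan)
}

// Per-node handler that dispatches on the node type.
fn f_up(node: Plan) -> Result<Plan, String> {
    match node {
        // Source tables become streaming sources.
        Plan::TableScan { table } => Ok(Plan::StreamingSource { table }),
        // Projections gain the (modeled) timestamp field.
        Plan::Projection { input, .. } => Ok(Plan::Projection { input, has_timestamp: true }),
        // Unsupported node types produce an error instead of a plan.
        Plan::Sort { .. } => Err("sort is not supported in streaming plans".to_string()),
        other => Ok(other),
    }
}
```

Because the handler runs after the children are rewritten, each node sees inputs that are already in streaming form. This is why a post-order hook fits a rewrite that must propagate fields such as the timestamp upward.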
Description
The module defines:
ArroyoRewriter: Implements TreeNodeRewriter and dispatches each logical plan node type to the appropriate handler:
- Projection: Adds missing _timestamp and _updating fields, rewrites row-time references via RowTimeRewriter, and handles async UDFs via AsyncUdfRewriter.
- Aggregate: Delegates to AggregateRewriter for windowed and non-windowed aggregate handling.
- Join: Delegates to JoinRewriter for join rewriting.
- TableScan: Delegates to SourceRewriter for source table rewrites.
- Filter: Removes IS NOT NULL predicates on window fields via TimeWindowNullCheckRemover.
- Window: Delegates to WindowFunctionRewriter.
- Union: Wraps non-transparent inputs in RemoteTableExtension for materialization.
- SubqueryAlias: Recreates the node from its children to handle schema changes.
- Unsupported nodes (Sort, Repartition, Limit, Explain, Analyze, Copy, Describe, RecursiveQuery) return appropriate error messages.
WindowDetectingVisitor: A TreeNodeVisitor that traverses a plan subtree to determine its window type. It tracks window-related fields through projections, subquery aliases, and aggregates, and validates that:
- Window expressions within a single plan agree on the window type.
- Windows are only created within aggregate GROUP BY clauses.
- Join nodes have consistent windowing on both sides.
extract_column: A helper that extracts the underlying Column from an expression, unwrapping aliases.
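The validation rules above can be modeled in isolation. The following is a minimal sketch, not Arroyo's implementation. WindowType, Expr, merge_window, check_join, and propagate_fields are hypothetical simplifications, and the real visitor operates on DataFusion expressions and DFField values. The sketch shows three things:

- alias unwrapping in the spirit of extract_column;
- window fields being carried through a projection under their new names;
- disagreeing window types being rejected within a plan and across join inputs.

It does not model the rule that windows may only be created in GROUP BY.

```rust
use std::collections::HashSet;

// Hypothetical stand-in for Arroyo's window type; variants are illustrative.
#[derive(Debug, Clone, PartialEq)]
enum WindowType {
    Tumbling { width_secs: u64 },
    Session { gap_secs: u64 },
}

// Toy expression type with just enough structure to show alias unwrapping.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Alias(Box<Expr>, String),
    Literal(i64),
}

// Returns the underlying column name, looking through any number of aliases.
fn extract_column(expr: &Expr) -> Option<&str> {
    match expr {
        Expr::Column(name) => Some(name.as_str()),
        Expr::Alias(inner, _) => extract_column(inner),
        Expr::Literal(_) => None,
    }
}

// Records a newly observed window, failing if it disagrees with one already seen.
fn merge_window(
    current: Option<WindowType>,
    seen: WindowType,
) -> Result<Option<WindowType>, String> {
    match current {
        None => Ok(Some(seen)),
        Some(existing) if existing == seen => Ok(Some(existing)),
        Some(existing) => Err(format!("conflicting windows: {existing:?} vs {seen:?}")),
    }
}

// Requires both join inputs to carry the same window (or both none).
fn check_join(
    left: Option<WindowType>,
    right: Option<WindowType>,
) -> Result<Option<WindowType>, String> {
    if left == right {
        Ok(left)
    } else {
        Err(format!("join inputs have different windows: {left:?} vs {right:?}"))
    }
}

// Carries tracked window fields through a projection; aliased outputs are
// tracked under their new name, untracked columns are dropped.
fn propagate_fields(tracked: &HashSet<String>, projection: &[Expr]) -> HashSet<String> {
    projection
        .iter()
        .filter_map(|expr| {
            let column = extract_column(expr)?;
            if !tracked.contains(column) {
                return None;
            }
            Some(match expr {
                Expr::Alias(_, alias) => alias.clone(),
                _ => column.to_string(),
            })
        })
        .collect()
}
```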
Usage
ArroyoRewriter is the main entry point for plan transformation, applied after DataFusion's standard optimization passes.
Code Reference
Source Location
crates/arroyo-planner/src/plan/mod.rs
Signature
```rust
pub struct ArroyoRewriter<'a> {
    pub(crate) schema_provider: &'a ArroyoSchemaProvider,
}

impl TreeNodeRewriter for ArroyoRewriter<'_> {
    type Node = LogicalPlan;

    // Called on each node after its children have been rewritten.
    fn f_up(&mut self, mut node: Self::Node) -> Result<Transformed<Self::Node>> { /* ... */ }
}

#[derive(Debug, Default)]
struct WindowDetectingVisitor {
    window: Option<WindowType>,  // window type detected so far, if any
    fields: HashSet<DFField>,    // fields known to carry window values
}

impl WindowDetectingVisitor {
    fn get_window(logical_plan: &LogicalPlan) -> Result<Option<WindowType>> { /* ... */ }
}
```
Import
```rust
use crate::plan::ArroyoRewriter;
```
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| node | LogicalPlan | Any DataFusion logical plan node to be rewritten |
| schema_provider | &ArroyoSchemaProvider | Schema provider with table definitions and planning configuration |
Outputs
| Name | Type | Description |
|---|---|---|
| transformed | Transformed<LogicalPlan> | The rewritten plan node, potentially wrapped in Arroyo extensions |
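The sketch below makes the output contract concrete. It uses a simplified, hypothetical mimic of DataFusion's Transformed wrapper; the real type also carries a tnr field of type TreeNodeRecursion, and its combinators return Result. A handler that wraps a node in an extension reports transformed = true, as the Union rule does with RemoteTableExtension. A pass-through reports false. The Node type and the rewrite_node function are illustrative assumptions.

```rust
// Simplified, hypothetical mimic of DataFusion's Transformed<T>.
#[derive(Debug, PartialEq)]
struct Transformed<T> {
    data: T,
    transformed: bool,
}

impl<T> Transformed<T> {
    // The node was changed by the rewrite.
    fn yes(data: T) -> Self {
        Transformed { data, transformed: true }
    }
    // The node was returned unchanged.
    fn no(data: T) -> Self {
        Transformed { data, transformed: false }
    }
}

// Hypothetical node type: a plain node, or a node wrapped in a named extension.
#[derive(Debug, PartialEq)]
enum Node {
    Plain(String),
    Extension(String, Box<Node>),
}

// Wraps the node in an extension when required and records whether anything changed.
fn rewrite_node(node: Node, needs_wrapper: bool) -> Transformed<Node> {
    if needs_wrapper {
        Transformed::yes(Node::Extension("RemoteTableExtension".to_string(), Box::new(node)))
    } else {
        Transformed::no(node)
    }
}
```

With the flag, a caller can tell whether a rewrite changed anything without comparing whole plans.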
Usage Examples
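```rust
use datafusion::common::tree_node::TreeNode; // brings `rewrite` into scope
use crate::plan::ArroyoRewriter;

// Must run inside arroyo-planner, since `schema_provider` is pub(crate).
let mut rewriter = ArroyoRewriter { schema_provider: &schema_provider };
// `rewrite` returns Transformed<LogicalPlan>; `.data` is the rewritten plan.
let rewritten = logical_plan.rewrite(&mut rewriter)?.data;
```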
Related Pages
- ArroyoSystems_Arroyo_Join_Planner - Join-specific rewriting delegated from ArroyoRewriter
- ArroyoSystems_Arroyo_Sql_Rewriters - Source, UDF, and other rewriters used by ArroyoRewriter
- ArroyoSystems_Arroyo_Plan_Graph_Builder - Consumes the rewritten plan to build the dataflow graph
- ArroyoSystems_Arroyo_Aggregate_Extension - Aggregate extensions created during rewriting