Implementation:ArroyoSystems Arroyo Planner Extensions
Overview
Planner Extensions defines the ArroyoExtension trait and supporting types that form the interface between DataFusion's logical plan framework and Arroyo's streaming dataflow graph construction. It also provides the type dispatch mechanism for converting DataFusion extension nodes to Arroyo extensions.
Description
The module defines:
- ArroyoExtension trait: The core interface that all Arroyo logical plan extensions implement. Its methods are:
  - node_name(): Returns an optional named node, used for memoization in the graph builder.
  - plan_node(): Converts the extension into a NodeWithIncomingEdges for the dataflow graph.
  - output_schema(): Returns the Arroyo output schema.
  - transparent(): Whether the node should be treated as transparent during graph construction (defaults to false).
- NodeWithIncomingEdges: Pairs a LogicalNode with its incoming LogicalEdge connections.
- Type dispatch via TryFrom: Implements conversion from &dyn UserDefinedLogicalNode to &dyn ArroyoExtension by attempting a downcast to each known extension type in sequence: TableSourceExtension, WatermarkNode, SinkExtension, KeyCalculationExtension, AggregateExtension, RemoteTableExtension, JoinExtension, WindowFunctionExtension, AsyncUDFExtension, ToDebeziumExtension, DebeziumUnrollingExtension, UpdatingAggregateExtension, LookupJoin, and ProjectionExtension.
- AsyncUDFExtension: An extension for asynchronous user-defined function calls, supporting ordered, unordered, and in-order modes.
- TimestampAppendExtension trait: A helper trait for extensions that need to append timestamp fields to their output schemas.
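To illustrate what a timestamp-appending helper trait might look like, here is a minimal, self-contained sketch. It does not use Arrow or DataFusion: Field, DataType, base_fields, fields_with_timestamp, CountExtension and AlreadyTimestamped are all hypothetical stand-ins, and the column name _timestamp is an assumption, not something this page states. What it shows is the general pattern of a trait whose default method appends a timestamp field to an implementor's output fields.

```rust
/// Hypothetical stand-in for an Arrow data type.
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Int64,
    Utf8,
    TimestampNanos,
}

/// Hypothetical stand-in for an Arrow field.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub data_type: DataType,
}

/// Assumed name of the appended event-time column (not taken from the source).
pub const TIMESTAMP_FIELD: &str = "_timestamp";

/// Sketch of a helper trait: implementors supply their base output fields and
/// inherit a default method that appends the timestamp field.
pub trait TimestampAppendExtension {
    /// Output fields before any timestamp is added (hypothetical method).
    fn base_fields(&self) -> Vec<Field>;

    /// Returns the base fields plus a timestamp field, unless one is already present.
    fn fields_with_timestamp(&self) -> Vec<Field> {
        let mut fields = self.base_fields();
        if !fields.iter().any(|f| f.name == TIMESTAMP_FIELD) {
            fields.push(Field {
                name: TIMESTAMP_FIELD.to_string(),
                data_type: DataType::TimestampNanos,
            });
        }
        fields
    }
}

/// Hypothetical extension emitting (key, count) rows.
pub struct CountExtension;

impl TimestampAppendExtension for CountExtension {
    fn base_fields(&self) -> Vec<Field> {
        vec![
            Field { name: "key".to_string(), data_type: DataType::Utf8 },
            Field { name: "count".to_string(), data_type: DataType::Int64 },
        ]
    }
}

/// Hypothetical extension whose fields already include the timestamp.
pub struct AlreadyTimestamped;

impl TimestampAppendExtension for AlreadyTimestamped {
    fn base_fields(&self) -> Vec<Field> {
        vec![Field {
            name: TIMESTAMP_FIELD.to_string(),
            data_type: DataType::TimestampNanos,
        }]
    }
}
```

Keeping the append logic in a default method lets each extension describe only its own columns, while the duplicate check keeps the helper safe to call on schemas that already carry a timestamp.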
The module re-exports all extension submodules: aggregate, debezium, join, key_calculation, lookup, projection, remote_table, sink, table_source, updating_aggregate, watermark_node, and window_fn.
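The TryFrom dispatch described above is a chain of downcasts. The sketch below reproduces that pattern with std::any::Any and two stand-in node types. The traits are reduced, hypothetical versions of DataFusion's UserDefinedLogicalNode (only as_any is modeled) and of ArroyoExtension (reduced to a single kind method), and the helper try_from_t and the error type PlanError are illustrative names rather than the module's actual API.

```rust
use std::any::Any;
use std::fmt::Debug;

/// Stand-in for DataFusion's UserDefinedLogicalNode; only `as_any` is modeled.
pub trait UserDefinedLogicalNode: Debug {
    fn as_any(&self) -> &dyn Any;
}

/// Stand-in for ArroyoExtension, reduced to one method for the demo.
pub trait ArroyoExtension: Debug {
    fn kind(&self) -> &'static str;
}

#[derive(Debug)]
pub struct WatermarkNode;

#[derive(Debug)]
pub struct SinkExtension;

/// A node type the dispatcher does not know about.
#[derive(Debug)]
pub struct UnknownNode;

impl UserDefinedLogicalNode for WatermarkNode {
    fn as_any(&self) -> &dyn Any { self }
}
impl UserDefinedLogicalNode for SinkExtension {
    fn as_any(&self) -> &dyn Any { self }
}
impl UserDefinedLogicalNode for UnknownNode {
    fn as_any(&self) -> &dyn Any { self }
}

impl ArroyoExtension for WatermarkNode {
    fn kind(&self) -> &'static str { "watermark" }
}
impl ArroyoExtension for SinkExtension {
    fn kind(&self) -> &'static str { "sink" }
}

/// Hypothetical error type; the real conversion returns a DataFusionError.
#[derive(Debug, PartialEq)]
pub struct PlanError(pub String);

/// Downcasts `node` to the concrete type `T` and, on success, views it as an extension.
fn try_from_t<'a, T: ArroyoExtension + 'static>(
    node: &'a dyn UserDefinedLogicalNode,
) -> Option<&'a dyn ArroyoExtension> {
    node.as_any()
        .downcast_ref::<T>()
        .map(|t| t as &dyn ArroyoExtension)
}

impl<'a> TryFrom<&'a dyn UserDefinedLogicalNode> for &'a dyn ArroyoExtension {
    type Error = PlanError;

    fn try_from(node: &'a dyn UserDefinedLogicalNode) -> Result<Self, Self::Error> {
        // Try each known extension type in sequence; the first successful downcast wins.
        try_from_t::<WatermarkNode>(node)
            .or_else(|| try_from_t::<SinkExtension>(node))
            .ok_or_else(|| PlanError(format!("unexpected node: {node:?}")))
    }
}
```

Because the downcasts are tried in sequence, supporting a new extension type means adding one more link to the chain, and a node that matches none of them produces an error instead of being silently dropped.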
Usage
Every Arroyo-specific logical plan node implements ArroyoExtension. The plan graph builder uses the trait methods to construct the physical dataflow graph.
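To make the roles of node_name() and transparent() more concrete, the sketch below models a toy graph builder. It assumes, without confirmation from this page, that a named node already present in the graph is reused instead of being planned again, and that a transparent node adds no vertex of its own and simply forwards its single input. GraphBuilder, the label method, and the Source, Passthrough and Sink types are hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical memoization key, standing in for the real NamedNode.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NamedNode(pub String);

/// Simplified stand-in for ArroyoExtension with only the methods used here.
pub trait ArroyoExtension {
    fn node_name(&self) -> Option<NamedNode>;
    fn label(&self) -> String;
    fn transparent(&self) -> bool {
        false
    }
}

/// Toy builder: vertices are labels, edges are (upstream, downstream) indices.
pub struct GraphBuilder {
    pub nodes: Vec<String>,
    pub edges: Vec<(usize, usize)>,
    named: HashMap<NamedNode, usize>,
}

impl GraphBuilder {
    pub fn new() -> Self {
        GraphBuilder { nodes: Vec::new(), edges: Vec::new(), named: HashMap::new() }
    }

    /// Plans `ext` given the graph indices of its inputs and returns its own index.
    pub fn add(&mut self, ext: &dyn ArroyoExtension, inputs: &[usize]) -> usize {
        // Assumption: a transparent node contributes no vertex and forwards its input.
        if ext.transparent() {
            return *inputs.first().expect("transparent node needs an input");
        }
        let name = ext.node_name();
        // Memoization: reuse the vertex of a named node that was already planned.
        if let Some(idx) = name.as_ref().and_then(|n| self.named.get(n)) {
            return *idx;
        }
        let idx = self.nodes.len();
        self.nodes.push(ext.label());
        self.edges.extend(inputs.iter().map(|&input| (input, idx)));
        if let Some(n) = name {
            self.named.insert(n, idx);
        }
        idx
    }
}

/// Hypothetical source, named after its table so repeated scans are shared.
pub struct Source(pub String);
impl ArroyoExtension for Source {
    fn node_name(&self) -> Option<NamedNode> {
        Some(NamedNode(format!("source:{}", self.0)))
    }
    fn label(&self) -> String {
        format!("source({})", self.0)
    }
}

/// Hypothetical transparent node.
pub struct Passthrough;
impl ArroyoExtension for Passthrough {
    fn node_name(&self) -> Option<NamedNode> { None }
    fn label(&self) -> String { "passthrough".to_string() }
    fn transparent(&self) -> bool { true }
}

/// Hypothetical sink with default (non-transparent) behavior.
pub struct Sink;
impl ArroyoExtension for Sink {
    fn node_name(&self) -> Option<NamedNode> { None }
    fn label(&self) -> String { "sink".to_string() }
}
```

In this toy model, planning the same named source twice yields one vertex, and a transparent node leaves the graph unchanged while its downstream consumer is wired directly to its input.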
Code Reference
Source Location
crates/arroyo-planner/src/extension/mod.rs
Signature
```rust
pub(crate) trait ArroyoExtension: Debug {
    fn node_name(&self) -> Option<NamedNode>;
    fn plan_node(
        &self,
        planner: &Planner,
        index: usize,
        input_schemas: Vec<ArroyoSchemaRef>,
    ) -> Result<NodeWithIncomingEdges>;
    fn output_schema(&self) -> ArroyoSchema;
    fn transparent(&self) -> bool {
        false
    }
}

pub(crate) struct NodeWithIncomingEdges {
    pub node: LogicalNode,
    pub edges: Vec<LogicalEdge>,
}

impl<'a> TryFrom<&'a dyn UserDefinedLogicalNode> for &'a dyn ArroyoExtension {
    type Error = DataFusionError;

    fn try_from(node: &'a dyn UserDefinedLogicalNode) -> Result<Self, Self::Error> {
        // body elided: downcasts to each known extension type in turn
    }
}
```
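The following sketch mirrors the signature above using simplified, hypothetical stand-ins for Planner, ArroyoSchema, ArroyoSchemaRef, LogicalNode, LogicalEdge and NamedNode (none of them match the real field layouts), and implements the trait for a made-up single-input SelectColumns extension. It shows one plausible way plan_node() could turn the node index and upstream schemas into a NodeWithIncomingEdges. The one-edge-per-input mapping is an assumption. Implementors inherit transparent() == false unless they override it.

```rust
use std::fmt::Debug;
use std::sync::Arc;

// Simplified, hypothetical stand-ins for Arroyo's planner types.
#[derive(Debug, Clone, PartialEq)]
pub struct ArroyoSchema {
    pub fields: Vec<String>,
}
pub type ArroyoSchemaRef = Arc<ArroyoSchema>;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NamedNode(pub String);

/// Planner context; unused in this sketch.
pub struct Planner;

#[derive(Debug, PartialEq)]
pub struct LogicalNode {
    pub id: String,
    pub description: String,
}

#[derive(Debug, PartialEq)]
pub struct LogicalEdge {
    /// Schema of the data flowing along this edge from upstream.
    pub schema: ArroyoSchemaRef,
}

pub struct NodeWithIncomingEdges {
    pub node: LogicalNode,
    pub edges: Vec<LogicalEdge>,
}

pub type Result<T> = std::result::Result<T, String>;

pub trait ArroyoExtension: Debug {
    fn node_name(&self) -> Option<NamedNode>;
    fn plan_node(
        &self,
        planner: &Planner,
        index: usize,
        input_schemas: Vec<ArroyoSchemaRef>,
    ) -> Result<NodeWithIncomingEdges>;
    fn output_schema(&self) -> ArroyoSchema;
    fn transparent(&self) -> bool {
        false
    }
}

/// Hypothetical single-input extension that keeps a subset of columns.
#[derive(Debug)]
pub struct SelectColumns {
    pub columns: Vec<String>,
}

impl ArroyoExtension for SelectColumns {
    fn node_name(&self) -> Option<NamedNode> {
        None // not shared between plans, so no memoization key
    }

    fn plan_node(
        &self,
        _planner: &Planner,
        index: usize,
        input_schemas: Vec<ArroyoSchemaRef>,
    ) -> Result<NodeWithIncomingEdges> {
        if input_schemas.len() != 1 {
            return Err(format!("SelectColumns expects 1 input, got {}", input_schemas.len()));
        }
        let node = LogicalNode {
            id: format!("select_{index}"),
            description: format!("select {:?}", self.columns),
        };
        // One incoming edge per upstream input, carrying that input's schema.
        let edges = input_schemas
            .into_iter()
            .map(|schema| LogicalEdge { schema })
            .collect();
        Ok(NodeWithIncomingEdges { node, edges })
    }

    fn output_schema(&self) -> ArroyoSchema {
        ArroyoSchema { fields: self.columns.clone() }
    }
    // transparent() is not overridden, so it keeps the default `false`.
}
```

Validating the input count inside plan_node() lets a malformed plan fail with an error at graph construction time rather than producing a node with missing edges.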
Import
```rust
use crate::extension::{ArroyoExtension, NodeWithIncomingEdges};
```
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| planner | &Planner | Planner context for physical plan generation |
| index | usize | Node index in the graph |
| input_schemas | Vec<ArroyoSchemaRef> | Schemas from upstream nodes |
Outputs
| Name | Type | Description |
|---|---|---|
| NodeWithIncomingEdges | struct | Graph node and its incoming edges |
| ArroyoSchema | struct | Output schema of the extension node |
Usage Examples
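```rust
use datafusion::logical_expr::{Extension, LogicalPlan};
use crate::extension::ArroyoExtension;

// Converting a DataFusion extension node to an Arroyo extension
if let LogicalPlan::Extension(Extension { node }) = &plan {
    // Downcasts to the matching Arroyo extension type; unknown nodes yield a DataFusionError
    let arroyo_ext: &dyn ArroyoExtension = node.as_ref().try_into()?;
    let output = arroyo_ext.plan_node(&planner, idx, input_schemas)?;
    // output.edges holds the incoming edges that still need to be connected to upstream nodes
    graph.add_node(output.node);
}
```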
Related Pages
- ArroyoSystems_Arroyo_Aggregate_Extension - Aggregate extension implementation
- ArroyoSystems_Arroyo_Debezium_Extension - Debezium extension implementations
- ArroyoSystems_Arroyo_Join_Planner - Join extension created during join rewriting
- ArroyoSystems_Arroyo_Plan_Graph_Builder - Consumes ArroyoExtension nodes to build the graph