Implementation:ArroyoSystems Arroyo Physical Planner
Overview
Physical Planner defines the Arroyo-specific DataFusion physical execution plan nodes and codec infrastructure used during physical planning and plan serialization. It includes a window UDF, custom execution plans for Debezium processing, in-memory placeholder sources, and unnesting, plus a physical extension codec for protobuf serialization.
Description
The module provides:
WindowFunctionUdf: A scalar UDF that creates window struct values from timestamp pairs (start, end). Returns a struct with start and end timestamp fields.
ArroyoPhysicalExtensionCodec: Implements DataFusion's PhysicalExtensionCodec for serializing/deserializing Arroyo-specific physical plan nodes to/from protobuf. It handles:
- ArroyoMemExec: memory-based execution with configurable schemas
- DebeziumUnrollingExec: physical execution for Debezium CDC decoding
- ToDebeziumExec: physical execution for Debezium CDC encoding
- UnnestExec: physical execution for SQL UNNEST operations
ArroyoMemExec: A custom execution plan node that serves as a placeholder for streaming data sources. It provides a schema and optional batches, and is replaced at runtime with actual streaming data.
DebeziumUnrollingExec: Physical plan for decoding Debezium change events. Processes the before, after, and op fields to produce insert, update, and delete records with an _updating metadata field.
ToDebeziumExec: Physical plan for encoding records into Debezium format with before, after, and op fields.
DecodingContext: An enum that supplies the data source context during plan execution: an unbounded batch receiver, a shared, locked vector of in-memory batches, or none.
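The decoding and encoding behavior of DebeziumUnrollingExec and ToDebeziumExec can be illustrated at row level. The following is a minimal std-only sketch, not Arroyo's implementation: rows are modeled as hypothetical Vec<(String, i64)> values instead of Arrow RecordBatches, and the _updating metadata field is assumed to reduce to a single is_retract flag, so an update becomes a retraction of the before image followed by an append of the after image. DebeziumEvent, UpdatingRow, unroll and to_debezium are illustrative names only.

```rust
// Hypothetical row model: column name -> integer value (stands in for an Arrow row).
type Row = Vec<(String, i64)>;

/// A Debezium envelope: before/after images plus the op code ("c", "r", "u", "d").
#[derive(Debug, Clone, PartialEq)]
struct DebeziumEvent {
    before: Option<Row>,
    after: Option<Row>,
    op: String,
}

/// An output row tagged with an assumed, simplified stand-in for the _updating metadata.
#[derive(Debug, Clone, PartialEq)]
struct UpdatingRow {
    row: Row,
    is_retract: bool,
}

/// Sketch of unrolling: one Debezium event becomes one or two append/retract rows.
fn unroll(event: &DebeziumEvent) -> Result<Vec<UpdatingRow>, String> {
    // Fetch a required image or fail with a descriptive error.
    let need = |image: &Option<Row>, name: &str| -> Result<Row, String> {
        image
            .clone()
            .ok_or_else(|| format!("op '{}' requires '{}'", event.op, name))
    };
    match event.op.as_str() {
        // Create ("c") and snapshot read ("r") append the after image.
        "c" | "r" => Ok(vec![UpdatingRow { row: need(&event.after, "after")?, is_retract: false }]),
        // Update ("u") retracts the old row, then appends the new one.
        "u" => Ok(vec![
            UpdatingRow { row: need(&event.before, "before")?, is_retract: true },
            UpdatingRow { row: need(&event.after, "after")?, is_retract: false },
        ]),
        // Delete ("d") retracts the before image.
        "d" => Ok(vec![UpdatingRow { row: need(&event.before, "before")?, is_retract: true }]),
        other => Err(format!("unknown Debezium op '{}'", other)),
    }
}

/// Sketch of encoding: each updating row is turned back into an envelope on its own.
fn to_debezium(update: &UpdatingRow) -> DebeziumEvent {
    if update.is_retract {
        DebeziumEvent { before: Some(update.row.clone()), after: None, op: "d".to_string() }
    } else {
        DebeziumEvent { before: None, after: Some(update.row.clone()), op: "c".to_string() }
    }
}

fn main() {
    let row = |v: i64| vec![("id".to_string(), 1), ("value".to_string(), v)];
    let update = DebeziumEvent { before: Some(row(10)), after: Some(row(20)), op: "u".to_string() };
    let rows = unroll(&update).unwrap();
    println!("{:?}", rows);
    let encoded: Vec<DebeziumEvent> = rows.iter().map(to_debezium).collect();
    println!("{:?}", encoded);
}
```

In this sketch each row is encoded independently; merging a retraction and its matching append back into a single "u" event would require tracking row identity, which is omitted here.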
Usage
These types are used during the physical planning phase and at runtime within worker nodes to execute the streaming SQL dataflow.
Code Reference
Source Location
crates/arroyo-planner/src/physical.rs
Signature
```rust
// Signatures only; method bodies are elided.
#[derive(Debug)]
pub struct WindowFunctionUdf {
    signature: Signature,
}

impl ScalarUDFImpl for WindowFunctionUdf {
    fn name(&self) -> &str; // returns "window"
    fn return_type(&self, _: &[DataType]) -> Result<DataType>; // returns the window struct type
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue>;
    // ... remaining trait methods elided
}

#[derive(Debug)]
pub struct ArroyoPhysicalExtensionCodec { /* ... */ }

impl PhysicalExtensionCodec for ArroyoPhysicalExtensionCodec {
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>>;
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;
}

#[derive(Debug)]
pub struct ArroyoMemExec { /* ... */ }

pub enum DecodingContext {
    UnboundedBatchReceiver(UnboundedReceiver<RecordBatch>),
    LockedBatchVec(Arc<RwLock<Vec<RecordBatch>>>),
    None,
}
```
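WindowFunctionUdf combines two timestamp inputs into one struct per row. The sketch below shows that columnar pairing using only the standard library. It assumes i64 nanosecond timestamps, uses a hypothetical Column enum in place of DataFusion's ColumnarValue (a scalar is broadcast to every row), and assumes a null in either input yields a null window. It is not the actual invoke_with_args body.

```rust
/// Hypothetical stand-in for ColumnarValue: one value for all rows, or one per row.
#[derive(Debug, Clone, PartialEq)]
enum Column {
    Scalar(Option<i64>),
    Array(Vec<Option<i64>>),
}

/// The struct produced for each row: start and end timestamps (assumed nanoseconds).
#[derive(Debug, Clone, Copy, PartialEq)]
struct Window {
    start: i64,
    end: i64,
}

impl Column {
    /// Value at `row`; scalars are broadcast to every row.
    fn get(&self, row: usize) -> Option<i64> {
        match self {
            Column::Scalar(v) => *v,
            Column::Array(values) => values[row],
        }
    }

    /// Number of rows for arrays; None for scalars.
    fn row_count(&self) -> Option<usize> {
        match self {
            Column::Scalar(_) => None,
            Column::Array(values) => Some(values.len()),
        }
    }
}

/// Sketch of the "window" function: pairs start and end values into Window structs.
fn window(start: &Column, end: &Column) -> Result<Vec<Option<Window>>, String> {
    let rows = match (start.row_count(), end.row_count()) {
        (Some(a), Some(b)) if a != b => return Err(format!("length mismatch: {} vs {}", a, b)),
        (Some(n), _) | (_, Some(n)) => n,
        (None, None) => 1, // two scalars produce a single value
    };
    Ok((0..rows)
        .map(|i| match (start.get(i), end.get(i)) {
            (Some(start), Some(end)) => Some(Window { start, end }),
            _ => None, // assumed: a null input yields a null window
        })
        .collect())
}

fn main() {
    let start = Column::Array(vec![Some(0), Some(60), None]);
    let end = Column::Scalar(Some(120));
    println!("{:?}", window(&start, &end));
}
```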
Import
```rust
use crate::physical::{
    ArroyoPhysicalExtensionCodec, ArroyoMemExec, DecodingContext,
    DebeziumUnrollingExec, ToDebeziumExec, WindowFunctionUdf,
};
```
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| buf | &[u8] | Protobuf-encoded physical plan bytes (for decoding) |
| inputs | &[Arc<dyn ExecutionPlan>] | Child execution plans |
| RecordBatch | struct | Input record batches for execution |
Outputs
| Name | Type | Description |
|---|---|---|
| ExecutionPlan | Arc<dyn ExecutionPlan> | Decoded physical execution plan |
| RecordBatch | struct | Output record batches from execution |
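The I/O contract above (bytes plus child plans in, a plan out) fits a common extension-codec pattern: tag the node kind when encoding, then rebuild the node from the tag and attach the already-decoded inputs when decoding. This sketch uses a hypothetical one-byte tag and little-endian length prefix instead of protobuf, omits the FunctionRegistry argument, and models plans with the illustrative ArroyoNode and Plan types. The free functions try_encode and try_decode only mirror the shape of the trait methods.

```rust
/// Hypothetical stand-ins for the node kinds the codec handles.
#[derive(Debug, Clone, PartialEq)]
enum ArroyoNode {
    MemExec { table_name: String },
    DebeziumUnrolling,
    ToDebezium,
    Unnest,
}

/// A decoded plan: a node kind plus its already-decoded children.
#[derive(Debug, Clone, PartialEq)]
struct Plan {
    node: ArroyoNode,
    children: Vec<Plan>,
}

/// Writes a one-byte tag, plus a length-prefixed name for MemExec (not protobuf).
fn try_encode(node: &ArroyoNode, buf: &mut Vec<u8>) {
    match node {
        ArroyoNode::MemExec { table_name } => {
            buf.push(0);
            buf.extend_from_slice(&(table_name.len() as u32).to_le_bytes());
            buf.extend_from_slice(table_name.as_bytes());
        }
        ArroyoNode::DebeziumUnrolling => buf.push(1),
        ArroyoNode::ToDebezium => buf.push(2),
        ArroyoNode::Unnest => buf.push(3),
    }
}

/// Reads the tag, rebuilds the node, and attaches `inputs` after checking the child count.
fn try_decode(buf: &[u8], inputs: &[Plan]) -> Result<Plan, String> {
    let (&tag, rest) = buf.split_first().ok_or("empty buffer")?;
    let (node, expected_inputs) = match tag {
        0 => {
            let len_bytes = rest.get(..4).ok_or("truncated length")?;
            let len = u32::from_le_bytes([len_bytes[0], len_bytes[1], len_bytes[2], len_bytes[3]]) as usize;
            let name = rest.get(4..4 + len).ok_or("truncated table name")?;
            let table_name = String::from_utf8(name.to_vec()).map_err(|e| e.to_string())?;
            (ArroyoNode::MemExec { table_name }, 0) // leaf: no children
        }
        1 => (ArroyoNode::DebeziumUnrolling, 1),
        2 => (ArroyoNode::ToDebezium, 1),
        3 => (ArroyoNode::Unnest, 1),
        other => return Err(format!("unknown node tag {}", other)),
    };
    if inputs.len() != expected_inputs {
        return Err(format!("{:?} expects {} input(s), got {}", node, expected_inputs, inputs.len()));
    }
    Ok(Plan { node, children: inputs.to_vec() })
}

fn main() {
    let mut buf = Vec::new();
    try_encode(&ArroyoNode::DebeziumUnrolling, &mut buf);
    let source = Plan { node: ArroyoNode::MemExec { table_name: "orders".into() }, children: vec![] };
    println!("{:?}", try_decode(&buf, &[source]));
}
```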
Usage Examples
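```rust
use datafusion_proto::physical_plan::PhysicalExtensionCodec; // brings try_encode into scope

// Creating a memory execution plan placeholder (`schema` is the source's schema)
let mem_exec = ArroyoMemExec::new(schema, None);

// Creating a decoding context from a channel receiver (`rx` yields RecordBatches)
let context = DecodingContext::UnboundedBatchReceiver(rx);

// Using the codec to serialize an existing plan (`plan: Arc<dyn ExecutionPlan>`)
let codec = ArroyoPhysicalExtensionCodec::default();
let mut buf = Vec::new();
codec.try_encode(plan.clone(), &mut buf)?;
```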
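ArroyoMemExec is a placeholder whose data is supplied at execution time through a DecodingContext. The sketch below models that hand-off with std types: std::sync::mpsc::Receiver stands in for the UnboundedReceiver, a hypothetical Batch alias (Vec<i64>) stands in for RecordBatch, and resolve_batches is an illustrative function rather than Arroyo's API.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock};

/// Hypothetical batch type standing in for an Arrow RecordBatch.
type Batch = Vec<i64>;

/// Mirrors the three DecodingContext variants, with std types in place of tokio's.
enum DecodingContext {
    UnboundedBatchReceiver(Receiver<Batch>),
    LockedBatchVec(Arc<RwLock<Vec<Batch>>>),
    None,
}

/// Sketch of how a placeholder source could resolve its batches from the context.
fn resolve_batches(context: DecodingContext) -> Result<Vec<Batch>, String> {
    match context {
        // Iterates until every sender has been dropped.
        DecodingContext::UnboundedBatchReceiver(rx) => Ok(rx.iter().collect()),
        // Takes the shared batches, leaving the vector empty.
        DecodingContext::LockedBatchVec(batches) => {
            let mut guard = batches.write().map_err(|e| e.to_string())?;
            Ok(std::mem::take(&mut *guard))
        }
        DecodingContext::None => Err("placeholder executed without a decoding context".to_string()),
    }
}

fn main() {
    let (tx, rx) = channel();
    tx.send(vec![1, 2]).unwrap();
    tx.send(vec![3]).unwrap();
    drop(tx); // close the channel so the drain terminates
    println!("{:?}", resolve_batches(DecodingContext::UnboundedBatchReceiver(rx)));
}
```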
Related Pages
- ArroyoSystems_Arroyo_Plan_Graph_Builder - Uses the physical planner for plan serialization
- ArroyoSystems_Arroyo_Debezium_Extension - Logical plan counterparts of the Debezium physical plans
- ArroyoSystems_Arroyo_Sql_Functions - WindowFunctionUdf and MultiHashFunction UDFs