
Implementation:ArroyoSystems Arroyo Physical Planner

From Leeroopedia


Overview

Physical Planner defines the Arroyo-specific DataFusion physical execution plan nodes and the codec infrastructure used during physical planning and plan serialization. It includes custom execution plans for memory-backed sources, Debezium decoding and encoding, and unnesting, together with a scalar UDF that builds window structs and a physical extension codec for protobuf serialization.

Description

The module provides:

  • WindowFunctionUdf: A scalar UDF that creates window struct values from timestamp pairs (start, end). Returns a struct with start and end timestamp fields.
  • ArroyoPhysicalExtensionCodec: Implements DataFusion's PhysicalExtensionCodec for serializing/deserializing Arroyo-specific physical plan nodes to/from protobuf. Handles:
    • ArroyoMemExec: Memory-based execution with configurable schemas
    • DebeziumUnrollingExec: Physical execution for Debezium CDC decoding
    • ToDebeziumExec: Physical execution for Debezium CDC encoding
    • UnnestExec: Physical execution for SQL UNNEST operations
  • ArroyoMemExec: A custom execution plan node that serves as a placeholder for streaming data sources. It provides a schema and optional batches, and is replaced at runtime with actual streaming data.
  • DebeziumUnrollingExec: Physical plan for decoding Debezium change events. Processes before/after/op fields to produce insert, update, and delete records with an _updating metadata field.
  • ToDebeziumExec: Physical plan for encoding records into Debezium format with before, after, and op fields.
  • DecodingContext: An enum for managing data source context (unbounded receiver, memory stream, or empty) during plan execution.
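
The unrolling behavior described for DebeziumUnrollingExec can be sketched in plain Rust without DataFusion. This is an illustration only: ChangeEvent, UnrolledRow, and unroll are hypothetical names, rows are modeled as strings rather than Arrow record batches, and the is_retract flag is a stand-in for the _updating metadata field. Mapping an update to a retraction of before followed by an append of after is an assumption, not a statement of how the actual plan node is written. The op codes c, r, u, and d are the standard Debezium ones.

```rust
/// Hypothetical, simplified stand-in for one Debezium change event.
/// Rows are plain strings here; the real plan operates on Arrow record batches.
#[derive(Debug, Clone, PartialEq)]
pub struct ChangeEvent {
    pub before: Option<String>,
    pub after: Option<String>,
    /// Debezium op code: 'c' create, 'r' snapshot read, 'u' update, 'd' delete.
    pub op: char,
}

/// One unrolled output row; `is_retract` stands in for the `_updating` metadata.
#[derive(Debug, Clone, PartialEq)]
pub struct UnrolledRow {
    pub value: String,
    pub is_retract: bool,
}

/// Unroll a single event into one or two rows (assumed semantics, see lead-in).
pub fn unroll(event: &ChangeEvent) -> Result<Vec<UnrolledRow>, String> {
    // Build a row from `before` or `after`, failing if the field is missing.
    let row = |v: &Option<String>, is_retract: bool, field: &str| {
        v.clone()
            .map(|value| UnrolledRow { value, is_retract })
            .ok_or_else(|| format!("op '{}' requires `{}`", event.op, field))
    };
    match event.op {
        // Inserts and snapshot reads append the new row.
        'c' | 'r' => Ok(vec![row(&event.after, false, "after")?]),
        // Updates retract the old row, then append the new one.
        'u' => Ok(vec![
            row(&event.before, true, "before")?,
            row(&event.after, false, "after")?,
        ]),
        // Deletes retract the old row.
        'd' => Ok(vec![row(&event.before, true, "before")?]),
        other => Err(format!("unknown op '{}'", other)),
    }
}
```

Under these assumptions an update event yields two rows, which is why the output of the unrolling step can be longer than its input.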

Usage

These types are used in two places: during the physical planning phase, when plans are built and serialized, and at runtime on worker nodes, when plans are executed as part of the streaming SQL dataflow.

Code Reference

Source Location

crates/arroyo-planner/src/physical.rs

Signature

#[derive(Debug)]
pub struct WindowFunctionUdf {
    signature: Signature,
}

impl ScalarUDFImpl for WindowFunctionUdf {
    fn name(&self) -> &str  // returns "window"
    fn return_type(&self, _: &[DataType]) -> Result<DataType>  // returns window struct
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue>
}

#[derive(Debug)]
pub struct ArroyoPhysicalExtensionCodec { /* ... */ }

impl PhysicalExtensionCodec for ArroyoPhysicalExtensionCodec {
    fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], registry: &dyn FunctionRegistry) -> Result<Arc<dyn ExecutionPlan>>
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>
}

#[derive(Debug)]
pub struct ArroyoMemExec { /* ... */ }

pub enum DecodingContext {
    UnboundedBatchReceiver(UnboundedReceiver<RecordBatch>),
    LockedBatchVec(Arc<RwLock<Vec<RecordBatch>>>),
    None,
}
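
As a rough model of what WindowFunctionUdf produces, the sketch below pairs two timestamp columns into window values row by row. It uses only the standard library: Window and make_windows are hypothetical names, timestamps are plain i64 values (the unit is an assumption), and the real UDF works on Arrow columnar values and returns a struct array rather than a Vec.

```rust
/// Hypothetical stand-in for the window struct: a start and an end timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Window {
    pub start: i64,
    pub end: i64,
}

/// Pair up two equally long timestamp columns into window values, row by row.
pub fn make_windows(starts: &[i64], ends: &[i64]) -> Result<Vec<Window>, String> {
    // Columnar inputs must have the same number of rows.
    if starts.len() != ends.len() {
        return Err(format!(
            "start column has {} rows but end column has {}",
            starts.len(),
            ends.len()
        ));
    }
    Ok(starts
        .iter()
        .zip(ends)
        .map(|(&start, &end)| Window { start, end })
        .collect())
}
```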

Import

use crate::physical::{
    ArroyoPhysicalExtensionCodec, ArroyoMemExec, DecodingContext,
    DebeziumUnrollingExec, ToDebeziumExec, WindowFunctionUdf,
};

I/O Contract

Inputs

Name | Type | Description
buf | &[u8] | Protobuf-encoded physical plan bytes (for decoding)
inputs | &[Arc<dyn ExecutionPlan>] | Child execution plans
RecordBatch | struct | Input record batches for execution

Outputs

Name | Type | Description
ExecutionPlan | Arc<dyn ExecutionPlan> | Decoded physical execution plan
RecordBatch | struct | Output record batches from execution
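
The encode/decode contract above (plan node in, bytes out, and back) can be illustrated with a toy codec. Everything in this sketch is hypothetical: ToyNode and ToyCodec are invented for illustration, the wire format is a single tag byte plus an optional UTF-8 payload rather than protobuf, and the method signatures are simplified (no child inputs and no function registry). It only shows the round-trip shape, not the actual ArroyoPhysicalExtensionCodec.

```rust
/// Hypothetical plan nodes; the real codec handles `Arc<dyn ExecutionPlan>` values.
#[derive(Debug, Clone, PartialEq)]
pub enum ToyNode {
    Mem { name: String },
    Unnest,
}

/// Toy codec: one tag byte followed by an optional UTF-8 payload (not protobuf).
pub struct ToyCodec;

impl ToyCodec {
    /// Append the encoded form of `node` to `buf`.
    pub fn try_encode(&self, node: &ToyNode, buf: &mut Vec<u8>) -> Result<(), String> {
        match node {
            ToyNode::Mem { name } => {
                buf.push(0);
                buf.extend_from_slice(name.as_bytes());
            }
            ToyNode::Unnest => buf.push(1),
        }
        Ok(())
    }

    /// Rebuild a node from bytes, rejecting empty or unrecognized input.
    pub fn try_decode(&self, buf: &[u8]) -> Result<ToyNode, String> {
        match buf.split_first() {
            Some((&0, rest)) => {
                let name = String::from_utf8(rest.to_vec()).map_err(|e| e.to_string())?;
                Ok(ToyNode::Mem { name })
            }
            Some((&1, rest)) if rest.is_empty() => Ok(ToyNode::Unnest),
            Some((tag, _)) => Err(format!("unknown or malformed node, tag {}", tag)),
            None => Err("empty buffer".to_string()),
        }
    }
}
```

The tag byte plays the role that the message type plays in a protobuf-based codec: the decoder reads it first to decide which node to reconstruct.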

Usage Examples

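// Assumes `schema`, `rx` (an UnboundedReceiver<RecordBatch>), and `plan`
// (an Arc<dyn ExecutionPlan>) are already in scope.

// Creating a memory execution plan placeholder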
let mem_exec = ArroyoMemExec::new(schema, None);

// Creating a decoding context from a channel receiver
let context = DecodingContext::UnboundedBatchReceiver(rx);

// Using the codec to serialize a plan into bytes
// (`?` requires an enclosing function that returns a Result)
let codec = ArroyoPhysicalExtensionCodec::default();
let mut buf = Vec::new();
codec.try_encode(plan.clone(), &mut buf)?;
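
To make the role of the decoding context more concrete, here is a std-only model of the three variants listed in the Signature section. It is a sketch under stated assumptions: ToyDecodingContext, Batch, and drain are hypothetical names, std::sync::mpsc::Receiver stands in for the async UnboundedReceiver, std::sync::RwLock stands in for whichever lock type the crate uses, and a Vec<i64> stands in for a RecordBatch. The real plan nodes consume these sources as streams rather than draining them eagerly.

```rust
use std::sync::mpsc::Receiver;
use std::sync::{Arc, RwLock};

/// Stand-in for an Arrow `RecordBatch`.
pub type Batch = Vec<i64>;

/// Std-only model of the three variants shown in the Signature section.
pub enum ToyDecodingContext {
    UnboundedBatchReceiver(Receiver<Batch>),
    LockedBatchVec(Arc<RwLock<Vec<Batch>>>),
    None,
}

/// Collect every batch currently available from the context without blocking.
pub fn drain(ctx: &ToyDecodingContext) -> Vec<Batch> {
    match ctx {
        // Take whatever has already been sent on the channel.
        ToyDecodingContext::UnboundedBatchReceiver(rx) => rx.try_iter().collect(),
        // Copy the shared in-memory batches under a read lock.
        ToyDecodingContext::LockedBatchVec(batches) => {
            batches.read().expect("lock poisoned").clone()
        }
        // No data source attached.
        ToyDecodingContext::None => Vec::new(),
    }
}
```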
