Implementation:ArroyoSystems Arroyo Arrow Operators

From Leeroopedia


Knowledge Sources
Domains: Streaming, Arrow_Processing, DataFusion
Last Updated: 2026-02-08 08:00 GMT

Overview

The arrow operators module defines the core stateless Arrow-based execution operators (value execution, projection, and key execution). It also provides shared infrastructure, including StatelessPhysicalExecutor and decode_aggregate, that is used across the Arroyo worker's operator implementations.

Description

This module serves as the root of the arroyo_worker::arrow namespace. It declares public sub-modules for all windowing, join, and UDF operators, and directly implements three stateless operators:

  • ValueExecutionOperator -- Executes a serialized DataFusion physical plan against each incoming batch, producing zero or more output batches.
  • ProjectionOperator -- Evaluates a list of PhysicalExpr expressions against each input batch to produce a projected output with a new schema.
  • KeyExecutionOperator -- Similar to ValueExecutionOperator but additionally tracks key fields for downstream partitioning.
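To make the projection idea concrete, here is a minimal sketch using only the Rust standard library. It does not use Arrow or DataFusion: `Batch`, `Expr`, `Projection`, `column`, and `add` are hypothetical stand-ins invented for illustration, with a boxed closure playing the role of a PhysicalExpr. Each expression is evaluated against the input batch and yields one output column, so the output schema is determined by the expression list rather than by the input.

```rust
// Toy stand-in for an Arrow RecordBatch: named integer columns of equal length.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    names: Vec<String>,
    columns: Vec<Vec<i64>>,
}

// Hypothetical stand-in for a physical expression: maps a batch to one column.
type Expr = Box<dyn Fn(&Batch) -> Vec<i64>>;

// Hypothetical projection: one expression per output column.
struct Projection {
    output_names: Vec<String>,
    exprs: Vec<Expr>,
}

impl Projection {
    // Evaluate every expression against the input; each yields one output column.
    fn apply(&self, input: &Batch) -> Batch {
        Batch {
            names: self.output_names.clone(),
            columns: self.exprs.iter().map(|e| e(input)).collect(),
        }
    }
}

// Expression that copies the input column at `idx`.
fn column(idx: usize) -> Expr {
    Box::new(move |b: &Batch| b.columns[idx].clone())
}

// Expression that adds two input columns element-wise.
fn add(left: usize, right: usize) -> Expr {
    Box::new(move |b: &Batch| {
        b.columns[left]
            .iter()
            .zip(&b.columns[right])
            .map(|(l, r)| l + r)
            .collect()
    })
}

// Sample input with two columns, "a" and "b".
fn sample_input() -> Batch {
    Batch {
        names: vec!["a".to_string(), "b".to_string()],
        columns: vec![vec![1, 2, 3], vec![10, 20, 30]],
    }
}

// Roughly: SELECT b, a + b AS total
fn sample_projection() -> Projection {
    Projection {
        output_names: vec!["b".to_string(), "total".to_string()],
        exprs: vec![column(1), add(0, 1)],
    }
}

fn main() {
    let output = sample_projection().apply(&sample_input());
    println!("{:?}", output);
}
```

The value and key execution operators differ in that they run a whole plan rather than a flat expression list, which is why they can emit zero or more batches per input.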

The shared StatelessPhysicalExecutor pairs a DataFusion ExecutionPlan with an Arc<RwLock<Option<RecordBatch>>> slot, so that each input batch can be injected into the slot and the same plan re-executed without recompilation. The decode_aggregate utility function deserializes a protobuf-encoded aggregate expression (a PhysicalExprNode) into a DataFusion AggregateFunctionExpr.
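The shared-slot pattern described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not Arroyo's implementation: `Batch`, `Plan`, `SlotExecutor`, and `double_evens` are hypothetical names, a `Vec<i64>` stands in for a RecordBatch, and a plain function pointer stands in for the deserialized plan. The point is that the plan is built once and holds a clone of the slot handle, while each call only swaps the slot's contents.

```rust
use std::sync::{Arc, RwLock};

// Toy stand-in for an Arrow RecordBatch: a single column of integers.
type Batch = Vec<i64>;

// A "plan" built once. Its leaf reads whatever batch currently sits in the slot.
struct Plan {
    slot: Arc<RwLock<Option<Batch>>>,
    // Hypothetical per-row step standing in for the body of the real plan.
    transform: fn(i64) -> Option<i64>,
}

impl Plan {
    // Run the plan against the current slot contents; yields zero or more batches.
    fn execute(&self) -> Vec<Batch> {
        // Take the batch out so that each injected batch is consumed exactly once.
        let input = self.slot.write().unwrap().take();
        match input {
            None => Vec::new(),
            Some(batch) => {
                let out: Batch = batch.into_iter().filter_map(self.transform).collect();
                if out.is_empty() {
                    Vec::new()
                } else {
                    vec![out]
                }
            }
        }
    }
}

// Hypothetical executor: owns the slot and the plan that reads from it.
struct SlotExecutor {
    slot: Arc<RwLock<Option<Batch>>>,
    plan: Plan,
}

impl SlotExecutor {
    // Build the plan once; it shares the slot with the executor.
    fn new(transform: fn(i64) -> Option<i64>) -> Self {
        let slot = Arc::new(RwLock::new(None));
        let plan = Plan {
            slot: Arc::clone(&slot),
            transform,
        };
        Self { slot, plan }
    }

    // Inject the batch into the slot, then re-run the already-built plan.
    fn process_batch(&mut self, batch: Batch) -> Vec<Batch> {
        *self.slot.write().unwrap() = Some(batch);
        self.plan.execute()
    }
}

// Example step: keep even values and double them (a filter plus a map).
fn double_evens(x: i64) -> Option<i64> {
    if x % 2 == 0 {
        Some(x * 2)
    } else {
        None
    }
}

fn main() {
    let mut exec = SlotExecutor::new(double_evens);
    println!("{:?}", exec.process_batch(vec![1, 2, 3, 4]));
    println!("{:?}", exec.process_batch(vec![1, 3]));
}
```

Because the plan only ever sees the slot, the per-batch cost in this sketch is a lock acquisition and a move, not a rebuild of the plan.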

Usage

These operators are instantiated by the Arroyo engine when compiling SQL queries into physical dataflow graphs. ValueExecutionOperator handles general SQL expressions (filters, maps), ProjectionOperator handles SELECT projections, and KeyExecutionOperator handles repartitioning by key. StatelessPhysicalExecutor is also used by windowing operators for partial aggregation.
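As a rough illustration of why key fields matter for repartitioning, the sketch below hashes the values at the key column indices to choose a downstream partition. This is an assumption about the general technique (hash partitioning), not a description of how Arroyo routes data: `partition_for` and the row representation are hypothetical, and only the `key_fields: Vec<usize>` idea comes from the struct shown on this page.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Pick a partition for a row by hashing only the values at the key indices.
// `row` is a toy stand-in for one row of a record batch.
fn partition_for(row: &[i64], key_fields: &[usize], partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    for &idx in key_fields {
        row[idx].hash(&mut hasher);
    }
    (hasher.finish() % partitions as u64) as usize
}

fn main() {
    // Column 0 is the key; column 1 is a payload that must not affect routing.
    let key_fields = vec![0usize];
    let rows = vec![vec![7, 100], vec![7, 200], vec![8, 300]];
    for row in &rows {
        let p = partition_for(row, &key_fields, 4);
        println!("row {:?} -> partition {}", row, p);
    }
}
```

Rows that agree on every key field always land in the same partition, which is the property a keyed downstream operator relies on. The specific partition numbers depend on the hasher and are not meaningful.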

Code Reference

Source Location

Signature

pub struct ValueExecutionOperator {
    name: String,
    executor: StatelessPhysicalExecutor,
}

pub struct ProjectionOperator {
    name: String,
    output_schema: ArroyoSchema,
    exprs: Vec<Arc<dyn PhysicalExpr>>,
}

pub struct KeyExecutionOperator {
    name: String,
    executor: StatelessPhysicalExecutor,
    key_fields: Vec<usize>,
}

pub struct StatelessPhysicalExecutor {
    batch: Arc<RwLock<Option<RecordBatch>>>,
    plan: Arc<dyn ExecutionPlan>,
    task_context: Arc<TaskContext>,
}

impl StatelessPhysicalExecutor {
    pub fn new(proto: &[u8], registry: &Registry) -> anyhow::Result<Self>;
    pub async fn process_batch(&mut self, batch: RecordBatch) -> SendableRecordBatchStream;
    pub async fn process_single(&mut self, batch: RecordBatch) -> RecordBatch;
}

pub fn decode_aggregate(
    schema: &SchemaRef,
    name: &str,
    expr: &PhysicalExprNode,
    registry: &dyn FunctionRegistry,
) -> DFResult<Arc<AggregateFunctionExpr>>;

Import

use arroyo_worker::arrow::{
    ValueExecutionOperator, ValueExecutionConstructor,
    ProjectionOperator, ProjectionConstructor,
    KeyExecutionOperator, KeyExecutionConstructor,
    StatelessPhysicalExecutor, decode_aggregate,
};

I/O Contract

Inputs

Name | Type | Required | Description
record_batch | RecordBatch | Yes | Input Arrow record batch to be processed by the operator

Outputs

Name | Type | Description
output_batch | RecordBatch | Transformed record batch after applying the execution plan or projection expressions

Usage Examples

// Each snippet below is independent and assumes its inputs (value_plan_config,
// projection_config, registry, serialized_plan, input_batch) are already in scope.

// ValueExecutionConstructor builds an operator from a serialized plan
let constructor = ValueExecutionConstructor;
let operator = constructor.with_config(value_plan_config, registry)?;

// ProjectionConstructor builds a projection operator from expressions
let constructor = ProjectionConstructor;
let operator = constructor.with_config(projection_config, registry)?;

// StatelessPhysicalExecutor can be used directly.
// process_single is async, so call it from an async context; it returns one RecordBatch.
let mut executor = StatelessPhysicalExecutor::new(&serialized_plan, &registry)?;
let result = executor.process_single(input_batch).await;
