
Principle:ArroyoSystems Arroyo UDF Runtime Execution

From Leeroopedia


Summary

This principle covers executing UDFs at runtime within the Arroyo streaming dataflow. UDF execution splits into two distinct patterns: sync execution for pure computational UDFs and async execution for I/O-bound UDFs. Both patterns integrate with the columnar Arrow-based processing pipeline and must handle checkpoint coordination for exactly-once semantics.

Core Concept

At runtime, UDFs are invoked as part of the streaming dataflow graph. The execution model differs based on whether the UDF is synchronous or asynchronous:

  • Sync UDFs: Called per-batch via DataFusion's ScalarUDF interface. They operate on entire Arrow arrays and return Arrow arrays, fitting naturally into the columnar execution model.
  • Async UDFs: Run as separate operator tasks with their own concurrency management. They process individual rows asynchronously with configurable concurrency, ordering, and timeout parameters.

Theoretical Basis

Zero-Copy FFI

UDF execution requires passing data across the dynamic library boundary. The system uses the Arrow C Data Interface to achieve zero-copy FFI: Arrow arrays are passed as raw pointers with associated metadata, avoiding serialization/deserialization overhead. This is critical for maintaining the performance benefits of columnar processing.
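The zero-copy idea can be illustrated with Python's stdlib `memoryview`, which shares an underlying buffer instead of copying it. This is only an analogy for the Arrow C Data Interface (which passes raw buffer pointers plus schema metadata across a C ABI), not the interface itself:

```python
import array

# A "column" of 1,000 float64 values, standing in for an Arrow buffer.
column = array.array("d", range(1000))

# A memoryview shares the underlying buffer: no bytes are copied,
# much as the C Data Interface hands raw pointers across the FFI boundary.
view = memoryview(column)
window = view[:500]  # zero-copy slice

# Mutating the original buffer is visible through the view,
# confirming both refer to the same memory.
column[0] = 42.0
print(window[0])  # 42.0
```

Because no copy is made, the cost of crossing the boundary is constant regardless of batch size.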

Batch Processing

Sync UDFs operate on entire columns rather than individual rows. This batch-oriented execution model provides several advantages:

  • Reduced FFI overhead: One FFI call per batch instead of one per row
  • CPU cache efficiency: Processing contiguous memory in column-major order
  • Vectorization opportunities: Columnar operations can leverage SIMD instructions
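The first advantage can be made concrete with a small sketch. The counter below simulates FFI boundary crossings for a hypothetical scalar UDF; plain lists stand in for Arrow arrays:

```python
crossings = 0  # counts simulated FFI boundary crossings

def udf_scalar(x: float) -> float:
    """The user's scalar function applied to one value."""
    return x * 2.0

def call_per_row(column: list) -> list:
    """Row-oriented model: one boundary crossing per row."""
    global crossings
    out = []
    for x in column:
        crossings += 1
        out.append(udf_scalar(x))
    return out

def call_per_batch(column: list) -> list:
    """Batch-oriented model: one boundary crossing per batch."""
    global crossings
    crossings += 1
    return [udf_scalar(x) for x in column]

column = [float(i) for i in range(1024)]

crossings = 0
call_per_row(column)
per_row = crossings      # 1024 crossings

crossings = 0
call_per_batch(column)
per_batch = crossings    # 1 crossing

print(per_row, per_batch)  # 1024 1
```

For a 1,024-row batch, the batch-oriented model pays the boundary cost once instead of 1,024 times.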

Async Execution Patterns

Async UDFs introduce additional complexity due to their non-deterministic completion order:

  • Concurrency (allowed_in_flight): the maximum number of concurrent async invocations. Controls resource usage and backpressure.
  • Ordering (ordered): whether results must be emitted in input order. Preserves or relaxes ordering guarantees.
  • Timeout (timeout): the maximum time allowed for a single async invocation. Prevents indefinite blocking on failed I/O.

When ordered mode is enabled, the system buffers out-of-order completions and re-orders them before emitting. This preserves the input ordering guarantee at the cost of increased latency and memory usage.
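The re-ordering logic can be sketched as a frontier buffer: completions arriving out of order are held until every earlier invocation has completed, then flushed contiguously. This is an illustrative sketch, not Arroyo's implementation; invocation IDs are assumed to be dense, starting at 0:

```python
def emit_in_order(completions):
    """Buffer out-of-order completions and emit them in input order.

    `completions` yields (invocation_id, result) pairs in arbitrary
    completion order; ids are assumed to be 0, 1, 2, ...
    """
    buffered = {}   # id -> result, held until its turn arrives
    next_id = 0     # the frontier: lowest id not yet emitted
    emitted = []
    for inv_id, result in completions:
        buffered[inv_id] = result
        # Flush every result now contiguous with the frontier.
        while next_id in buffered:
            emitted.append(buffered.pop(next_id))
            next_id += 1
    return emitted

# Invocations 2 and 1 complete before 0; output is still in input order.
out_of_order = [(2, "c"), (1, "b"), (0, "a"), (3, "d")]
print(emit_in_order(out_of_order))  # ['a', 'b', 'c', 'd']
```

Note the latency/memory cost: result "c" sits in the buffer until both "a" and "b" arrive, which is exactly the trade-off described above.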

Checkpoint Integration

Async UDFs must integrate with the checkpoint mechanism for exactly-once processing. During a checkpoint:

  • In-flight async invocations are tracked by their invocation IDs
  • The checkpoint records which invocations are pending
  • On recovery, pending invocations are re-submitted to ensure no results are lost
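The bookkeeping above can be sketched as follows. This is a hypothetical model (the class name, methods, and dict-based snapshot are illustrative, not Arroyo's actual state types):

```python
class AsyncUdfState:
    """Hypothetical sketch of checkpoint bookkeeping for async UDFs."""

    def __init__(self):
        self.in_flight = {}   # invocation_id -> input row

    def submit(self, inv_id, row):
        self.in_flight[inv_id] = row

    def complete(self, inv_id):
        self.in_flight.pop(inv_id)

    def checkpoint(self):
        # Record the input of every still-pending invocation.
        return dict(self.in_flight)

    @classmethod
    def recover(cls, snapshot):
        # On restore, re-submit every invocation that was pending,
        # so no result is lost even if the worker crashed mid-flight.
        state = cls()
        for inv_id, row in snapshot.items():
            state.submit(inv_id, row)
        return state

state = AsyncUdfState()
state.submit(0, "row-0")
state.submit(1, "row-1")
state.complete(0)                      # invocation 0 finished before the checkpoint

snapshot = state.checkpoint()          # only invocation 1 is pending
restored = AsyncUdfState.recover(snapshot)
print(sorted(restored.in_flight))      # [1]
```

Because completed invocations are removed before the snapshot is taken, recovery re-submits only the work that was genuinely in flight, preserving exactly-once semantics without duplicating finished results.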

Execution Flow

Sync UDF Execution

  • Input: A batch of Arrow arrays (one per UDF parameter) from the upstream operator
  • Step 1: Convert Arrow arrays to FFI representation via the C Data Interface
  • Step 2: Call the UDF's __run entry point
  • Step 3: The UDF iterates over the arrays, applies the user function per row, and collects results
  • Step 4: Return the result as an Arrow array to DataFusion
  • Output: A single Arrow array (or columnar value) representing the UDF result for the entire batch
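Steps 3 and 4 can be sketched in Python, with plain lists standing in for Arrow arrays (the generated `__run` wrapper is named in the flow above; the helper below is an illustrative stand-in, not Arroyo's generated code):

```python
def udf_add(a: float, b: float) -> float:
    """The user's scalar function."""
    return a + b

def run_batch(columns):
    """Sketch of a generated __run-style entry point: iterate the input
    columns in lockstep, apply the user function per row, and collect
    the results into a single output column."""
    return [udf_add(*row) for row in zip(*columns)]

# One input column per UDF parameter, as delivered by the upstream operator.
batch = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]]
print(run_batch(batch))  # [11.0, 22.0, 33.0]
```

The whole batch crosses the boundary once, and a whole result column comes back, matching the batch-processing model described earlier.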

Async UDF Execution

  • Input: Individual rows extracted from upstream batches
  • Step 1: Call start() to initialize the async runtime with ordering, timeout, and concurrency parameters
  • Step 2: For each row, call send(id, data) to submit the row for async processing
  • Step 3: Periodically call drain_results() to collect completed results
  • Step 4: Match results back to their input rows via invocation IDs
  • Output: Pairs of (invocation_ids, result_data) that the operator assembles into output batches
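The start/send/drain_results protocol above can be sketched with asyncio. The class below is a hypothetical model: the method names come from the flow above, but the semaphore-based concurrency cap and the structure are illustrative assumptions, not Arroyo's runtime:

```python
import asyncio

class AsyncUdfRuntime:
    """Hypothetical asyncio sketch of the send/drain_results protocol."""

    def __init__(self, udf, allowed_in_flight: int, timeout: float):
        self.udf = udf
        self.sem = asyncio.Semaphore(allowed_in_flight)  # concurrency cap
        self.timeout = timeout
        self.done = []    # completed (invocation_id, result) pairs
        self.tasks = []

    def send(self, inv_id: int, data):
        # Submit one row for async processing.
        self.tasks.append(asyncio.ensure_future(self._invoke(inv_id, data)))

    async def _invoke(self, inv_id, data):
        async with self.sem:  # backpressure: at most allowed_in_flight at once
            result = await asyncio.wait_for(self.udf(data), self.timeout)
            self.done.append((inv_id, result))

    async def drain_results(self):
        # Collect completed results together with their invocation ids.
        await asyncio.gather(*self.tasks)
        out, self.done = self.done, []
        return out

async def lookup(x):
    await asyncio.sleep(0)   # stands in for real I/O (e.g. an HTTP call)
    return x * 10

async def main():
    rt = AsyncUdfRuntime(lookup, allowed_in_flight=2, timeout=1.0)
    for inv_id, row in enumerate([1, 2, 3]):
        rt.send(inv_id, row)
    return sorted(await rt.drain_results())

print(asyncio.run(main()))  # [(0, 10), (1, 20), (2, 30)]
```

The invocation IDs returned alongside each result are what let the operator match completions back to their input rows and assemble output batches.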

Design Considerations

  • Error isolation: UDF panics inside the dynamic library should be caught at the FFI boundary to prevent crashing the worker process
  • Memory management: Arrow arrays passed across the FFI boundary must follow proper ownership semantics to prevent leaks or use-after-free
  • Python GIL: Python UDFs are subject to the Global Interpreter Lock, limiting true parallelism for CPU-bound Python UDFs (async I/O-bound UDFs are less affected)
  • Timeout enforcement: Async UDF timeouts are enforced by the operator, not the UDF itself, ensuring that misbehaving UDFs cannot block the pipeline indefinitely
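The timeout-enforcement point can be sketched with asyncio: the deadline is applied by the operator around the UDF call, so even a UDF that never returns is cancelled. The function names and the `("timeout", None)` error shape are illustrative assumptions:

```python
import asyncio

async def misbehaving_udf(x):
    await asyncio.sleep(3600)  # simulates a hung network call
    return x

async def operator_invoke(udf, data, timeout: float):
    """The operator, not the UDF, enforces the deadline."""
    try:
        return ("ok", await asyncio.wait_for(udf(data), timeout))
    except asyncio.TimeoutError:
        # The invocation is cancelled and surfaced as an error,
        # so the pipeline keeps making progress.
        return ("timeout", None)

print(asyncio.run(operator_invoke(misbehaving_udf, 1, timeout=0.01)))
# ('timeout', None)
```

Enforcing the timeout outside the UDF matters for error isolation too: the operator does not have to trust user code to police its own deadlines.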

Related Implementation

  • Implementation:ArroyoSystems_Arroyo_UDF_Invocation
  • Heuristic:ArroyoSystems_Arroyo_Async_UDF_Concurrency
