Principle:ArroyoSystems Arroyo UDF Runtime Execution
Summary
This principle covers executing UDFs at runtime within the Arroyo streaming dataflow. UDF execution splits into two distinct patterns: sync execution for purely computational UDFs and async execution for I/O-bound UDFs. Both patterns integrate with the columnar Arrow-based processing pipeline and must handle checkpoint coordination for exactly-once semantics.
Core Concept
At runtime, UDFs are invoked as part of the streaming dataflow graph. The execution model differs based on whether the UDF is synchronous or asynchronous:
- Sync UDFs: Called per-batch via DataFusion's ScalarUDF interface. They operate on entire Arrow arrays and return Arrow arrays, fitting naturally into the columnar execution model.
- Async UDFs: Run as separate operator tasks with their own concurrency management. They process individual rows asynchronously with configurable concurrency, ordering, and timeout parameters.
Theoretical Basis
Zero-Copy FFI
UDF execution requires passing data across the dynamic library boundary. The system uses the Arrow C Data Interface to achieve zero-copy FFI: Arrow arrays are passed as raw pointers with associated metadata, avoiding serialization/deserialization overhead. This is critical for maintaining the performance benefits of columnar processing.
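As a rough analogy for the difference between a zero-copy hand-off and a copying one, the following Python sketch uses only the standard library. It is not the Arrow C Data Interface: `memoryview` merely stands in for the "pointer plus metadata" idea, and the names `column`, `view`, and `copied` are illustrative assumptions.

```python
from array import array

# A contiguous buffer of 64-bit integers, standing in for one Arrow column.
column = array("q", [10, 20, 30, 40])

# Zero-copy hand-off: the view references the same memory and carries
# metadata (item format, item size, length), but duplicates no data.
view = memoryview(column)

# Copying hand-off, for contrast: serializing to bytes duplicates the data.
copied = column.tobytes()

# A write through the view is visible to the producer, showing that the
# memory is shared. The serialized copy is unaffected.
view[0] = 99
```

After the write, `column[0]` reflects the new value while the bytes in `copied` still encode the original one, which is the observable difference between sharing a buffer and serializing it.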
Batch Processing
Sync UDFs operate on entire columns rather than individual rows. This batch-oriented execution model provides several advantages:
- Reduced FFI overhead: One FFI call per batch instead of one per row
- CPU cache efficiency: Processing contiguous memory in column-major order
- Vectorization opportunities: Columnar operations can leverage SIMD instructions
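The first advantage can be made concrete with a small counting sketch. The `Boundary` class below is a hypothetical stand-in for the dynamic library boundary, written in plain Python; it only counts crossings and does not model real FFI cost.

```python
class Boundary:
    """Hypothetical stand-in for the library boundary; counts crossings."""

    def __init__(self, func):
        self.func = func
        self.calls = 0

    def call_per_row(self, column):
        results = []
        for value in column:
            self.calls += 1  # one crossing per row
            results.append(self.func(value))
        return results

    def call_per_batch(self, column):
        self.calls += 1  # one crossing for the whole column
        return [self.func(value) for value in column]


column = list(range(1000))

row_wise = Boundary(lambda x: x * 2)
row_result = row_wise.call_per_row(column)

batch_wise = Boundary(lambda x: x * 2)
batch_result = batch_wise.call_per_batch(column)
```

Both strategies produce the same output column, but the row-wise version crosses the boundary once per element (1000 times here) while the batch version crosses it once.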
Async Execution Patterns
Async UDFs introduce additional complexity because their invocations complete in a non-deterministic order. Three parameters control how the operator manages them:
| Parameter | Purpose | Impact |
|---|---|---|
| Concurrency (allowed_in_flight) | Maximum number of concurrent async invocations | Controls resource usage and backpressure |
| Ordering (ordered) | Whether results must be emitted in input order | Preserves or relaxes ordering guarantees |
| Timeout (timeout) | Maximum time for a single async invocation | Prevents indefinite blocking on failed I/O |
When ordered mode is enabled, the system buffers out-of-order completions and re-orders them before emitting. This preserves the input ordering guarantee at the cost of increased latency and memory usage.
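The buffering behavior of ordered mode can be sketched as a small reorder buffer. This is a minimal illustration, not Arroyo's implementation: the `ReorderBuffer` class is hypothetical, and it assumes invocation IDs are dense integers assigned in input order starting at 0.

```python
class ReorderBuffer:
    """Buffers out-of-order completions and releases them in input order."""

    def __init__(self):
        self.next_id = 0   # next invocation ID allowed to be emitted
        self.pending = {}  # completed results still waiting on earlier IDs

    def complete(self, invocation_id, result):
        """Record a completion; return the results that can now be emitted."""
        self.pending[invocation_id] = result
        ready = []
        # Release the longest contiguous run starting at next_id.
        while self.next_id in self.pending:
            ready.append(self.pending.pop(self.next_id))
            self.next_id += 1
        return ready


buffer = ReorderBuffer()
emitted = []
# Inputs were submitted as 0, 1, 2 but complete in the order 2, 0, 1.
for invocation_id, result in [(2, "c"), (0, "a"), (1, "b")]:
    emitted.append(buffer.complete(invocation_id, result))
```

The result for ID 2 is held until IDs 0 and 1 have both completed, which is where the extra latency and memory come from: one slow invocation delays everything submitted after it.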
Checkpoint Integration
Async UDFs must integrate with the checkpoint mechanism for exactly-once processing. During a checkpoint:
- In-flight async invocations are tracked by their invocation IDs
- The checkpoint records which invocations are pending
- On recovery, pending invocations are re-submitted to ensure no results are lost
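The three steps above can be sketched as follows. The `InFlightTracker` class is hypothetical, and storing each input row next to its invocation ID is an assumption made so that the sketch can re-submit after recovery; the source only states that pending invocations are recorded.

```python
class InFlightTracker:
    """Tracks pending invocations so a checkpoint can record them."""

    def __init__(self):
        self.pending = {}  # invocation ID -> input row (assumed layout)

    def submit(self, invocation_id, row):
        self.pending[invocation_id] = row

    def complete(self, invocation_id):
        self.pending.pop(invocation_id, None)

    def checkpoint(self):
        # Copy so that later completions do not mutate the snapshot.
        return dict(self.pending)

    @classmethod
    def restore(cls, snapshot):
        """Rebuild the tracker and return the rows needing re-submission."""
        tracker = cls()
        resubmit = sorted(snapshot.items())
        for invocation_id, row in resubmit:
            tracker.submit(invocation_id, row)
        return tracker, resubmit


tracker = InFlightTracker()
tracker.submit(0, "a")
tracker.submit(1, "b")
tracker.submit(2, "c")
tracker.complete(1)              # finished before the checkpoint
snapshot = tracker.checkpoint()
tracker.complete(0)              # finished after the checkpoint, then a crash

recovered, resubmit = InFlightTracker.restore(snapshot)
```

Invocation 0 completed after the snapshot was taken, so from the checkpoint's point of view it is still pending and is re-submitted along with invocation 2. Re-running it is what keeps results from being lost when the worker restarts from that checkpoint.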
Execution Flow
Sync UDF Execution
- Input: A batch of Arrow arrays (one per UDF parameter) from the upstream operator
- Step 1: Convert Arrow arrays to FFI representation via the C Data Interface
- Step 2: Call the UDF's __run entry point
- Step 3: The UDF iterates over the arrays, applies the user function per row, and collects results
- Step 4: Return the result as an Arrow array to DataFusion
- Output: A single Arrow array (or columnar value) representing the UDF result for the entire batch
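Step 3 of this flow, where a per-row user function is lifted to operate on whole columns, can be sketched in plain Python. Lists stand in for Arrow arrays here, and `make_batch_udf` and `full_name` are hypothetical names; the FFI conversion in Steps 1 and 4 is not modeled.

```python
def make_batch_udf(row_func):
    """Wrap a per-row function so it accepts one column per parameter."""

    def run(*columns):
        lengths = {len(column) for column in columns}
        if len(lengths) > 1:
            raise ValueError("all argument columns must have the same length")
        # zip walks the columns in lockstep: one argument tuple per row.
        return [row_func(*args) for args in zip(*columns)]

    return run


# Hypothetical user function taking two parameters.
def full_name(first, last):
    return f"{first} {last}"


run = make_batch_udf(full_name)
result = run(["Ada", "Alan"], ["Lovelace", "Turing"])
```

The caller sees a single function that takes one column per parameter and returns one result column, while the user only writes the per-row logic.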
Async UDF Execution
- Input: Individual rows extracted from upstream batches
- Step 1: Call start() to initialize the async runtime with ordering, timeout, and concurrency parameters
- Step 2: For each row, call send(id, data) to submit the row for async processing
- Step 3: Periodically call drain_results() to collect completed results
- Step 4: Match results back to their input rows via invocation IDs
- Output: Pairs of (invocation_ids, result_data) that the operator assembles into output batches
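The flow above can be modeled with Python's `asyncio`. This is a sketch under stated assumptions, not Arroyo's API: the `AsyncUdfRunner` class, its `wait_idle` helper, and the `lookup` function are hypothetical, only unordered mode is modeled (so `start` takes no ordering flag), and a timed-out call is assumed to yield `None`.

```python
import asyncio


class AsyncUdfRunner:
    """Hypothetical model of the start / send / drain_results flow."""

    def __init__(self, udf):
        self.udf = udf

    def start(self, timeout, allowed_in_flight):
        self.timeout = timeout
        self.slots = asyncio.Semaphore(allowed_in_flight)
        self.done = []      # completed (id, result) pairs not yet drained
        self.tasks = set()  # keeps references to running tasks

    async def send(self, invocation_id, data):
        # Waiting for a free slot is what applies backpressure to the caller.
        await self.slots.acquire()
        task = asyncio.create_task(self._invoke(invocation_id, data))
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

    async def _invoke(self, invocation_id, data):
        try:
            result = await asyncio.wait_for(self.udf(data), self.timeout)
        except asyncio.TimeoutError:
            result = None  # assumption: a timed-out call yields no value
        finally:
            self.slots.release()
        self.done.append((invocation_id, result))

    async def wait_idle(self):
        await asyncio.gather(*list(self.tasks))

    def drain_results(self):
        """Return (invocation_ids, result_data) for all completions so far."""
        drained, self.done = self.done, []
        ids = [invocation_id for invocation_id, _ in drained]
        data = [result for _, result in drained]
        return ids, data


async def lookup(value):
    # Hypothetical I/O-bound UDF: larger inputs take longer to "fetch".
    await asyncio.sleep(value * 0.01)
    return value * 10


async def main():
    runner = AsyncUdfRunner(lookup)
    runner.start(timeout=1.0, allowed_in_flight=2)
    for invocation_id, value in enumerate([5, 1, 1]):
        await runner.send(invocation_id, value)
    await runner.wait_idle()
    return runner.drain_results()


ids, results = asyncio.run(main())
```

With two slots, the third `send` waits until one of the first two invocations finishes. Because the first input is the slowest, the drained IDs typically come back out of input order, and the operator relies on the IDs to match each result to its row.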
Design Considerations
- Error isolation: UDF panics inside the dynamic library should be caught at the FFI boundary to prevent crashing the worker process
- Memory management: Arrow arrays passed across the FFI boundary must follow proper ownership semantics to prevent leaks or use-after-free
- Python GIL: Python UDFs are subject to the Global Interpreter Lock, limiting true parallelism for CPU-bound Python UDFs (async I/O-bound UDFs are less affected)
- Timeout enforcement: Async UDF timeouts are enforced by the operator, not the UDF itself, ensuring that misbehaving UDFs cannot block the pipeline indefinitely
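The error isolation point can be illustrated with a boundary wrapper that turns a failure into a value rather than letting it propagate. This Python sketch is only an analogy: `call_isolated` and `divide` are hypothetical, and a native library would rely on a language-level mechanism instead (in Rust, for example, `std::panic::catch_unwind`); whether Arroyo does so is not stated in the source.

```python
def call_isolated(udf, *columns):
    """Run a UDF and convert any failure into a value instead of raising."""
    try:
        return ("ok", udf(*columns))
    except Exception as error:  # deliberately broad: this is the boundary
        return ("error", f"{type(error).__name__}: {error}")


# Hypothetical UDF that fails on a zero denominator.
def divide(numerators, denominators):
    return [n / d for n, d in zip(numerators, denominators)]


good = call_isolated(divide, [4, 9], [2, 3])
bad = call_isolated(divide, [1], [0])
```

The caller receives a tagged result in both cases, so a failing UDF surfaces as an error the operator can handle rather than as a crash of the surrounding process.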
Related Implementation
- Implementation:ArroyoSystems_Arroyo_UDF_Invocation
- Heuristic:ArroyoSystems_Arroyo_Async_UDF_Concurrency