Implementation:ArroyoSystems Arroyo UDF Invocation

From Leeroopedia


Summary

This page documents the runtime UDF invocation implementation in the Arroyo streaming engine, covering SyncUdfDylib::invoke_with_args for synchronous columnar execution and AsyncUdfDylib for asynchronous row-level execution.

Code Reference

Component | File | Lines
Sync and async UDF host | crates/arroyo-udf/arroyo-udf-host/src/lib.rs | L176-L427
Async UDF operator | crates/arroyo-worker/src/arrow/async_udf.rs | L63-L110

SyncUdfDylib::invoke_with_args

Signature

impl SyncUdfDylib {
    pub fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue>
}

I/O

  • Input: ScalarFunctionArgs -- DataFusion's columnar function arguments, containing one or more Arrow arrays corresponding to the UDF's parameters
  • Output: DFResult<ColumnarValue> -- a DataFusion result wrapping either:
    • ColumnarValue::Array -- an Arrow array with one result per input row
    • ColumnarValue::Scalar -- a single scalar value (when all inputs are scalar)
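
The difference between array and scalar values can be illustrated with a small model. This is a simplified sketch, not DataFusion's actual type: `Value`, `num_rows`, `into_array`, and `to_columns` are hypothetical names, the element type is fixed to `i64`, and the broadcasting rule (repeat a scalar once per row) is an assumption about how mixed inputs could be normalized before a columnar call.

```rust
/// Simplified stand-in for a columnar value, restricted to i64 (hypothetical).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Array(Vec<i64>),
    Scalar(i64),
}

impl Value {
    /// Rows contributed by this value: arrays report their length,
    /// scalars count as a single row.
    fn num_rows(&self) -> usize {
        match self {
            Value::Array(v) => v.len(),
            Value::Scalar(_) => 1,
        }
    }

    /// Expand to a full column; a scalar is repeated `num_rows` times.
    fn into_array(self, num_rows: usize) -> Vec<i64> {
        match self {
            Value::Array(v) => v,
            Value::Scalar(s) => vec![s; num_rows],
        }
    }
}

/// Normalize mixed array/scalar arguments into equal-length columns.
fn to_columns(args: Vec<Value>) -> Vec<Vec<i64>> {
    let num_rows = args.iter().map(Value::num_rows).max().unwrap_or(0);
    args.into_iter().map(|a| a.into_array(num_rows)).collect()
}
```

With this model, `to_columns(vec![Value::Array(vec![1, 2, 3]), Value::Scalar(9)])` yields two three-row columns, the second holding the scalar repeated for every row.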

Behavior

The synchronous invocation path integrates with DataFusion's ScalarUDF interface:

  1. Extract arrays: Unwraps the ScalarFunctionArgs to obtain the underlying Arrow arrays for each parameter.
  2. Convert to FFI: Transforms each Arrow array into the FfiArrays representation using the Arrow C Data Interface. This is a zero-copy operation that passes pointers across the dynamic library boundary.
  3. Call __run: Invokes the generated FFI entry point in the loaded dynamic library, passing the FfiArrays.
  4. Execute rows: Inside the dylib, the __run function:
    • Converts FFI arrays back to typed Rust arrays
    • Iterates over each row, calling the user function
    • Collects results into a new Arrow array
    • Returns the result via RunResult
  5. Return to DataFusion: Wraps the returned Arrow array in a ColumnarValue and returns it to the DataFusion execution engine.
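
The per-row loop from step 4 can be sketched as follows. This is a minimal model under stated assumptions: nullable columns are represented as `Vec<Option<i64>>` rather than Arrow arrays, the FFI conversion is omitted entirely, and `run_rowwise` and `user_add` are hypothetical names that do not appear in Arroyo. Propagating a null input to a null output is also an assumption made for illustration.

```rust
/// Hypothetical stand-in for a user-defined function of two arguments.
fn user_add(a: i64, b: i64) -> i64 {
    a + b
}

/// Simplified model of the per-row loop: walk both input columns in
/// lockstep, call the user function once per row, and collect the
/// results into a new column of the same length.
/// Assumption: a null in either input produces a null output.
fn run_rowwise(
    left: &[Option<i64>],
    right: &[Option<i64>],
    f: fn(i64, i64) -> i64,
) -> Result<Vec<Option<i64>>, String> {
    if left.len() != right.len() {
        return Err(format!(
            "column length mismatch: {} vs {}",
            left.len(),
            right.len()
        ));
    }
    Ok(left
        .iter()
        .zip(right.iter())
        .map(|(l, r)| match (l, r) {
            (Some(a), Some(b)) => Some(f(*a, *b)),
            _ => None,
        })
        .collect())
}
```

The key property is that the output column has exactly one entry per input row, which is what lets the host wrap it directly as an array result.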

AsyncUdfDylib

Signatures

impl AsyncUdfDylib {
    pub fn start(&self, ordered: bool, timeout: Duration, allowed_in_flight: u32)
    pub fn send(&self, id: u64, data: Vec<ArrayData>) -> anyhow::Result<()>
    pub fn drain_results(&self) -> Option<(UInt64Array, ArrayData)>
}

I/O

  • start:
    • Input: ordered: bool (whether to preserve input order), timeout: Duration (per-invocation timeout), allowed_in_flight: u32 (max concurrent invocations)
    • Output: None (initializes internal async runtime state)
  • send:
    • Input: id: u64 (unique invocation identifier), data: Vec<ArrayData> (row data as Arrow arrays, one per parameter)
    • Output: anyhow::Result<()> (success or error if the runtime is not accepting new work)
  • drain_results:
    • Input: None
    • Output: Option<(UInt64Array, ArrayData)> -- if results are available, returns a pair mapping invocation IDs to their corresponding result values; None if no results are ready

Behavior

Async UDF execution proceeds through four lifecycle phases: initialization, row submission, result collection, and shutdown.

Initialization (start)

  • Creates a Tokio async runtime inside the dynamic library
  • Configures the runtime with the specified concurrency limit, ordering mode, and timeout
  • Sets up internal channels for submitting work and collecting results

Row Submission (send)

  • Converts the row's ArrayData values to native types
  • Submits the async invocation to the runtime's task pool, tagged with the invocation id
  • If the concurrency limit is reached, the call may block until a slot becomes available (backpressure)
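
The backpressure behavior can be illustrated with a bounded channel from the Rust standard library. This is a sketch of the general technique, not Arroyo's implementation: `bounded_queue`, `try_submit`, and the `Work` alias are hypothetical, and queue capacity is used here as a stand-in for the `allowed_in_flight` limit.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical work item: an invocation id plus one argument value.
type Work = (u64, i64);

/// Create a bounded queue whose capacity plays the role of
/// `allowed_in_flight` in this simplified model.
fn bounded_queue(allowed_in_flight: usize) -> (SyncSender<Work>, Receiver<Work>) {
    sync_channel(allowed_in_flight)
}

/// Try to submit without blocking. Returns false when the queue is full
/// (the caller should wait and retry) or when the receiver is gone.
fn try_submit(tx: &SyncSender<Work>, id: u64, arg: i64) -> bool {
    match tx.try_send((id, arg)) {
        Ok(()) => true,
        // Limit reached: this is the backpressure signal.
        Err(TrySendError::Full(_)) => false,
        // The consuming side has shut down.
        Err(TrySendError::Disconnected(_)) => false,
    }
}
```

Using the blocking `send` on the same `SyncSender` instead of `try_send` would make the caller wait until a slot frees up, which corresponds to the blocking variant described above.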

Result Collection (drain_results)

  • Polls the internal results channel for completed invocations
  • Returns all currently available results as a batch:
    • UInt64Array -- the invocation IDs of completed rows
    • ArrayData -- the corresponding result values
  • When ordered mode is enabled, results are buffered internally and only released in the original submission order
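
Ordered release can be sketched with a small reorder buffer. This is an illustrative model only: `ReorderBuffer` is a hypothetical type, result values are plain `i64` rather than Arrow data, and it assumes invocation IDs start at 0 and increase by one per submission.

```rust
use std::collections::BTreeMap;

/// Hypothetical reorder buffer: holds completed results until every
/// earlier invocation id has also completed.
struct ReorderBuffer {
    next_id: u64,
    pending: BTreeMap<u64, i64>,
}

impl ReorderBuffer {
    fn new() -> Self {
        Self { next_id: 0, pending: BTreeMap::new() }
    }

    /// Record a completed invocation; completions may arrive out of order.
    fn complete(&mut self, id: u64, value: i64) {
        self.pending.insert(id, value);
    }

    /// Release the longest run of consecutive ids starting at `next_id`,
    /// as parallel vectors of ids and values. Returns None when the next
    /// expected id has not completed yet.
    fn drain(&mut self) -> Option<(Vec<u64>, Vec<i64>)> {
        let mut ids = Vec::new();
        let mut values = Vec::new();
        while let Some(value) = self.pending.remove(&self.next_id) {
            ids.push(self.next_id);
            values.push(value);
            self.next_id += 1;
        }
        if ids.is_empty() {
            None
        } else {
            Some((ids, values))
        }
    }
}
```

If invocation 1 finishes before invocation 0, `drain` returns `None` until 0 completes, after which both are released together in submission order. An unordered mode would simply return whatever has completed.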

Shutdown

The runtime is stopped when the operator is dropped or during checkpoint finalization. All in-flight invocations are either completed or timed out before shutdown completes.

Integration with Async UDF Operator

The AsyncUdfDylib is wrapped by the async UDF operator in the worker, which:

  • Calls start() during operator initialization with configuration from the pipeline definition
  • Calls send() for each incoming row, assigning monotonically increasing invocation IDs
  • Calls drain_results() on each operator poll cycle to collect and emit completed results
  • Handles checkpoint barriers by draining all in-flight results before acknowledging the checkpoint
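
The operator's bookkeeping can be sketched as follows. Everything here is hypothetical and simplified: `FakeRuntime` stands in for the UDF runtime and completes every invocation immediately (doubling its argument), while `Operator`, `process_row`, `poll`, and `ready_for_checkpoint` are illustrative names rather than the worker's actual API.

```rust
use std::collections::{BTreeSet, VecDeque};

/// Hypothetical in-memory stand-in for the UDF runtime: every sent
/// invocation completes immediately and is queued for draining.
struct FakeRuntime {
    done: VecDeque<(u64, i64)>,
}

impl FakeRuntime {
    fn send(&mut self, id: u64, arg: i64) {
        self.done.push_back((id, arg * 2));
    }

    fn drain_results(&mut self) -> Option<Vec<(u64, i64)>> {
        if self.done.is_empty() {
            None
        } else {
            Some(self.done.drain(..).collect())
        }
    }
}

/// Simplified operator state: an id counter, the set of ids still in
/// flight, and the values emitted downstream so far.
struct Operator {
    next_id: u64,
    in_flight: BTreeSet<u64>,
    runtime: FakeRuntime,
    emitted: Vec<i64>,
}

impl Operator {
    fn new() -> Self {
        Self {
            next_id: 0,
            in_flight: BTreeSet::new(),
            runtime: FakeRuntime { done: VecDeque::new() },
            emitted: Vec::new(),
        }
    }

    /// Assign the next monotonically increasing id and submit the row.
    fn process_row(&mut self, arg: i64) {
        let id = self.next_id;
        self.next_id += 1;
        self.in_flight.insert(id);
        self.runtime.send(id, arg);
    }

    /// Collect completed results and emit them.
    fn poll(&mut self) {
        if let Some(batch) = self.runtime.drain_results() {
            for (id, value) in batch {
                self.in_flight.remove(&id);
                self.emitted.push(value);
            }
        }
    }

    /// A checkpoint may be acknowledged only once nothing is in flight.
    fn ready_for_checkpoint(&mut self) -> bool {
        self.poll();
        self.in_flight.is_empty()
    }
}
```

In a real operator the runtime completes work asynchronously, so the checkpoint path would keep polling until `ready_for_checkpoint` reports true instead of succeeding on the first call as it does with this fake runtime.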

Implements

  • Principle:ArroyoSystems_Arroyo_UDF_Runtime_Execution
  • Environment:ArroyoSystems_Arroyo_Python_UDF_Runtime
  • Heuristic:ArroyoSystems_Arroyo_Async_UDF_Concurrency
