Implementation:ArroyoSystems Arroyo UDF Invocation
Summary
This page documents the runtime UDF invocation implementation in the Arroyo streaming engine, covering SyncUdfDylib::invoke_with_args for synchronous columnar execution and AsyncUdfDylib for asynchronous row-level execution.
Code Reference
| Component | File | Lines |
|---|---|---|
| Sync and async UDF host | crates/arroyo-udf/arroyo-udf-host/src/lib.rs | L176-L427 |
| Async UDF operator | crates/arroyo-worker/src/arrow/async_udf.rs | L63-L110 |
SyncUdfDylib::invoke_with_args
Signature
impl SyncUdfDylib {
pub fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult<ColumnarValue>
}
I/O
- Input: ScalarFunctionArgs -- DataFusion's columnar function arguments, containing one or more Arrow arrays corresponding to the UDF's parameters
- Output: DFResult<ColumnarValue> -- a DataFusion result wrapping either:
  - ColumnarValue::Array -- an Arrow array with one result per input row
  - ColumnarValue::Scalar -- a single scalar value (when all inputs are scalar)
Behavior
The synchronous invocation path integrates with DataFusion's ScalarUDF interface:
- Extract arrays: Unwraps the ScalarFunctionArgs to obtain the underlying Arrow arrays for each parameter.
- Convert to FFI: Transforms each Arrow array into the FfiArrays representation using the Arrow C Data Interface. This is a zero-copy operation that passes pointers across the dynamic library boundary.
- Call __run: Invokes the generated FFI entry point in the loaded dynamic library, passing the FfiArrays.
- Process result: The __run function within the dylib:
  - Converts FFI arrays back to typed Rust arrays
  - Iterates over each row, calling the user function
  - Collects results into a new Arrow array
  - Returns the result via RunResult
- Return to DataFusion: Wraps the returned Arrow array in a ColumnarValue and returns it to the DataFusion execution engine.
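The row-wise step inside __run can be pictured with a simplified model that uses only the Rust standard library. In this sketch, `Vec<Option<i64>>` stands in for a nullable Arrow column and `add_clamped` is a hypothetical user function; the real generated code works on Arrow arrays received through FfiArrays. The null policy shown (a null argument yields a null result) is an assumption for illustration, not something this page states.

```rust
/// Stand-in for a nullable Arrow Int64 column (assumption: the real code
/// operates on Arrow arrays, not on Vec).
type Column = Vec<Option<i64>>;

/// Hypothetical user-defined function, called once per row.
fn add_clamped(a: i64, b: i64) -> i64 {
    a.saturating_add(b).min(100)
}

/// Model of the per-row loop: walk the argument columns in lockstep, call
/// the user function for each row, and collect the outputs into a new
/// column of the same length.
fn run_rowwise(a: &Column, b: &Column) -> Result<Column, String> {
    if a.len() != b.len() {
        return Err(format!("column length mismatch: {} vs {}", a.len(), b.len()));
    }
    Ok(a.iter()
        .zip(b.iter())
        .map(|(x, y)| match (x, y) {
            // Both arguments present: invoke the user function.
            (Some(x), Some(y)) => Some(add_clamped(*x, *y)),
            // Assumed policy: any null argument produces a null result.
            _ => None,
        })
        .collect())
}
```

The point of the sketch is the shape of the work: the host hands over whole columns, and the per-row call to the user function happens entirely on the dylib side.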
AsyncUdfDylib
Signatures
impl AsyncUdfDylib {
pub fn start(&self, ordered: bool, timeout: Duration, allowed_in_flight: u32)
pub fn send(&self, id: u64, data: Vec<ArrayData>) -> anyhow::Result<()>
pub fn drain_results(&self) -> Option<(UInt64Array, ArrayData)>
}
I/O
- start:
  - Input: ordered: bool (whether to preserve input order), timeout: Duration (per-invocation timeout), allowed_in_flight: u32 (max concurrent invocations)
  - Output: None (initializes internal async runtime state)
- send:
  - Input: id: u64 (unique invocation identifier), data: Vec<ArrayData> (row data as Arrow arrays, one per parameter)
  - Output: anyhow::Result<()> (success or error if the runtime is not accepting new work)
- drain_results:
  - Input: None
  - Output: Option<(UInt64Array, ArrayData)> -- if results are available, returns a pair mapping invocation IDs to their corresponding result values; None if no results are ready
Behavior
The async UDF execution follows a lifecycle-oriented pattern:
Initialization (start)
- Creates a Tokio async runtime inside the dynamic library
- Configures the runtime with the specified concurrency limit, ordering mode, and timeout
- Sets up internal channels for submitting work and collecting results
Row Submission (send)
- Converts the row's ArrayData values to native types
- Submits the async invocation to the runtime's task pool, tagged with the invocation id
- If the concurrency limit is reached, the call may block until a slot becomes available (backpressure)
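The backpressure behavior can be illustrated with a bounded channel from the standard library. This is an analogue only: the runtime described above is Tokio-based and lives inside the dylib, while the `Submitter` and `Invocation` types below are hypothetical and use `std::sync::mpsc::sync_channel`. The capacity plays the role of allowed_in_flight and is assumed to be at least 1.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// One submitted invocation: the id plus a stand-in payload.
struct Invocation {
    id: u64,
    arg: i64,
}

/// Hypothetical submission handle whose queue is bounded by the
/// in-flight limit.
struct Submitter {
    tx: SyncSender<Invocation>,
}

impl Submitter {
    /// Creates the handle and the receiving end that a worker would consume.
    fn new(allowed_in_flight: usize) -> (Self, Receiver<Invocation>) {
        let (tx, rx) = sync_channel(allowed_in_flight);
        (Submitter { tx }, rx)
    }

    /// Non-blocking submit: returns false when the limit is reached or the
    /// receiver is gone. Calling `self.tx.send(..)` instead would block the
    /// caller until a slot frees up, which is the backpressure case.
    fn try_submit(&self, id: u64, arg: i64) -> bool {
        match self.tx.try_send(Invocation { id, arg }) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) => false,
            Err(TrySendError::Disconnected(_)) => false,
        }
    }
}
```

With a limit of 2, a third submission is refused until the consumer takes one item off the queue, at which point a slot is available again.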
Result Collection (drain_results)
- Polls the internal results channel for completed invocations
- Returns all currently available results as a batch:
  - UInt64Array -- the invocation IDs of completed rows
  - ArrayData -- the corresponding result values
- When ordered mode is enabled, results are buffered internally and only released in the original submission order
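One common way to implement ordered release is a reorder buffer keyed by invocation ID. The sketch below is a minimal model of that idea, not the code in the dylib: `OrderedBuffer` is a hypothetical type, plain `Vec<u64>` and `Vec<i64>` stand in for UInt64Array and ArrayData, and IDs are assumed to be assigned consecutively starting at 0.

```rust
use std::collections::BTreeMap;

/// Buffers completed results and releases them in submission order.
/// Assumes ids are consecutive and start at 0.
struct OrderedBuffer {
    next_id: u64,
    pending: BTreeMap<u64, i64>,
}

impl OrderedBuffer {
    fn new() -> Self {
        OrderedBuffer { next_id: 0, pending: BTreeMap::new() }
    }

    /// Records a completed invocation, in whatever order it finished.
    fn complete(&mut self, id: u64, value: i64) {
        self.pending.insert(id, value);
    }

    /// Releases the longest run of consecutive results starting at
    /// `next_id`. Returns parallel (ids, values) columns, or None when the
    /// next expected id has not completed yet.
    fn drain(&mut self) -> Option<(Vec<u64>, Vec<i64>)> {
        let mut ids = Vec::new();
        let mut values = Vec::new();
        while let Some(value) = self.pending.remove(&self.next_id) {
            ids.push(self.next_id);
            values.push(value);
            self.next_id += 1;
        }
        if ids.is_empty() {
            None
        } else {
            Some((ids, values))
        }
    }
}
```

In this model a slow invocation holds back every later result, which is the cost of ordered mode; an unordered drain would simply return whatever is in `pending`.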
Shutdown
The runtime is stopped when the operator is dropped or during checkpoint finalization. All in-flight invocations are either completed or timed out before shutdown completes.
Integration with Async UDF Operator
The AsyncUdfDylib is wrapped by the async UDF operator in the worker, which:
- Calls start() during operator initialization with configuration from the pipeline definition
- Calls send() for each incoming row, assigning monotonically increasing invocation IDs
- Calls drain_results() on each operator poll cycle to collect and emit completed results
- Handles checkpoint barriers by draining all in-flight results before acknowledging the checkpoint
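The operator-side responsibilities listed above can be sketched against a mock UDF. Everything named here is hypothetical: the `AsyncUdf` trait, `DoublingUdf`, and `Operator` are illustrative stand-ins that mirror the send/drain_results shape with plain integers instead of Arrow data, and the mock completes every invocation immediately. A real operator would await results rather than loop.

```rust
use std::collections::{HashMap, VecDeque};

/// Minimal stand-in for the async UDF handle (not Arroyo's actual API).
trait AsyncUdf {
    fn send(&mut self, id: u64, arg: i64) -> Result<(), String>;
    fn drain_results(&mut self) -> Option<(Vec<u64>, Vec<i64>)>;
}

/// Mock UDF that doubles its input and has the result ready on the next drain.
struct DoublingUdf {
    done: VecDeque<(u64, i64)>,
}

impl DoublingUdf {
    fn new() -> Self {
        DoublingUdf { done: VecDeque::new() }
    }
}

impl AsyncUdf for DoublingUdf {
    fn send(&mut self, id: u64, arg: i64) -> Result<(), String> {
        self.done.push_back((id, arg * 2));
        Ok(())
    }

    fn drain_results(&mut self) -> Option<(Vec<u64>, Vec<i64>)> {
        if self.done.is_empty() {
            return None;
        }
        // Split the completed (id, value) pairs into two parallel columns.
        Some(self.done.drain(..).unzip())
    }
}

struct Operator<U: AsyncUdf> {
    udf: U,
    next_id: u64,
    /// Input rows still waiting for a result, keyed by invocation id.
    in_flight: HashMap<u64, i64>,
    /// Emitted (input, result) pairs.
    emitted: Vec<(i64, i64)>,
}

impl<U: AsyncUdf> Operator<U> {
    fn new(udf: U) -> Self {
        Operator { udf, next_id: 0, in_flight: HashMap::new(), emitted: Vec::new() }
    }

    /// Assigns the next monotonically increasing id and submits the row.
    fn process_row(&mut self, input: i64) -> Result<(), String> {
        let id = self.next_id;
        self.next_id += 1;
        self.udf.send(id, input)?;
        self.in_flight.insert(id, input);
        Ok(())
    }

    /// One poll cycle: collect whatever has completed and emit it.
    fn poll(&mut self) {
        if let Some((ids, values)) = self.udf.drain_results() {
            for (id, value) in ids.into_iter().zip(values) {
                if let Some(input) = self.in_flight.remove(&id) {
                    self.emitted.push((input, value));
                }
            }
        }
    }

    /// Checkpoint barrier: drain until nothing is in flight. This spins,
    /// which is acceptable only because the mock always makes progress.
    fn on_checkpoint(&mut self) {
        while !self.in_flight.is_empty() {
            self.poll();
        }
    }
}
```

Keeping the input row in `in_flight` until its result arrives is what lets the operator join results back to their rows by ID and know when a checkpoint is safe to acknowledge.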
Implements
- Principle:ArroyoSystems_Arroyo_UDF_Runtime_Execution
- Environment:ArroyoSystems_Arroyo_Python_UDF_Runtime
- Heuristic:ArroyoSystems_Arroyo_Async_UDF_Concurrency