Implementation:ArroyoSystems Arroyo Async Udf Runtime
| Knowledge Sources | |
|---|---|
| Domains | Streaming, UDF, Async_Processing |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
The arroyo-udf-plugin async UDF runtime provides the execution infrastructure for asynchronous user-defined functions, managing concurrent futures with configurable ordering, timeouts, and backpressure through a dedicated Tokio runtime on a separate thread.
Description
This module implements the runtime side of Arroyo's async UDF system. The core components are:
- FuturesEnum<F> -- An enum wrapping either FuturesOrdered or FuturesUnordered, providing a unified interface for managing async UDF futures with configurable ordering semantics. FuturesOrdered preserves input order (results are yielded in the order they were submitted), while FuturesUnordered yields results as they complete.
- AsyncUdfHandle -- The engine-facing handle containing a Sender<QueueData> for submitting work and a ResultMutex for draining completed results. It can be converted to an FFI-safe pointer via into_ffi for passing across the dylib boundary.
- AsyncUdf<F, FnT> -- The main runtime struct, which runs on a dedicated thread with its own single-threaded Tokio runtime. It manages:
  - A FuturesEnum of in-flight async UDF invocations
  - An mpsc::Receiver for incoming work items
  - A shared ResultMutex (Arc<Mutex<(UInt64Builder, Box<dyn ArrayBuilder>)>>) that collects results as (id, value) pairs
  - Backpressure, by capping the number of concurrent futures at allowed_in_flight
- send / drain_results / stop_runtime -- FFI-safe functions that interact with the AsyncUdfHandle through raw pointer casts, allowing the UDF dylib to submit work, collect results, and shut down the runtime.
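The ordering difference between the two FuturesEnum variants can be modelled without any async machinery. This std-only sketch treats completions as (submission index, value) pairs arriving in the order they finish. Mode and yield_order are hypothetical names. The reorder buffer only mirrors the observable behavior of FuturesOrdered, not its implementation.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for choosing FuturesOrdered vs FuturesUnordered.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Mode {
    Ordered,
    Unordered,
}

/// `completions` lists (submission_index, value) in the order invocations finish.
/// Returns the values in the order a consumer would observe them.
pub fn yield_order<T: Clone>(mode: Mode, completions: &[(usize, T)]) -> Vec<T> {
    match mode {
        // Unordered: every result is yielded as soon as it completes.
        Mode::Unordered => completions.iter().map(|(_, v)| v.clone()).collect(),
        // Ordered: hold early finishers until every earlier submission is done.
        Mode::Ordered => {
            let mut pending = BTreeMap::new();
            let mut next = 0usize;
            let mut out = Vec::new();
            for (idx, v) in completions {
                pending.insert(*idx, v.clone());
                // Release the contiguous run of results starting at `next`.
                while let Some(ready) = pending.remove(&next) {
                    out.push(ready);
                    next += 1;
                }
            }
            out
        }
    }
}
```

With completions arriving as indices 1, 0, 2, unordered mode yields them in that finish order, while ordered mode holds index 1 until index 0 arrives and then yields 0, 1, 2.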
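If an invocation exceeds the configured timeout, its future resolves to an Elapsed error (see OutputT below). In that case the runtime panics, in both ordered and unordered modes, instead of emitting a result, in order to preserve its correctness guarantees.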
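The following is a minimal sketch of this timeout policy. It assumes each finished invocation arrives as a Result whose error marks an expired call. The real runtime receives tokio's Elapsed. TimedOut and record_result are hypothetical names, and the Vec columns stand in for the Arrow builders.

```rust
/// Hypothetical marker for an invocation that exceeded its timeout
/// (the real runtime receives tokio's `Elapsed` error instead).
#[derive(Debug)]
pub struct TimedOut;

/// Appends a finished invocation to the id/value columns, or panics if it timed out.
pub fn record_result(
    ids: &mut Vec<u64>,
    values: &mut Vec<String>,
    ordered: bool,
    id: u64,
    result: Result<String, TimedOut>,
) {
    match result {
        Ok(value) => {
            ids.push(id);
            values.push(value);
        }
        // No value is recorded for a timed-out invocation; the sketch fails
        // loudly instead, in both modes, matching the policy described above.
        Err(TimedOut) => {
            let mode = if ordered { "ordered" } else { "unordered" };
            panic!("{mode} async UDF invocation {id} timed out");
        }
    }
}
```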
Usage
Used when a user defines an async UDF (annotated with #[udf(async)] in Rust). The Arroyo engine loads the UDF dylib. On construction, the dylib creates an AsyncUdf instance and starts it on a background thread. The engine then communicates with the runtime through the AsyncUdfHandle across the FFI boundary.
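The lifecycle of creating the runtime, starting it on a background thread, submitting work, draining results, and stopping it can be sketched with only the standard library. This sketch makes several substitutions:

- A plain worker thread that handles one item at a time replaces the dedicated single-threaded Tokio runtime.
- Vec columns replace the Arrow builders.
- Handle, Results, and the method names are hypothetical stand-ins for AsyncUdfHandle and the send / drain_results / stop_runtime functions.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

/// Hypothetical shared buffer of parallel (ids, values) columns,
/// standing in for (UInt64Builder, Box<dyn ArrayBuilder>).
type Results = Arc<Mutex<(Vec<u64>, Vec<String>)>>;

/// Hypothetical engine-facing handle.
pub struct Handle {
    tx: Option<Sender<(u64, String)>>,
    results: Results,
    worker: Option<JoinHandle<()>>,
}

impl Handle {
    /// Spawns a dedicated worker thread that applies `func` to each submitted item.
    pub fn start(func: fn(&str) -> String) -> Handle {
        let (tx, rx) = channel::<(u64, String)>();
        let results: Results = Arc::new(Mutex::new((Vec::new(), Vec::new())));
        let shared = Arc::clone(&results);
        let worker = std::thread::spawn(move || {
            // Runs until every Sender is dropped, i.e. until `stop` is called.
            for (id, arg) in rx {
                let value = func(&arg);
                let mut guard = shared.lock().unwrap();
                guard.0.push(id);
                guard.1.push(value);
            }
        });
        Handle { tx: Some(tx), results, worker: Some(worker) }
    }

    /// Submits one work item; returns false once the runtime has been stopped.
    pub fn send(&self, id: u64, arg: &str) -> bool {
        match &self.tx {
            Some(tx) => tx.send((id, arg.to_string())).is_ok(),
            None => false,
        }
    }

    /// Takes everything completed so far, leaving empty columns behind.
    pub fn drain_results(&self) -> (Vec<u64>, Vec<String>) {
        std::mem::take(&mut *self.results.lock().unwrap())
    }

    /// Closes the queue and waits for the worker to finish queued items.
    pub fn stop(&mut self) {
        self.tx.take(); // dropping the Sender ends the worker's loop
        if let Some(worker) = self.worker.take() {
            worker.join().unwrap();
        }
    }
}
```

In this sketch, drain_results swaps the columns out under the lock with std::mem::take, so finished results can be collected while the worker keeps running. Dropping the last Sender is what ends the worker loop.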
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-udf/arroyo-udf-plugin/src/async_udf.rs
Signature
```rust
// Signatures only; function bodies are omitted.
pub enum FuturesEnum<F: Future + Send + 'static> {
    Ordered(FuturesOrdered<F>),
    Unordered(FuturesUnordered<F>),
}

impl<T: Send, F: Future<Output = T> + Send + 'static> FuturesEnum<F> {
    pub fn push_back(&mut self, f: F);
    pub async fn next(&mut self) -> Option<T>;
    pub fn len(&self) -> usize;
    pub fn is_empty(&self) -> bool;
    pub fn is_ordered(&self) -> bool;
}

pub struct AsyncUdfHandle {
    pub tx: Sender<QueueData>,
    pub results: ResultMutex,
}

impl AsyncUdfHandle {
    pub fn into_ffi(self) -> *mut FfiAsyncUdfHandle;
}

pub type OutputT = (u64, Result<ArrowDatum, Elapsed>);

pub struct AsyncUdf<F, FnT> {
    futures: FuturesEnum<F>,
    rx: Receiver<QueueData>,
    results: ResultMutex,
    func: FnT,
    timeout: Duration,
    allowed_in_flight: usize,
}

impl<F, FnT> AsyncUdf<F, FnT> {
    pub fn new(
        ordered: bool,
        timeout: Duration,
        allowed_in_flight: u32,
        builder: Box<dyn ArrayBuilder>,
        func: FnT,
    ) -> (Self, AsyncUdfHandle);
    pub fn start(self);
}

pub async fn send(handle: SendableFfiAsyncUdfHandle, id: u64, arrays: FfiArrays) -> bool;
pub fn drain_results(handle: SendableFfiAsyncUdfHandle) -> DrainResult;
pub fn stop_runtime(handle: SendableFfiAsyncUdfHandle);
```
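The into_ffi, send, drain_results, and stop_runtime signatures pass the handle across the dylib boundary as a raw pointer. The following std-only sketch shows a Box::into_raw / Box::from_raw round-trip, which is a common way to build such an API. It is assumed here, since the crate's exact casts are not shown. DemoHandle, FfiDemoHandle, send, and stop are hypothetical names, and the real functions wrap the pointer in SendableFfiAsyncUdfHandle rather than taking it bare.

```rust
/// Hypothetical engine-side state, standing in for AsyncUdfHandle.
pub struct DemoHandle {
    /// Ids submitted so far (stands in for the work channel).
    pub submitted: Vec<u64>,
}

/// Opaque pointer target; the other side never looks inside it.
#[repr(C)]
pub struct FfiDemoHandle {
    _private: [u8; 0],
}

impl DemoHandle {
    /// Moves the handle onto the heap and hands out a raw pointer to it.
    pub fn into_ffi(self) -> *mut FfiDemoHandle {
        Box::into_raw(Box::new(self)) as *mut FfiDemoHandle
    }
}

/// Borrows the handle behind the pointer without taking ownership.
///
/// # Safety
/// `ptr` must come from `into_ffi` and must not have been passed to `stop`.
pub unsafe fn send(ptr: *mut FfiDemoHandle, id: u64) {
    let handle = unsafe { &mut *(ptr as *mut DemoHandle) };
    handle.submitted.push(id);
}

/// Takes ownership back so the handle is freed exactly once.
///
/// # Safety
/// `ptr` must come from `into_ffi` and must not be used again afterwards.
pub unsafe fn stop(ptr: *mut FfiDemoHandle) -> Vec<u64> {
    let boxed = unsafe { Box::from_raw(ptr as *mut DemoHandle) };
    let handle = *boxed; // move the value out; the Box allocation is freed here
    handle.submitted
}
```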
Import
```rust
use arroyo_udf_plugin::async_udf::{
    AsyncUdf, AsyncUdfHandle, FuturesEnum, send, drain_results, stop_runtime,
};
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| id | u64 | Yes | Unique identifier for the UDF invocation, used to correlate results |
| arrays | FfiArrays | Yes | FFI-safe input argument arrays for the UDF function |
| timeout | Duration | Yes | Maximum time allowed for each async UDF invocation (set once, when constructing AsyncUdf) |
Outputs
| Name | Type | Description |
|---|---|---|
| drain_result | DrainResult | Collected (id, value) pairs from completed UDF invocations, returned as FfiArrays |
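Results come back as (id, value) pairs. In unordered mode they may arrive out of submission order, so the caller needs a way to join them back to its inputs. The following hypothetical sketch performs that join by id. It models the drained batch as two parallel Vec columns rather than FfiArrays, and correlate is an illustrative name.

```rust
use std::collections::HashMap;

/// Hypothetical join step: pairs each drained value with the input that produced it.
/// `ids` and `values` are parallel columns from one drained batch, in completion
/// order. Panics if the columns differ in length or an id was never submitted.
pub fn correlate(
    inputs: &HashMap<u64, String>,
    ids: &[u64],
    values: &[String],
) -> Vec<(String, String)> {
    assert_eq!(ids.len(), values.len(), "id and value columns must line up");
    ids.iter()
        .zip(values)
        .map(|(id, value)| (inputs[id].clone(), value.clone()))
        .collect()
}
```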
Usage Examples
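```rust
use std::time::Duration;
use arrow::array::StringBuilder;
use arroyo_udf_plugin::async_udf::AsyncUdf;
// ArrowDatum must also be in scope (import path omitted here).

// Create an async UDF runtime
let (udf, handle) = AsyncUdf::new(
    true,                           // ordered: yield results in submission order
    Duration::from_secs(5),         // timeout for each invocation
    100,                            // max in-flight invocations (backpressure limit)
    Box::new(StringBuilder::new()), // builder that accumulates result values
    |id, _timeout, _args| async move {
        // async UDF logic here
        (id, Ok(ArrowDatum::String(Some("result".to_string()))))
    },
);

// Start the runtime on its dedicated background thread
udf.start();

// Convert the handle into an FFI-safe pointer for use across the dylib boundary
let ffi_handle = handle.into_ffi();
```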
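The max in-flight argument (100 in the example) bounds how many invocations run concurrently. The following synchronous, std-only model illustrates that admission rule. It assumes the runtime stops pulling from its queue while the in-flight count is at the limit. simulate is a hypothetical name, and each loop step simply completes the oldest pending item.

```rust
use std::collections::VecDeque;

/// Hypothetical model of allowed_in_flight backpressure.
/// Returns the peak number of in-flight items and the completion order.
pub fn simulate(queue: Vec<u64>, allowed_in_flight: usize) -> (usize, Vec<u64>) {
    assert!(allowed_in_flight > 0, "a zero limit would never admit work");
    let mut queue: VecDeque<u64> = queue.into();
    let mut in_flight: VecDeque<u64> = VecDeque::new();
    let mut peak: usize = 0;
    let mut completed = Vec::new();
    while !queue.is_empty() || !in_flight.is_empty() {
        // Admit work only while under the limit; the rest waits in the queue.
        while in_flight.len() < allowed_in_flight {
            match queue.pop_front() {
                Some(id) => in_flight.push_back(id),
                None => break,
            }
        }
        peak = peak.max(in_flight.len());
        // Complete one pending invocation, freeing a slot for the next step.
        if let Some(id) = in_flight.pop_front() {
            completed.push(id);
        }
    }
    (peak, completed)
}
```

With five queued items and a limit of 2, the peak never exceeds 2. Items that do not fit stay in the queue instead of being started, which is the backpressure effect.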