

Implementation:ArroyoSystems Arroyo Async Udf Runtime

From Leeroopedia


Knowledge Sources
Domains: Streaming, UDF, Async_Processing
Last Updated: 2026-02-08 08:00 GMT

Overview

The arroyo-udf-plugin async UDF runtime provides the execution infrastructure for asynchronous user-defined functions, managing concurrent futures with configurable ordering, timeouts, and backpressure through a dedicated Tokio runtime on a separate thread.

Description

This module implements the runtime side of Arroyo's async UDF system. The core components are:

  • FuturesEnum<F> -- An enum wrapping either FuturesOrdered or FuturesUnordered, providing a unified interface for managing async UDF futures with configurable ordering semantics. FuturesOrdered preserves input order (results are yielded in the order they were submitted), while FuturesUnordered yields results as they complete.
  • AsyncUdfHandle -- The engine-facing handle containing a Sender<QueueData> for submitting work and a ResultMutex for draining completed results. It can be converted to an FFI-safe pointer via into_ffi for passing across the dylib boundary.
  • AsyncUdf<F, FnT> -- The main runtime struct that runs on a dedicated thread with its own single-threaded Tokio runtime. It manages:
    • A FuturesEnum of in-flight async UDF invocations
    • An mpsc::Receiver for incoming work items
    • A shared ResultMutex (Arc<Mutex<(UInt64Builder, Box<dyn ArrayBuilder>)>>) for collecting results as (id, value) pairs
    • Backpressure via allowed_in_flight limiting concurrent futures
  • send / drain_results / stop_runtime -- FFI-safe functions that operate on the AsyncUdfHandle through raw pointer casts. They allow work to be submitted, completed results to be collected, and the runtime to be shut down across the dylib boundary.
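The ordering semantics of the two FuturesEnum variants can be shown without any async machinery. The sketch below is a std-only simulation, not Arroyo's code. `observe` is a hypothetical function. It assumes ids are assigned 0, 1, 2, ... in submission order, so that id order equals submission order.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the two FuturesEnum modes. `completions` lists
/// results in the order they finished; the return value is the order in which
/// a consumer would observe them.
pub fn observe(ordered: bool, completions: &[(u64, &str)]) -> Vec<(u64, String)> {
    if !ordered {
        // Unordered (like FuturesUnordered): yield each result as it completes.
        return completions.iter().map(|(id, v)| (*id, v.to_string())).collect();
    }
    // Ordered (like FuturesOrdered): hold early finishers until every earlier
    // submission has been yielded. Assumes ids 0, 1, 2, ... with no gaps.
    let mut pending: BTreeMap<u64, String> = BTreeMap::new();
    let mut next_id = 0u64;
    let mut out = Vec::new();
    for (id, v) in completions {
        pending.insert(*id, v.to_string());
        // Release the contiguous run of results starting at next_id.
        while let Some(ready) = pending.remove(&next_id) {
            out.push((next_id, ready));
            next_id += 1;
        }
    }
    out
}
```

The ordered mode costs buffering: one slow invocation holds back every later result. That is the trade-off between FuturesOrdered and FuturesUnordered.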

When an invocation exceeds its timeout, the runtime panics. This applies in both ordered and unordered modes and is done to preserve correctness guarantees.
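The timeout policy can be sketched with std threads. A timeout-bounded call returns either a value or an elapsed marker, and the caller turns the marker into a panic. This is a hypothetical, std-only illustration. `TimedOut`, `run_with_timeout`, and `value_or_panic` are invented names. In the page's OutputT, a tokio-style `Elapsed` error plays the role of `TimedOut`.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical marker for an expired deadline (OutputT uses `Elapsed`).
#[derive(Debug, PartialEq)]
pub struct TimedOut;

/// Runs `work` on a helper thread and waits at most `timeout` for its result.
/// Any receive error, including the worker panicking, is reported as TimedOut.
pub fn run_with_timeout<T: Send + 'static>(
    timeout: Duration,
    work: impl FnOnce() -> T + Send + 'static,
) -> Result<T, TimedOut> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Fails harmlessly if the caller has already stopped waiting.
        let _ = tx.send(work());
    });
    rx.recv_timeout(timeout).map_err(|_| TimedOut)
}

/// Mirrors the documented policy: an expired invocation panics instead of
/// yielding a value.
pub fn value_or_panic<T>(id: u64, result: Result<T, TimedOut>) -> T {
    match result {
        Ok(v) => v,
        Err(TimedOut) => panic!("async UDF invocation {id} timed out"),
    }
}
```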

Usage

Used when a user defines an async UDF (annotated with #[udf(async)] in Rust). The Arroyo engine loads the UDF dylib. The dylib creates an AsyncUdf instance when the UDF is constructed and starts that instance on a background thread. The engine and the dylib then communicate through the AsyncUdfHandle across the FFI boundary.
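The lifecycle above (construct, start on a background thread, interact through a handle) is a common handle-plus-worker pattern. Below is a std-only sketch of that pattern, not Arroyo's implementation. `Handle`, `start`, and the `i64` payloads are hypothetical. A bounded std channel stands in for the Tokio mpsc queue, and an owned struct stands in for the FFI pointer.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical, std-only analogue of AsyncUdfHandle: a sender for work plus a
/// shared buffer of (id, value) results.
pub struct Handle {
    tx: SyncSender<(u64, i64)>,
    results: Arc<Mutex<Vec<(u64, i64)>>>,
    worker: JoinHandle<()>,
}

/// Starts a worker thread that applies `func` to each input and records the
/// result under its id (the real runtime drives futures on Tokio instead).
pub fn start(func: fn(i64) -> i64, queue_size: usize) -> Handle {
    let (tx, rx) = sync_channel::<(u64, i64)>(queue_size);
    let results = Arc::new(Mutex::new(Vec::new()));
    let shared = Arc::clone(&results);
    let worker = thread::spawn(move || {
        // The loop ends once the only sender has been dropped.
        for (id, arg) in rx {
            shared.lock().unwrap().push((id, func(arg)));
        }
    });
    Handle { tx, results, worker }
}

impl Handle {
    /// Submits one invocation; blocks while the queue is full.
    pub fn send(&self, id: u64, arg: i64) -> bool {
        self.tx.send((id, arg)).is_ok()
    }

    /// Takes every result collected so far, leaving the buffer empty.
    pub fn drain_results(&self) -> Vec<(u64, i64)> {
        std::mem::take(&mut *self.results.lock().unwrap())
    }

    /// Closes the queue, waits for the worker, and returns leftover results.
    pub fn stop(self) -> Vec<(u64, i64)> {
        drop(self.tx);
        self.worker.join().unwrap();
        std::mem::take(&mut *self.results.lock().unwrap())
    }
}
```

Keeping the worker behind a handle means the caller never touches the worker's internal state. It only enqueues work, drains results, and requests shutdown.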

Code Reference

Source Location

Signature

pub enum FuturesEnum<F: Future + Send + 'static> {
    Ordered(FuturesOrdered<F>),
    Unordered(FuturesUnordered<F>),
}

impl<T: Send, F: Future<Output = T> + Send + 'static> FuturesEnum<F> {
    pub fn push_back(&mut self, f: F);
    pub async fn next(&mut self) -> Option<T>;
    pub fn len(&self) -> usize;
    pub fn is_empty(&self) -> bool;
    pub fn is_ordered(&self) -> bool;
}

pub struct AsyncUdfHandle {
    pub tx: Sender<QueueData>,
    pub results: ResultMutex,
}

impl AsyncUdfHandle {
    pub fn into_ffi(self) -> *mut FfiAsyncUdfHandle;
}

pub type OutputT = (u64, Result<ArrowDatum, Elapsed>);

pub struct AsyncUdf<F, FnT> {
    futures: FuturesEnum<F>,
    rx: Receiver<QueueData>,
    results: ResultMutex,
    func: FnT,
    timeout: Duration,
    allowed_in_flight: usize,
}

impl<F, FnT> AsyncUdf<F, FnT> {
    pub fn new(
        ordered: bool,
        timeout: Duration,
        allowed_in_flight: u32,
        builder: Box<dyn ArrayBuilder>,
        func: FnT,
    ) -> (Self, AsyncUdfHandle);
    pub fn start(self);
}

pub async fn send(handle: SendableFfiAsyncUdfHandle, id: u64, arrays: FfiArrays) -> bool;
pub fn drain_results(handle: SendableFfiAsyncUdfHandle) -> DrainResult;
pub fn stop_runtime(handle: SendableFfiAsyncUdfHandle);
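The allowed_in_flight field bounds how many invocations run concurrently. New input is accepted only while the in-flight count is below the limit; otherwise the runtime first waits for a completion. The sketch below models that backpressure rule with plain threads instead of futures. `run_bounded`, the 5 ms sleep, and the `x * 10` "UDF" are hypothetical stand-ins.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical sketch of the allowed_in_flight rule. Returns the results in
/// completion order together with the peak number of concurrent invocations.
pub fn run_bounded(inputs: Vec<u64>, limit: usize) -> (Vec<(u64, u64)>, usize) {
    assert!(limit > 0, "limit must be positive");
    let (done_tx, done_rx) = mpsc::channel();
    let mut queue = inputs.into_iter().enumerate();
    let (mut in_flight, mut peak) = (0usize, 0usize);
    let mut results = Vec::new();
    loop {
        // Pull new work only while below the limit.
        if in_flight < limit {
            if let Some((id, x)) = queue.next() {
                let tx = done_tx.clone();
                thread::spawn(move || {
                    thread::sleep(Duration::from_millis(5)); // simulated async work
                    tx.send((id as u64, x * 10)).unwrap();
                });
                in_flight += 1;
                peak = peak.max(in_flight);
                continue;
            }
        }
        if in_flight == 0 {
            break; // input exhausted and nothing still running
        }
        // At the limit (or out of input): wait for one invocation to finish.
        results.push(done_rx.recv().unwrap());
        in_flight -= 1;
    }
    (results, peak)
}
```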

Import

use arroyo_udf_plugin::async_udf::{
    AsyncUdf, AsyncUdfHandle, FuturesEnum, send, drain_results, stop_runtime,
};

I/O Contract

Inputs

Name | Type | Required | Description
---- | ---- | -------- | -----------
id | u64 | Yes | Unique identifier for the UDF invocation, used to correlate results
arrays | FfiArrays | Yes | FFI-safe input argument arrays for the UDF function
timeout | Duration | Yes | Maximum time allowed for each async UDF invocation (set once via AsyncUdf::new rather than passed to send)

Outputs

Name | Type | Description
---- | ---- | -----------
drain_result | DrainResult | Collected (id, value) pairs from completed UDF invocations, returned as FfiArrays
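Results are stored as two parallel builders: ids in a UInt64Builder and values in an ArrayBuilder. A drain must therefore take both columns under the same lock so they stay aligned. Below is a minimal std-only sketch. `Vec<u64>` and `Vec<String>` stand in for the two builders. `record`, `drain`, and the None-when-empty convention are hypothetical and are not a claim about how DrainResult signals "no data".

```rust
use std::sync::Mutex;

/// Hypothetical analogue of the ResultMutex contents: parallel id and value
/// columns standing in for (UInt64Builder, Box<dyn ArrayBuilder>).
pub type Columns = (Vec<u64>, Vec<String>);

/// Appends one completed invocation to both columns while holding the lock.
pub fn record(results: &Mutex<Columns>, id: u64, value: &str) {
    let mut guard = results.lock().unwrap();
    guard.0.push(id);
    guard.1.push(value.to_string());
}

/// Swaps out both columns in one critical section so ids and values remain
/// aligned; returns None if nothing completed since the last drain.
pub fn drain(results: &Mutex<Columns>) -> Option<Columns> {
    let mut guard = results.lock().unwrap();
    if guard.0.is_empty() {
        return None;
    }
    Some(std::mem::take(&mut *guard))
}
```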

Usage Examples

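use std::time::Duration;
use arrow::array::StringBuilder;
use arroyo_udf_plugin::async_udf::AsyncUdf;
// ArrowDatum must also be in scope; its import path is not shown on this page.

// Create an async UDF runtime
let (udf, handle) = AsyncUdf::new(
    true,                             // ordered: yield results in submission order
    Duration::from_secs(5),           // per-invocation timeout
    100,                              // allowed_in_flight: max concurrent invocations
    Box::new(StringBuilder::new()),   // builder that accumulates result values
    |id, timeout, _args| async move {
        // Async UDF logic goes here. Wrapping it in tokio::time::timeout
        // yields the Result<ArrowDatum, Elapsed> that OutputT expects.
        let result = tokio::time::timeout(timeout, async {
            ArrowDatum::String(Some("result".to_string()))
        })
        .await;
        (id, result)
    },
);

// Start the runtime on its own thread with a dedicated Tokio runtime
udf.start();

// Convert the handle into an FFI-safe pointer so it can cross the dylib boundary
let ffi_handle = handle.into_ffi();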
