Implementation:ArroyoSystems Arroyo UDF Macro

From Leeroopedia


Summary

This page documents the implementation of the #[udf] Rust proc macro and the @udf Python decorator, which together form the UDF authoring surface for the Arroyo streaming engine. The Rust macro parses the annotated function at compile time and generates the FFI glue code required for integration with the streaming dataflow. The Python decorator only registers the function; the Python UDF host handles the FFI integration separately.

Code Reference

Component         File                                                       Lines
Rust proc macro   crates/arroyo-udf/arroyo-udf-macros/src/lib.rs             L62-L84
Python decorator  crates/arroyo-udf/arroyo-udf-python/python/arroyo_udf.py   L4-L6

Rust Proc Macro: #[udf]

Signature

use proc_macro::TokenStream;

#[proc_macro_attribute]
pub fn udf(_attr: TokenStream, item: TokenStream) -> TokenStream {
    // Parses the annotated function and generates FFI entry points:
    //   Sync:  __run(args: FfiArrays) -> RunResult
    //   Async: __start / __send / __drain_results / __stop_runtime
    // Body elided here; see the file listed under Code Reference.
    unimplemented!()
}

Behavior

The #[udf] proc macro operates at compile time and performs the following steps:

  • Parses the annotated function's signature using the syn crate to extract parameter names, types, and return type
  • Detects whether the function is async to determine which FFI entry points to generate
  • Generates FFI entry points depending on the function variant:

Sync UDF Entry Points

For synchronous UDFs, the macro generates a single entry point:

  • __run(args: FfiArrays) -> RunResult -- accepts Arrow arrays via the C Data Interface, converts them to native Rust types, invokes the user function per row, and collects results back into an Arrow array

Async UDF Entry Points

For asynchronous UDFs, the macro generates a lifecycle-oriented set of entry points:

  • __start(ordered: bool, timeout: Duration, allowed_in_flight: u32) -- initializes the Tokio async runtime and configures execution parameters
  • __send(id: u64, data: Vec<ArrayData>) -- submits a single row for asynchronous processing, identified by a unique invocation ID
  • __drain_results() -> Option<(UInt64Array, ArrayData)> -- collects completed results, returning the invocation IDs (UInt64Array) together with the corresponding output values (ArrayData)
  • __stop_runtime() -- shuts down the async runtime gracefully
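
The lifecycle above can be illustrated with a small Python model. This is a sketch of the call pattern only, not the generated Rust code: AsyncUdfModel, slow_double, and run_all are hypothetical names, an asyncio event loop stands in for the Tokio runtime, and the behavior of ordered and allowed_in_flight shown here is an assumption about their semantics rather than something taken from the Arroyo source.

```python
import asyncio


class AsyncUdfModel:
    """Toy model of __start/__send/__drain_results/__stop_runtime."""

    def __init__(self, func):
        self.func = func
        self.tasks = {}  # invocation id -> asyncio.Task, kept in send order

    def start(self, ordered, timeout, allowed_in_flight):
        # Create the runtime (an event loop here) and record the settings.
        self.loop = asyncio.new_event_loop()
        self.ordered = ordered
        self.timeout = timeout
        self.allowed_in_flight = allowed_in_flight

    def send(self, invocation_id, row):
        # Assumed policy: refuse new rows once the in-flight cap is reached.
        if len(self.tasks) >= self.allowed_in_flight:
            raise RuntimeError("in-flight limit reached")
        call = asyncio.wait_for(self.func(*row), self.timeout)
        self.tasks[invocation_id] = self.loop.create_task(call)

    def drain_results(self):
        # Let scheduled work advance, then hand back whatever has finished.
        self.loop.run_until_complete(asyncio.sleep(0))
        ids, values = [], []
        for invocation_id, task in list(self.tasks.items()):
            if not task.done():
                if self.ordered:
                    break  # assumed: ordered mode holds back later rows
                continue
            ids.append(invocation_id)
            values.append(task.result())
            del self.tasks[invocation_id]
        return (ids, values) if ids else None

    def stop_runtime(self):
        # Cancel unfinished rows, wait for them to settle, close the loop.
        pending = list(self.tasks.values())
        for task in pending:
            task.cancel()
        if pending:
            self.loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True))
        self.tasks.clear()
        self.loop.close()


async def slow_double(x, yields):
    # Hypothetical async UDF: yields to the loop `yields` times, then returns.
    for _ in range(yields):
        await asyncio.sleep(0)
    return 2 * x


def run_all(rows, ordered):
    # Drive the full lifecycle: start, send every row, drain, stop.
    model = AsyncUdfModel(slow_double)
    model.start(ordered=ordered, timeout=5.0, allowed_in_flight=16)
    for invocation_id, row in enumerate(rows, start=1):
        model.send(invocation_id, row)
    ids, values = [], []
    while len(ids) < len(rows):
        batch = model.drain_results()
        if batch is not None:
            ids += batch[0]
            values += batch[1]
    model.stop_runtime()
    return ids, values
```

Calling run_all([(1, 30), (2, 0)], ordered=True) sends a slow row followed by a fast one. In this model the ordered flag decides whether the fast row's result may be drained before the slow row has finished, which is why each result carries its invocation ID.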

I/O

  • Input: An annotated Rust function (parsed as a TokenStream)
  • Output: Generated FFI entry points (emitted as a TokenStream) that the streaming engine calls at runtime

Python Decorator: @udf

Signature

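# Module-level registry; shown here so the snippet runs on its own.
udf_functions = []

def udf(func):
    # Record the function for later discovery and return it unchanged.
    udf_functions.append(func)
    return func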

Behavior

The Python @udf decorator is intentionally minimal:

  • Registers the decorated function by appending it to the global udf_functions list
  • Returns the original function unchanged (no wrapping or modification)
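
The effect of these two steps can be checked directly. The sketch below repeats the decorator so it runs without the arroyo_udf module; shout and add_one are hypothetical functions used only for illustration.

```python
# Local copy of the registry and decorator so the sketch is self-contained.
udf_functions = []


def udf(func):
    udf_functions.append(func)
    return func


@udf
def shout(text: str) -> str:
    return text.upper()


@udf
def add_one(n: int) -> int:
    return n + 1


# The registry holds the very same function objects, in definition order.
registered_names = [f.__name__ for f in udf_functions]

# Because nothing is wrapped, the decorated name still calls the original.
direct_call = shout("hi")
```

Since the decorator returns func itself, udf_functions[0] is shout holds, and metadata such as __name__ and __annotations__ is preserved without needing functools.wraps.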

The actual FFI integration for Python UDFs is handled separately by the Python UDF host, which introspects the registered functions' type annotations to determine Arrow type mappings.

I/O

  • Input: A decorated Python function with type annotations
  • Output: The same function, registered in the udf_functions list for discovery by the UDF host

Example Usage

Rust

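// Import path as shown in Arroyo's UDF documentation; adjust if your version differs.
use arroyo_udf_plugin::udf;

#[udf]
fn my_udf(x: i64, y: String) -> Option<String> {
    Some(format!("{}: {}", y, x))
}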

The macro expands this into the original function plus a __run FFI entry point that:

  1. Receives Arrow arrays for x (Int64Array) and y (StringArray)
  2. Iterates over each row, calling my_udf per element
  3. Collects results into a nullable StringArray
  4. Returns the result via the Arrow C Data Interface
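
The four steps above can be modeled in a few lines of Python. This is a conceptual sketch, not the macro's expansion: plain lists stand in for the Arrow arrays, None stands in for a null slot, and run_rowwise is a hypothetical helper name.

```python
from typing import Optional


def my_udf(x: int, y: str) -> Optional[str]:
    # Python stand-in for the Rust example body: Some(format!("{}: {}", y, x)).
    return f"{y}: {x}"


def run_rowwise(func, *columns):
    """Toy model of __run: walk equal-length columns and call func per row."""
    lengths = {len(column) for column in columns}
    if len(lengths) > 1:
        raise ValueError("all input columns must have the same length")
    # zip(*columns) turns the column-oriented input into one tuple per row;
    # a None returned by func becomes a null entry in the output column.
    return [func(*row) for row in zip(*columns)]


x_column = [1, 2, 3]
y_column = ["a", "b", "c"]
result_column = run_rowwise(my_udf, x_column, y_column)
```

The point of the sketch is the shape of the work: input arrives column by column, the user function sees one row at a time, and the outputs are gathered back into a single column.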

Python

from arroyo_udf import udf

@udf
def my_udf(x: int, y: str) -> str:
    return f"{y}: {x}"

The decorator registers my_udf so the Python UDF host can discover it, inspect its type annotations (int -> Int64, str -> Utf8, return str -> Utf8), and generate the appropriate Arrow conversion logic.
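
A minimal sketch of that inspection step, assuming the host reads annotations with the standard typing and inspect modules. ARROW_TYPE_NAMES and arrow_signature are hypothetical names, the mapping covers only the two types named above, and the decorator is repeated locally so the block runs without the arroyo_udf module.

```python
import inspect
from typing import get_type_hints

# Assumed mapping, limited to the types mentioned in the example above.
ARROW_TYPE_NAMES = {int: "Int64", str: "Utf8"}

udf_functions = []


def udf(func):
    udf_functions.append(func)
    return func


@udf
def my_udf(x: int, y: str) -> str:
    return f"{y}: {x}"


def arrow_signature(func):
    """Return (argument type names, return type name) for one function."""
    hints = get_type_hints(func)
    params = inspect.signature(func).parameters
    # Every parameter and the return value must be annotated to be mapped.
    missing = [name for name in [*params, "return"] if name not in hints]
    if missing:
        raise TypeError(f"{func.__name__} is missing annotations: {missing}")
    arg_types = [ARROW_TYPE_NAMES[hints[name]] for name in params]
    return arg_types, ARROW_TYPE_NAMES[hints["return"]]


# Discover every registered function and describe it by name.
signatures = {f.__name__: arrow_signature(f) for f in udf_functions}
```

Rejecting unannotated parameters up front is a design choice of this sketch: without annotations there is nothing from which to derive an Arrow type, so failing at discovery time is clearer than failing on the first batch of data.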

Implements

Principle:ArroyoSystems_Arroyo_UDF_Authoring
Environment:ArroyoSystems_Arroyo_Python_UDF_Runtime
