Implementation:ArroyoSystems Arroyo UDF Macro
Summary
This page documents the implementation of the #[udf] Rust proc macro and the @udf Python decorator, which together form the UDF authoring surface for the Arroyo streaming engine. The Rust macro parses an annotated user function and generates the FFI glue code that connects it to the streaming dataflow; the Python decorator only registers the function and leaves FFI integration to the Python UDF host.
Code Reference
| Component | File | Lines |
|---|---|---|
| Rust proc macro | crates/arroyo-udf/arroyo-udf-macros/src/lib.rs | L62-L84 |
| Python decorator | crates/arroyo-udf/arroyo-udf-python/python/arroyo_udf.py | L4-L6 |
Rust Proc Macro: #[udf]
Signature
```rust
use proc_macro::TokenStream;

#[proc_macro_attribute]
pub fn udf(_attr: TokenStream, item: TokenStream) -> TokenStream {
    // Parses the annotated function and generates FFI entry points:
    //   sync:  __run(args: FfiArrays) -> RunResult
    //   async: __start / __send / __drain_results / __stop_runtime
    // (body elided on this page)
}
```
Behavior
The #[udf] proc macro operates at compile time and performs the following steps:
- Parses the annotated function's signature using the `syn` crate to extract parameter names, types, and the return type
- Detects whether the function is `async` to determine which FFI entry points to generate
- Generates FFI entry points depending on the function variant:
Sync UDF Entry Points
For synchronous UDFs, the macro generates a single entry point:
- `__run(args: FfiArrays) -> RunResult`: accepts Arrow arrays via the C Data Interface, converts them to native Rust types, invokes the user function once per row, and collects the results back into an Arrow array
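The per-row loop that `__run` performs can be modeled in Python, with plain lists standing in for Arrow columns. This is a minimal sketch under that assumption: `run_sync` is a hypothetical name, and the sketch reproduces neither the macro's Arrow/FFI conversions nor its null handling.

```python
from typing import Any, Callable, List


def run_sync(func: Callable[..., Any], columns: List[List[Any]]) -> List[Any]:
    """Hypothetical model of a generated __run: call `func` once per row."""
    if columns:
        num_rows = len(columns[0])
        # Argument columns must all have the same length, as columns in one batch do.
        if any(len(col) != num_rows for col in columns):
            raise ValueError("argument columns must have equal length")
    results = []
    # zip(*columns) yields one tuple per row, with values in parameter order.
    for row in zip(*columns):
        results.append(func(*row))
    return results
```

For example, `run_sync(lambda x, y: f"{y}: {x}", [[1, 2], ["a", "b"]])` calls the function twice and returns `["a: 1", "b: 2"]`.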
Async UDF Entry Points
For asynchronous UDFs, the macro generates a lifecycle-oriented set of entry points:
- `__start(ordered: bool, timeout: Duration, allowed_in_flight: u32)`: initializes the Tokio async runtime and configures execution parameters
- `__send(id: u64, data: Vec<ArrayData>)`: submits a single row for asynchronous processing, identified by a unique invocation ID
- `__drain_results() -> Option<(UInt64Array, ArrayData)>`: collects completed results, returning a mapping from invocation IDs to output values
- `__stop_runtime()`: shuts down the async runtime gracefully
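A rough Python analog of this lifecycle, using `asyncio` in place of the Tokio runtime, shows how the four entry points could fit together. `AsyncUdfRunner` and its methods are hypothetical names; the semaphore models `allowed_in_flight`, the timeout is in seconds rather than a `Duration`, and returning `None` for a timed-out row is an assumption of this sketch, not documented Arroyo behavior.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional, Tuple


class AsyncUdfRunner:
    """Hypothetical model of the __start/__send/__drain_results/__stop_runtime lifecycle."""

    def __init__(self, func: Callable[..., Awaitable[Any]], ordered: bool,
                 timeout: float, allowed_in_flight: int):
        # Like __start: record the execution parameters (construct inside a running loop).
        self.func = func
        self.ordered = ordered
        self.timeout = timeout
        self.slots = asyncio.Semaphore(allowed_in_flight)
        self.pending: List[Tuple[int, asyncio.Task]] = []  # kept in submission order

    async def send(self, invocation_id: int, *args: Any) -> None:
        # Like __send: wait for a free in-flight slot, then start processing one row.
        await self.slots.acquire()
        task = asyncio.create_task(self._call(*args))
        self.pending.append((invocation_id, task))

    async def _call(self, *args: Any) -> Any:
        try:
            return await asyncio.wait_for(self.func(*args), self.timeout)
        except asyncio.TimeoutError:
            return None  # assumption of this sketch: a timed-out row yields None
        finally:
            self.slots.release()

    def drain_results(self) -> Optional[List[Tuple[int, Any]]]:
        # Like __drain_results: return finished (invocation_id, value) pairs, or None.
        ready, still_pending = [], []
        for inv_id, task in self.pending:
            # In ordered mode, nothing is emitted past the first unfinished row.
            if task.done() and not (self.ordered and still_pending):
                ready.append((inv_id, task.result()))
            else:
                still_pending.append((inv_id, task))
        self.pending = still_pending
        return ready or None

    async def stop(self) -> None:
        # Like __stop_runtime: cancel unfinished rows and wait for them to exit.
        for _, task in self.pending:
            task.cancel()
        await asyncio.gather(*(t for _, t in self.pending), return_exceptions=True)
        self.pending = []
```

In this sketch the `ordered` flag only affects draining: in ordered mode a finished row waits behind any earlier row that is still running, while unordered mode hands rows back as soon as they complete.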
I/O
- Input: an annotated Rust function (parsed as a `TokenStream`)
- Output: the original function plus generated FFI entry points (emitted as a `TokenStream`) that the streaming engine calls at runtime
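The macro's analysis step (read the signature, check for `async`, choose an entry-point set) happens at compile time via `syn`. Python's `inspect` module offers a runtime analog, sketched below to make the decision logic concrete. `plan_entry_points` is a hypothetical helper; it generates no code and is not part of Arroyo.

```python
import inspect
from typing import Callable, Dict, List


def plan_entry_points(func: Callable) -> Dict[str, object]:
    """Hypothetical analog of the macro's analysis step, applied to a Python function."""
    sig = inspect.signature(func)
    # Step 1: extract parameter names/types and the return type.
    params = [(name, p.annotation) for name, p in sig.parameters.items()]
    # Step 2: choose the entry-point set based on whether the function is async.
    if inspect.iscoroutinefunction(func):
        entry_points: List[str] = ["__start", "__send", "__drain_results", "__stop_runtime"]
    else:
        entry_points = ["__run"]
    return {"params": params, "returns": sig.return_annotation, "entry_points": entry_points}
```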
Python Decorator: @udf
Signature
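```python
# Module-level registry that the Python UDF host reads to discover UDFs.
udf_functions = []


def udf(func):
    # Register the function and return it unchanged (no wrapper).
    udf_functions.append(func)
    return func
```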
Behavior
The Python @udf decorator is intentionally minimal:
- Registers the decorated function by appending it to the global `udf_functions` list
- Returns the original function unchanged (no wrapping or modification)
The actual FFI integration for Python UDFs is handled separately by the Python UDF host, which introspects the registered functions' type annotations to determine Arrow type mappings.
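The host-side introspection can be sketched with `inspect.signature`. Only the `int` → Int64 and `str` → Utf8 mappings are stated on this page, so the table below is deliberately limited to them; `ARROW_TYPES` and `arrow_signature` are hypothetical names, not the Python UDF host's actual API.

```python
import inspect
from typing import Callable, Dict, List, Tuple

udf_functions: List[Callable] = []


def udf(func):
    # Same registration behavior as the decorator above: record and return unchanged.
    udf_functions.append(func)
    return func


# Hypothetical mapping table; only the two entries documented on this page.
ARROW_TYPES: Dict[type, str] = {int: "Int64", str: "Utf8"}


def arrow_signature(func: Callable) -> Tuple[List[str], str]:
    """Hypothetical host step: map a registered UDF's annotations to Arrow type names."""
    sig = inspect.signature(func)

    def to_arrow(annotation) -> str:
        if annotation not in ARROW_TYPES:
            raise TypeError(f"{func.__name__}: unsupported annotation {annotation!r}")
        return ARROW_TYPES[annotation]

    arg_types = [to_arrow(p.annotation) for p in sig.parameters.values()]
    return arg_types, to_arrow(sig.return_annotation)
```

A host built this way would iterate over `udf_functions` and call `arrow_signature` on each entry before wiring up conversions.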
I/O
- Input: a decorated Python function with type annotations
- Output: the same function, registered in the `udf_functions` list for discovery by the UDF host
Example Usage
Rust
```rust
#[udf]
fn my_udf(x: i64, y: String) -> Option<String> {
    Some(format!("{}: {}", y, x))
}
```
The macro expands this into the original function plus a __run FFI entry point that:
- Receives Arrow arrays for `x` (`Int64Array`) and `y` (`StringArray`)
- Iterates over the rows, calling `my_udf` once per pair of elements
- Collects the results into a nullable `StringArray`
- Returns the result via the Arrow C Data Interface
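Because `my_udf` returns `Option<String>`, a `None` result becomes a null slot in the output array. The sketch below shows the Arrow columnar layout for a nullable Utf8 column (validity bitmap in least-significant-bit order, `n + 1` offsets, concatenated UTF-8 data). It illustrates the data format only, not how the generated code builds it, and it ignores the buffer alignment and padding that real Arrow buffers use; `build_string_array` is a hypothetical name.

```python
from typing import List, Optional, Tuple


def build_string_array(values: List[Optional[str]]) -> Tuple[bytes, List[int], bytes]:
    """Sketch of Arrow's nullable Utf8 layout: (validity bitmap, offsets, data)."""
    validity = bytearray((len(values) + 7) // 8)
    offsets = [0]
    data = bytearray()
    for i, value in enumerate(values):
        if value is not None:
            # Valid slot: set bit i (LSB-first within each byte) and append its UTF-8 bytes.
            validity[i // 8] |= 1 << (i % 8)
            data += value.encode("utf-8")
        # A null slot appends no bytes, so its start and end offsets are equal.
        offsets.append(len(data))
    return bytes(validity), offsets, bytes(data)
```

For `["a: 1", None, "bc: 22"]`, bits 0 and 2 of the bitmap are set and the offsets are `[0, 4, 4, 10]`.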
Python
```python
from arroyo_udf import udf


@udf
def my_udf(x: int, y: str) -> str:
    return f"{y}: {x}"
```
The decorator registers my_udf so the Python UDF host can discover it, inspect its type annotations (int -> Int64, str -> Utf8, return str -> Utf8), and generate the appropriate Arrow conversion logic.
Implements
- Principle:ArroyoSystems_Arroyo_UDF_Authoring
- Environment:ArroyoSystems_Arroyo_Python_UDF_Runtime