Implementation:ArroyoSystems Arroyo UDF CRUD
Summary
This page documents the implementation of UDF CRUD (Create, Read, Update, Delete) operations and the operator-level metrics used for indirect UDF observability in the Arroyo streaming engine.
Code Reference
| Component | File | Lines |
|---|---|---|
| UDF list and delete endpoints | crates/arroyo-api/src/udfs.rs | L128-L174 |
| Operator metrics definitions | crates/arroyo-metrics/src/lib.rs | L48-L97 |
get_udfs
Signature
```rust
pub async fn get_udfs(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
) -> Result<Json<GlobalUdfCollection>, ErrorResp>
```
I/O
- Input: `AppState` (application state holding the database pool) and `BearerAuth` (the authentication token)
- Output: `Json<GlobalUdfCollection>`, a JSON response containing the list of UDFs registered in the caller's organization
Behavior
- Authenticate the request using the bearer token
- Query the database for all UDF records associated with the authenticated user's organization
- Map database records to `GlobalUdf` response objects, including:
  - UDF public ID
  - Function name
  - Source definition
  - Dylib URL
  - Language (Rust or Python)
  - Description
  - Creation and update timestamps
- Return the collection wrapped in JSON
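The record-to-response mapping can be sketched in plain Rust. This is a minimal, std-only illustration, not the code in `udfs.rs`: the struct fields, the `UdfLanguage` enum, the `to_collection` and `sample_records` helpers, and the sample values are hypothetical, and the organization filter stands in for the WHERE clause of the real SQL query. Authentication and the Axum/JSON layers are omitted.

```rust
/// Language of a UDF (hypothetical enum; the real API type may differ).
#[derive(Debug, Clone, Copy, PartialEq)]
enum UdfLanguage {
    Rust,
    Python,
}

/// Hypothetical shape of a UDF row as read from the database.
#[derive(Debug)]
struct UdfRecord {
    organization_id: String,
    pub_id: String,
    name: String,
    definition: String,
    dylib_url: Option<String>,
    language: UdfLanguage,
    description: Option<String>,
    created_at: u64, // assumed: epoch milliseconds
    updated_at: u64,
}

/// Hypothetical response object carrying the fields listed above.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
struct GlobalUdf {
    id: String,
    name: String,
    definition: String,
    dylib_url: Option<String>,
    language: UdfLanguage,
    description: Option<String>,
    created_at: u64,
    updated_at: u64,
}

#[derive(Debug, PartialEq)]
struct GlobalUdfCollection {
    data: Vec<GlobalUdf>,
}

/// Keep only the caller's organization's rows and map each one to a response object.
fn to_collection(records: &[UdfRecord], org_id: &str) -> GlobalUdfCollection {
    let data = records
        .iter()
        .filter(|r| r.organization_id == org_id)
        .map(|r| GlobalUdf {
            id: r.pub_id.clone(),
            name: r.name.clone(),
            definition: r.definition.clone(),
            dylib_url: r.dylib_url.clone(),
            language: r.language,
            description: r.description.clone(),
            created_at: r.created_at,
            updated_at: r.updated_at,
        })
        .collect();
    GlobalUdfCollection { data }
}

/// Made-up rows: one UDF in each of two organizations.
fn sample_records() -> Vec<UdfRecord> {
    let row = |org: &str, id: &str, name: &str, language: UdfLanguage| UdfRecord {
        organization_id: org.into(),
        pub_id: id.into(),
        name: name.into(),
        definition: String::new(),
        dylib_url: None,
        language,
        description: None,
        created_at: 1_700_000_000_000,
        updated_at: 1_700_000_000_000,
    };
    vec![
        row("org_a", "udf_1", "my_udf", UdfLanguage::Rust),
        row("org_b", "udf_2", "other_org_udf", UdfLanguage::Python),
    ]
}

fn main() {
    let collection = to_collection(&sample_records(), "org_a");
    for udf in &collection.data {
        println!("{} ({:?})", udf.name, udf.language);
    }
}
```

Scoping by organization before mapping means a caller never sees another tenant's UDFs, which is the property the real query has to guarantee.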
delete_udf
Signature
```rust
pub async fn delete_udf(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    Path(udf_pub_id): Path<String>,
) -> Result<(), ErrorResp>
```
I/O
- Input: `AppState`, `BearerAuth`, and `udf_pub_id: String` (the public identifier of the UDF to delete)
- Output: `()` on success, or an `ErrorResp` on failure
Behavior
- Authenticate the request using the bearer token
- Look up the UDF record by its public ID within the authenticated user's organization
- Verify that the UDF exists; return 404 if not found
- Check dependencies: if dependency checking is implemented, verify that no active pipeline references this UDF
- Delete the database record
- Return success (empty response with 200 status)
Note: The compiled artifact in object storage is not deleted immediately. Artifact cleanup is handled separately: the artifact is either garbage-collected or left for manual cleanup.
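The delete flow, including the fact that the artifact survives, can be sketched against an in-memory store. Everything in this sketch is hypothetical: `UdfStore`, `DeleteError`, `sample_store`, and the name-based pipeline dependency index are illustrative stand-ins for the database and object storage, and this page does not specify which status code the in-use case maps to.

```rust
use std::collections::{HashMap, HashSet};

/// Ways a delete can fail (hypothetical error type).
#[derive(Debug, PartialEq)]
enum DeleteError {
    /// No UDF with this public ID in the caller's organization (HTTP 404).
    NotFound,
    /// Pipelines that still reference the UDF (status code unspecified).
    InUse(Vec<String>),
}

/// In-memory stand-in for the database and object store (hypothetical).
struct UdfStore {
    /// (organization_id, udf_pub_id) -> UDF function name
    udfs: HashMap<(String, String), String>,
    /// pipeline id -> UDF names used by its query, for the caller's organization
    pipeline_udfs: HashMap<String, HashSet<String>>,
    /// compiled artifacts keyed by UDF public ID; delete leaves these in place
    artifacts: HashSet<String>,
}

impl UdfStore {
    fn delete_udf(&mut self, org_id: &str, udf_pub_id: &str) -> Result<(), DeleteError> {
        let key = (org_id.to_string(), udf_pub_id.to_string());

        // Look up the record scoped to the organization; a miss becomes a 404.
        let name = self.udfs.get(&key).ok_or(DeleteError::NotFound)?;

        // Optional dependency check: refuse while any pipeline still uses the UDF.
        let mut users: Vec<String> = self
            .pipeline_udfs
            .iter()
            .filter(|(_, names)| names.contains(name))
            .map(|(pipeline, _)| pipeline.clone())
            .collect();
        if !users.is_empty() {
            users.sort();
            return Err(DeleteError::InUse(users));
        }

        // Remove only the database record; the artifact is cleaned up elsewhere.
        self.udfs.remove(&key);
        Ok(())
    }
}

/// Two UDFs in org_a; pipeline "pl_1" uses the second one.
fn sample_store() -> UdfStore {
    let mut udfs = HashMap::new();
    udfs.insert(("org_a".to_string(), "udf_1".to_string()), "unused_fn".to_string());
    udfs.insert(("org_a".to_string(), "udf_2".to_string()), "used_fn".to_string());
    let mut pipeline_udfs = HashMap::new();
    pipeline_udfs.insert("pl_1".to_string(), HashSet::from(["used_fn".to_string()]));
    let artifacts = HashSet::from(["udf_1".to_string(), "udf_2".to_string()]);
    UdfStore { udfs, pipeline_udfs, artifacts }
}

fn main() {
    let mut store = sample_store();
    println!("{:?}", store.delete_udf("org_a", "missing")); // Err(NotFound)
    println!("{:?}", store.delete_udf("org_a", "udf_2")); // Err(InUse(["pl_1"]))
    println!("{:?}", store.delete_udf("org_a", "udf_1")); // Ok(())
    println!("artifact kept: {}", store.artifacts.contains("udf_1")); // true
}
```

Keying the lookup on both organization and public ID means a UDF owned by another organization is reported as not found rather than as forbidden, so its existence is not leaked.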
Operator Metrics
Metric Definitions
The operator metrics are defined as Prometheus counters with operator labels. Because UDFs run inside operators, these counters provide indirect observability into UDF execution.
```text
// Seven Prometheus counters, each carrying the operator labels below:
MESSAGES_RECV           // Total messages received by the operator
MESSAGES_SENT           // Total messages sent by the operator
BYTES_RECV              // Total bytes received by the operator
BYTES_SENT              // Total bytes sent by the operator
BATCHES_RECV            // Total Arrow batches received by the operator
BATCHES_SENT            // Total Arrow batches sent by the operator
DESERIALIZATION_ERRORS  // Total deserialization errors encountered
```
Metric Labels
All seven counters share a common set of operator labels that enable per-operator drill-down:
| Label | Description |
|---|---|
| operator_id | Unique identifier of the operator instance |
| operator_name | Human-readable name of the operator |
| subtask_idx | Index of the parallel subtask within the operator |
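Per-operator drill-down works because each distinct combination of label values is its own time series. The sketch below models that with a std-only `CounterVec` keyed by the three labels. In the Rust `prometheus` crate, a counter vector such as `IntCounterVec` accessed through `with_label_values` plays this role, but this page does not say which type Arroyo uses; `OperatorLabels`, `CounterVec`, its methods, and the operator names here are hypothetical.

```rust
use std::collections::HashMap;

/// The label set shared by all seven operator counters.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct OperatorLabels {
    operator_id: String,
    operator_name: String,
    subtask_idx: u32,
}

impl OperatorLabels {
    fn new(operator_id: &str, operator_name: &str, subtask_idx: u32) -> Self {
        Self {
            operator_id: operator_id.to_string(),
            operator_name: operator_name.to_string(),
            subtask_idx,
        }
    }
}

/// Minimal stand-in for a labeled counter vector: one monotonically
/// increasing value per distinct label set.
#[derive(Default)]
struct CounterVec {
    values: HashMap<OperatorLabels, u64>,
}

impl CounterVec {
    /// Increment the series for one operator subtask, creating it on first use.
    fn inc_by(&mut self, labels: &OperatorLabels, n: u64) {
        *self.values.entry(labels.clone()).or_insert(0) += n;
    }

    /// Current value of a single series (0 if it was never incremented).
    fn get(&self, labels: &OperatorLabels) -> u64 {
        self.values.get(labels).copied().unwrap_or(0)
    }

    /// Drill-down: sum one operator's series across all of its subtasks.
    fn total_for_operator(&self, operator_id: &str) -> u64 {
        self.values
            .iter()
            .filter(|(labels, _)| labels.operator_id == operator_id)
            .map(|(_, value)| *value)
            .sum()
    }
}

fn main() {
    let mut messages_recv = CounterVec::default();
    messages_recv.inc_by(&OperatorLabels::new("op_1", "udf_projection", 0), 100);
    messages_recv.inc_by(&OperatorLabels::new("op_1", "udf_projection", 1), 150);
    messages_recv.inc_by(&OperatorLabels::new("op_2", "sink", 0), 40);

    println!("op_1 total: {}", messages_recv.total_for_operator("op_1")); // 250
    let sub1 = OperatorLabels::new("op_1", "udf_projection", 1);
    println!("op_1 subtask 1: {}", messages_recv.get(&sub1)); // 150
}
```

Including `subtask_idx` in the key is what makes skew visible: one slow parallel subtask shows up as its own lagging series instead of being averaged away.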
UDF Observability via Metrics
Since there are no UDF-specific metrics, UDF performance is inferred from operator metrics:
- Sync UDFs: Executed within DataFusion plan operators. The operator's `MESSAGES_RECV`/`MESSAGES_SENT` and `BATCHES_RECV`/`BATCHES_SENT` counters reflect the throughput of computations that include UDF evaluation.
- Async UDFs: Executed within dedicated async UDF operators. The operator's metrics directly reflect the async UDF's processing rate, including any backpressure effects from concurrency limits or timeouts.
A significant, sustained difference between `MESSAGES_RECV` and `MESSAGES_SENT` on a UDF-containing operator may indicate that rows are being dropped, for example because the UDF's null or empty results are filtered out, or that the UDF is encountering errors.
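Because these are cumulative counters, throughput and the received/sent gap have to be computed from the difference between two scrapes. The sketch below is a hypothetical helper: `Snapshot`, `Window`, `window`, `needs_attention`, and the 0.5 `GAP_THRESHOLD` are illustrative assumptions, not part of Arroyo. It derives per-second rates and the fraction of received messages not matched by sent messages over one interval, and discards intervals in which a counter went backwards (a reset).

```rust
/// Cumulative counter readings for one operator subtask at one scrape.
#[derive(Debug, Clone, Copy)]
struct Snapshot {
    at_secs: f64,
    messages_recv: u64,
    messages_sent: u64,
}

/// Rates and gap derived from two consecutive snapshots.
#[derive(Debug, PartialEq)]
struct Window {
    recv_per_sec: f64,
    sent_per_sec: f64,
    /// Fraction of received messages with no matching sent message.
    gap_ratio: f64,
}

/// Hypothetical alerting threshold; tune per pipeline.
const GAP_THRESHOLD: f64 = 0.5;

fn window(prev: Snapshot, cur: Snapshot) -> Option<Window> {
    let dt = cur.at_secs - prev.at_secs;
    // Counters only grow; a decrease means the process restarted and the
    // counters reset, so this pair of snapshots cannot be compared.
    if dt <= 0.0
        || cur.messages_recv < prev.messages_recv
        || cur.messages_sent < prev.messages_sent
    {
        return None;
    }
    let recv = (cur.messages_recv - prev.messages_recv) as f64;
    let sent = (cur.messages_sent - prev.messages_sent) as f64;
    // Clamp at 0: an operator may legitimately emit more rows than it receives.
    let gap_ratio = if recv > 0.0 { ((recv - sent) / recv).max(0.0) } else { 0.0 };
    Some(Window { recv_per_sec: recv / dt, sent_per_sec: sent / dt, gap_ratio })
}

fn needs_attention(w: &Window) -> bool {
    w.gap_ratio >= GAP_THRESHOLD
}

fn main() {
    let prev = Snapshot { at_secs: 0.0, messages_recv: 1_000, messages_sent: 1_000 };
    let cur = Snapshot { at_secs: 10.0, messages_recv: 3_000, messages_sent: 2_000 };
    if let Some(w) = window(prev, cur) {
        println!("{:?}, flagged: {}", w, needs_attention(&w));
    }
}
```

Treat a flagged window as a hint rather than a diagnosis: operators that buffer rows, such as windowed aggregations, can show a temporary gap even when every UDF call succeeds.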
Implements
- Principle:ArroyoSystems_Arroyo_UDF_Lifecycle_Management
- Environment:ArroyoSystems_Arroyo_PostgreSQL_Database