

Heuristic:ArroyoSystems Arroyo Async UDF Concurrency

From Leeroopedia





Knowledge Sources
Domains Optimization, UDF
Last Updated 2026-02-08 08:00 GMT

Overview

Tune `allowed_in_flight` (the maximum number of concurrent invocations) and the per-UDF timeout to balance async UDF throughput against resource usage, and choose between ordered and unordered output modes.

Description

Arroyo supports asynchronous UDFs that perform I/O operations (HTTP lookups, database queries, etc.) during pipeline execution. The async UDF operator manages concurrent invocations using an `allowed_in_flight` parameter that caps how many UDF calls can run simultaneously. Results can be collected in order (preserving input order at the cost of head-of-line blocking) or unordered (higher throughput, non-deterministic output order). Each invocation has a configurable timeout to prevent hanging operations from blocking the pipeline.

Usage

Apply this heuristic when configuring async UDFs for external service calls. If the external service has high latency, increase `allowed_in_flight` to maintain throughput through concurrency. If the service has rate limits, cap `allowed_in_flight` to prevent overloading it. Choose ordered mode when downstream logic depends on input order; use unordered mode for maximum throughput.
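A starting value can be estimated with Little's law: the number of calls in flight is roughly the target call rate multiplied by the average call latency. The sketch below is an illustrative calculation only. `required_in_flight` and its parameters are hypothetical names, not part of Arroyo, and the result is a first guess to refine under load.

```rust
use std::time::Duration;

/// Hypothetical helper (not an Arroyo API): estimates how many concurrent
/// invocations are needed to sustain `target_rps` calls per second when each
/// call takes `avg_latency` on average (Little's law: L = lambda * W), then
/// caps the result at the external service's concurrency limit.
fn required_in_flight(target_rps: f64, avg_latency: Duration, service_cap: u32) -> u32 {
    let needed = (target_rps * avg_latency.as_secs_f64()).ceil();
    // Always allow at least one call in flight; never exceed the service cap.
    (needed.max(1.0) as u32).min(service_cap)
}
```

For example, sustaining 200 calls per second against a service with 250 ms average latency needs about 50 calls in flight. If the service tolerates fewer concurrent requests than the estimate, the cap wins and throughput is bounded by the service.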

The Insight (Rule of Thumb)

  • Action: Configure `max_concurrency` (maps to `allowed_in_flight`) and `timeout` in the async UDF declaration. Choose between ordered and unordered output.
  • Value: Set `allowed_in_flight` to match the external service's concurrency capacity. Typical: 10-100 for HTTP APIs.
  • Trade-off: Higher concurrency raises throughput but uses more memory for in-flight state and puts more load on the external service. Ordered output is deterministic but slower because of head-of-line blocking; unordered output is faster but its order is non-deterministic.
  • Timeout: Set per-UDF timeout to prevent indefinite blocking. If a UDF times out, the row may be dropped or error-handled.

Reasoning

The async UDF operator maintains two BTreeMaps: one for inputs sent to the UDF function and one for completed outputs. Each input gets a monotonically increasing ID (`next_id`). The operator tracks watermarks as `(id, Watermark)` pairs to ensure watermarks are not advanced past in-flight UDF calls.
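The watermark bookkeeping described above can be sketched as follows. This is a simplified model, not the operator's code: `InFlight` is a hypothetical name, rows are strings and watermarks are plain integers, and the release rule (forward a watermark once no in-flight input has a smaller ID) is an assumption based on the description.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Simplified stand-in for the operator's bookkeeping (hypothetical type).
struct InFlight {
    next_id: u64,
    inputs: BTreeMap<u64, String>,    // in-flight rows keyed by ID
    watermarks: VecDeque<(u64, u64)>, // (ID at arrival, watermark timestamp)
}

impl InFlight {
    fn new() -> Self {
        InFlight { next_id: 0, inputs: BTreeMap::new(), watermarks: VecDeque::new() }
    }

    /// Registers a row as in flight and returns its ID.
    fn on_row(&mut self, row: &str) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.inputs.insert(id, row.to_string());
        id
    }

    /// Records a watermark tagged with the ID the next row will receive, so
    /// every row that arrived earlier has a smaller ID.
    fn on_watermark(&mut self, ts: u64) -> Vec<u64> {
        self.watermarks.push_back((self.next_id, ts));
        self.release()
    }

    /// Marks a call as finished and returns watermarks that are now safe.
    fn on_complete(&mut self, id: u64) -> Vec<u64> {
        self.inputs.remove(&id);
        self.release()
    }

    /// Pops watermarks from the front while no in-flight row precedes them.
    fn release(&mut self) -> Vec<u64> {
        let oldest = self.inputs.keys().next().copied();
        let mut released = Vec::new();
        while let Some(&(wm_id, ts)) = self.watermarks.front() {
            if oldest.map_or(true, |o| o >= wm_id) {
                released.push(ts);
                self.watermarks.pop_front();
            } else {
                break;
            }
        }
        released
    }
}
```

Because a `BTreeMap` iterates in key order, the oldest in-flight ID is the first key, which makes the "is anything older still pending" check cheap.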

In ordered mode, outputs must be emitted in input order. If invocation #5 completes before #3, #5 is buffered until #3 completes. This can cause head-of-line blocking where one slow invocation stalls all subsequent outputs.

In unordered mode, outputs are emitted as soon as they complete, regardless of input order. This maximizes throughput but means downstream operators see records in a different order than they were received.
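The difference between the two modes can be illustrated with a small model. This is a minimal sketch under stated assumptions: `Emitter` and `next_to_emit` are hypothetical names, rows are plain strings, and IDs are assumed to start at 0. It is not the operator's actual emission logic.

```rust
use std::collections::BTreeMap;

/// Simplified model of result emission (hypothetical type).
struct Emitter {
    ordered: bool,
    next_to_emit: u64,              // next ID allowed out in ordered mode
    outputs: BTreeMap<u64, String>, // completed results waiting to be emitted
}

impl Emitter {
    fn new(ordered: bool) -> Self {
        Emitter { ordered, next_to_emit: 0, outputs: BTreeMap::new() }
    }

    /// Called when invocation `id` finishes; returns the rows that can be
    /// emitted downstream right now.
    fn on_complete(&mut self, id: u64, row: &str) -> Vec<String> {
        if !self.ordered {
            // Unordered: emit immediately, in completion order.
            return vec![row.to_string()];
        }
        // Ordered: buffer the result, then drain the contiguous run that
        // starts at the next expected ID.
        self.outputs.insert(id, row.to_string());
        let mut ready = Vec::new();
        while let Some(r) = self.outputs.remove(&self.next_to_emit) {
            ready.push(r);
            self.next_to_emit += 1;
        }
        ready
    }
}
```

In ordered mode, completing IDs 2 and 1 emits nothing until ID 0 finishes, at which point all three rows are released together. That stall is the head-of-line blocking described above.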

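The state (inputs, outputs, watermarks) is checkpointed, so in-flight UDF calls are replayed from the last checkpoint on recovery. A replayed call reaches the external service again, so the UDF should tolerate being invoked more than once for the same row.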

Code Evidence

AsyncUdfOperator struct from `async_udf.rs:33-52`:

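pub struct AsyncUdfOperator {
    name: String,
    udf: AsyncUdfDylib,
    // true: emit results in input order; false: emit as they complete
    ordered: bool,
    // cap on concurrent UDF invocations
    allowed_in_flight: u32,
    // per-invocation timeout
    timeout: Duration,
    config: api::AsyncUdfOperator,
    registry: Arc<Registry>,
    input_exprs: Vec<Arc<dyn PhysicalExpr>>,
    final_exprs: Vec<Arc<dyn PhysicalExpr>>,
    // monotonically increasing ID assigned to each input
    next_id: u64,
    // inputs sent to the UDF, keyed by ID
    inputs: BTreeMap<u64, OwnedRow>,
    // completed outputs, keyed by ID
    outputs: BTreeMap<u64, OwnedRow>,
    // watermarks paired with an ID so they do not advance past in-flight calls
    watermarks: VecDeque<(u64, Watermark)>,
    // ...
}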

Ordering configuration from `async_udf.rs:76-79`:

// An unrecognized ordering value falls back to ordered output.
let ordered = match api::AsyncUdfOrdering::try_from(config.ordering) {
    Err(_) | Ok(api::AsyncUdfOrdering::Ordered) => true,
    Ok(api::AsyncUdfOrdering::Unordered) => false,
};
