
Heuristic:ArroyoSystems Arroyo Batch Size And Backpressure

From Leeroopedia




Knowledge Sources
Domains: Optimization, Stream_Processing
Last Updated: 2026-02-08 08:00 GMT

Overview

Batch sizing and backpressure heuristic: use 512 records with 100ms linger time for sources, 8192-element bounded queues for inter-operator communication, and 50ms flush intervals for Kafka sources.

Description

Arroyo processes data in Arrow RecordBatch format. Sources batch incoming records before emitting them downstream. The batch size (default 512) and linger time (default 100ms) control the trade-off between latency and throughput. Inter-operator communication uses bounded channels (default 8192 elements) that provide natural backpressure: when a downstream operator is slow, the upstream blocks on the full queue. Network communication between workers flushes every 100ms. Kafka sources use a more aggressive 50ms flush interval.

Usage

Apply this heuristic when tuning pipeline latency or throughput. If end-to-end latency is too high, reduce the linger time and batch size. If throughput is too low, increase the batch size and queue depth. Note that the Kafka-specific 50ms flush interval is hardcoded and cannot be tuned; it is more aggressive than the generic 100ms network flush.
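As an illustration, a low-latency tuning might look like the following. The values below are hypothetical examples for this sketch, not recommended defaults; the `[pipeline]` table matches the Code Evidence section, while placing `queue-size` under `[worker]` is assumed from the `worker.queue-size` key name.

```toml
# Hypothetical low-latency tuning: smaller batches, shorter linger.
[pipeline]
source-batch-size = 128        # default 512; smaller batches cut per-batch latency
source-batch-linger = "20ms"   # default 100ms; batches emit sooner at low event rates

# A high-throughput tuning would go the other way (e.g. batch-size = 4096,
# linger = "500ms") and add a deeper inter-operator queue:
[worker]
queue-size = 16384             # default 8192; more burst buffering, more memory
```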

The Insight (Rule of Thumb)

  • Action: Configure `pipeline.source-batch-size`, `pipeline.source-batch-linger`, and `worker.queue-size`.
  • Value: Defaults: batch-size = 512, linger = 100ms, queue-size = 8192.
  • Trade-off: Larger batch + longer linger = better throughput + higher latency. Smaller batch + shorter linger = lower latency + more overhead. Larger queue = more buffering + higher memory.
  • Aggregate flush: `pipeline.update-aggregate-flush-interval = 1s` controls how often incremental aggregates emit results.

Reasoning

Arrow RecordBatches are most efficient with hundreds to thousands of rows. Processing one record at a time would negate the benefits of vectorized execution. The 512-record default and 100ms linger provide a good trade-off:

  • At 10,000 events/second, a batch fills in ~50ms (before linger)
  • At 100 events/second, the linger timer fires at 100ms with ~10 records
  • The batch is emitted as soon as either threshold is reached
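The interplay of the two thresholds can be checked with a quick calculation. This is a back-of-the-envelope model of the batching rule, not an Arroyo API; `emit_trigger` is a hypothetical helper for this sketch.

```rust
/// Returns which threshold fires first and the resulting emit delay in ms,
/// assuming a steady event rate. Not an Arroyo API -- a toy model only.
fn emit_trigger(events_per_sec: f64, batch_size: u32, linger_ms: f64) -> (&'static str, f64) {
    // Time for the batch to fill at the given event rate.
    let fill_ms = batch_size as f64 / events_per_sec * 1000.0;
    if fill_ms <= linger_ms {
        ("batch-size", fill_ms)
    } else {
        ("linger", linger_ms)
    }
}

fn main() {
    // 10,000 events/s: the 512-record batch fills in ~51ms, before the linger fires.
    println!("{:?}", emit_trigger(10_000.0, 512, 100.0));
    // 100 events/s: the batch would take ~5s to fill, so the 100ms linger fires
    // first with roughly 10 records buffered.
    println!("{:?}", emit_trigger(100.0, 512, 100.0));
}
```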

The 8192-element queue between operators can buffer up to 8192 batches; at the default 512 records per batch, that is roughly 8192 * 512 ≈ 4.2M records before backpressure kicks in. This absorbs short bursts without blocking sources.

For incremental (updating) aggregates, the 1-second flush interval means results are emitted at most once per second, even if input data arrives faster. This prevents downstream operators from being overwhelmed with rapid update+retract cycles.
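The bounded-queue backpressure mechanism can be sketched with a standard-library bounded channel. This is a toy model, not Arroyo's engine code; the assumption is only that Arroyo's `batch_bounded` channels share the same blocking-when-full principle.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

fn run_pipeline(queue_size: usize, num_batches: usize) -> usize {
    // Bounded channel: a stand-in for Arroyo's 8192-slot inter-operator queue.
    let (tx, rx) = sync_channel::<Vec<u64>>(queue_size);

    // "Upstream operator": produces batches as fast as it can.
    let producer = thread::spawn(move || {
        for i in 0..num_batches as u64 {
            // send() blocks once the queue is full -- this is the backpressure:
            // a slow downstream operator throttles the source automatically.
            tx.send(vec![i; 512]).unwrap();
        }
    });

    // "Downstream operator": a deliberately slow consumer.
    let mut received = 0;
    for _batch in rx {
        thread::sleep(Duration::from_millis(5));
        received += 1;
    }
    producer.join().unwrap();
    received
}

fn main() {
    // Even a tiny 4-slot queue delivers all 16 batches; the producer simply
    // blocks whenever the consumer falls behind, with no data loss.
    assert_eq!(run_pipeline(4, 16), 16);
}
```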

Code Evidence

Default batch configuration from `default.toml:4-7`:

[pipeline]
source-batch-size = 512
source-batch-linger = "100ms"
update-aggregate-flush-interval = "1s"

Queue size from `default.toml:45`:

queue-size = 8192

Kafka-specific flush interval from `kafka/source/mod.rs:193`:

let mut flush_ticker = tokio::time::interval(Duration::from_millis(50));

Network flush interval from `network_manager.rs:379`:

let mut flush_interval: Interval = interval(Duration::from_millis(100));

Bounded channel creation from `engine.rs:319,337`:

batch_bounded(queue_size)  // creates bounded channel for inter-operator communication
