

Implementation:ArroyoSystems Arroyo Tumbling Window

From Leeroopedia


Knowledge Sources
Domains: Streaming, Windowing, Aggregation
Last Updated: 2026-02-08 08:00 GMT

Overview

TumblingAggregatingWindowFunc implements tumbling (fixed-size, non-overlapping) window aggregation using a two-phase partial/final aggregation strategy with asynchronous DataFusion execution plans.
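The two-phase idea can be illustrated with an AVG aggregate: the partial phase reduces each batch to a (sum, count) pair per key, and the final phase merges those pairs before dividing. The sketch below is plain Rust under that assumption. AvgPartial, partial_aggregate and final_aggregate are hypothetical names, and the real operator delegates this work to DataFusion execution plans rather than hand-written loops.

```rust
use std::collections::HashMap;

/// Hypothetical partial state for AVG: a running sum and row count.
#[derive(Debug, Clone, Copy, Default)]
struct AvgPartial {
    sum: f64,
    count: u64,
}

/// Phase 1 (partial): reduce one batch of (key, value) rows to per-key state.
fn partial_aggregate(batch: &[(&str, f64)]) -> HashMap<String, AvgPartial> {
    let mut out: HashMap<String, AvgPartial> = HashMap::new();
    for &(key, value) in batch {
        let p = out.entry(key.to_string()).or_default();
        p.sum += value;
        p.count += 1;
    }
    out
}

/// Phase 2 (final): merge partial states from many batches, then divide.
fn final_aggregate(partials: &[HashMap<String, AvgPartial>]) -> HashMap<String, f64> {
    let mut merged: HashMap<String, AvgPartial> = HashMap::new();
    for partial in partials {
        for (key, p) in partial {
            let m = merged.entry(key.clone()).or_default();
            m.sum += p.sum;
            m.count += p.count;
        }
    }
    // Every merged entry has count >= 1, so the division is safe.
    merged
        .into_iter()
        .map(|(k, m)| (k, m.sum / m.count as f64))
        .collect()
}

fn main() {
    let batch_a = [("a", 1.0), ("b", 10.0), ("a", 3.0)];
    let batch_b = [("a", 5.0), ("b", 20.0)];
    let partials = vec![partial_aggregate(&batch_a), partial_aggregate(&batch_b)];
    let result = final_aggregate(&partials);
    println!("{:?}", result["a"]); // prints 3.0
}
```

Keeping (sum, count) rather than a running average is what makes the partial states mergeable across batches.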

Description

The TumblingAggregatingWindowFunc operator implements the ArrowOperator trait to compute aggregations over non-overlapping, fixed-width time windows. Each incoming batch is binned by a binning_function (a DataFusion PhysicalExpr that rounds timestamps down to the window boundary). Binned data is streamed into per-bin BinComputingHolder instances through unbounded channels feeding a partial_aggregation_plan.
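Rounding a timestamp down to its window boundary amounts to subtracting its remainder modulo the width. A minimal sketch, assuming i64 nanosecond timestamps; bin_start and bin_rows are hypothetical helpers standing in for the binning_function PhysicalExpr and the per-bin routing, not Arroyo APIs.

```rust
use std::collections::BTreeMap;

/// Round a timestamp down to the start of its tumbling window.
/// rem_euclid keeps the result correct for timestamps before the epoch.
fn bin_start(ts_nanos: i64, width_nanos: i64) -> i64 {
    ts_nanos - ts_nanos.rem_euclid(width_nanos)
}

/// Group row indices of a batch by bin start, standing in for routing
/// rows to per-bin holders.
fn bin_rows(timestamps: &[i64], width_nanos: i64) -> BTreeMap<i64, Vec<usize>> {
    let mut bins: BTreeMap<i64, Vec<usize>> = BTreeMap::new();
    for (row, &ts) in timestamps.iter().enumerate() {
        bins.entry(bin_start(ts, width_nanos)).or_default().push(row);
    }
    bins
}

fn main() {
    const SEC: i64 = 1_000_000_000;
    let ts = [3 * SEC, 12 * SEC, 9 * SEC, 10 * SEC];
    let bins = bin_rows(&ts, 10 * SEC);
    for (start, rows) in &bins {
        // prints "bin starting at 0s: rows [0, 2]"
        // then   "bin starting at 10s: rows [1, 3]"
        println!("bin starting at {}s: rows {:?}", start / SEC, rows);
    }
}
```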

When the watermark advances past a bin boundary, the operator:

  1. Drains any active partial aggregation execution for that bin
  2. Passes finished partial batches to the finish_execution_plan through the shared final_batches_passer (an Arc<RwLock<Vec<RecordBatch>>>)
  3. Optionally runs a final_projection plan on the aggregated results
  4. Emits the final results with an appended timestamp column set to the bin start time
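The finalization steps above can be sketched with an ordered map keyed by bin start. This simplified version assumes a running count as the aggregate, treats a bin as complete once start + width is at or before the watermark (an assumed convention), and collapses the partial plan, finish plan and projection into a single step. TumblingCount and WindowResult are hypothetical types.

```rust
use std::collections::BTreeMap;

/// One emitted window row: the aggregate plus a timestamp set to the bin start.
#[derive(Debug, PartialEq)]
struct WindowResult {
    timestamp: i64,
    count: u64,
}

/// Hypothetical simplified operator state: bin start -> running count.
struct TumblingCount {
    width: i64,
    bins: BTreeMap<i64, u64>,
}

impl TumblingCount {
    fn new(width: i64) -> Self {
        Self { width, bins: BTreeMap::new() }
    }

    /// Add one record to the bin its timestamp falls into.
    fn process(&mut self, ts: i64) {
        let start = ts - ts.rem_euclid(self.width);
        *self.bins.entry(start).or_insert(0) += 1;
    }

    /// Finalize every bin whose end (start + width) is <= watermark,
    /// emitting results in ascending time order.
    fn on_watermark(&mut self, watermark: i64) -> Vec<WindowResult> {
        // Bins with start >= first_open are still open.
        let first_open = watermark - self.width + 1;
        let open = self.bins.split_off(&first_open);
        let closed = std::mem::replace(&mut self.bins, open);
        closed
            .into_iter()
            .map(|(start, count)| WindowResult { timestamp: start, count })
            .collect()
    }
}

fn main() {
    let mut op = TumblingCount::new(10);
    for ts in [1, 4, 12, 15, 27] {
        op.process(ts);
    }
    // prints [WindowResult { timestamp: 0, count: 2 }, WindowResult { timestamp: 10, count: 2 }]
    println!("{:?}", op.on_watermark(20));
}
```

In the operator described here, the watermark would be forwarded only after these results are emitted.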

Active partial aggregation futures are tracked via an Arc<Mutex<FuturesUnordered>> and polled through the future_to_poll / handle_future_result mechanism, allowing the operator to make progress on multiple bin computations concurrently.

State is checkpointed to an expiring time-keyed table "t" that stores partial aggregate batches with their bin timestamps.
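Checkpointed partials can be modeled as a map from bin timestamp to stored rows with watermark-based expiry. A minimal sketch: ExpiringBinTable is a hypothetical in-memory stand-in with an assumed retention parameter, not Arroyo's state-table API.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for an expiring time-keyed table: rows are stored
/// under their bin timestamp and dropped once they fall out of retention.
struct ExpiringBinTable<V> {
    retention: i64,
    rows: BTreeMap<i64, Vec<V>>,
}

impl<V: Clone> ExpiringBinTable<V> {
    fn new(retention: i64) -> Self {
        Self { retention, rows: BTreeMap::new() }
    }

    /// At a checkpoint barrier: persist one partial aggregate for a bin.
    fn insert(&mut self, bin_ts: i64, partial: V) {
        self.rows.entry(bin_ts).or_default().push(partial);
    }

    /// Drop every bin whose timestamp is older than watermark - retention.
    fn expire(&mut self, watermark: i64) {
        let cutoff = watermark - self.retention;
        self.rows = self.rows.split_off(&cutoff);
    }

    /// On restore: return all surviving partials grouped by bin.
    fn restore(&self) -> BTreeMap<i64, Vec<V>> {
        self.rows.clone()
    }
}

fn main() {
    let mut table = ExpiringBinTable::new(10);
    table.insert(0, 5u64);
    table.insert(10, 7);
    table.insert(10, 1);
    table.insert(20, 2);
    table.expire(25);
    println!("{:?}", table.restore()); // prints {20: [2]}
}
```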

Usage

Used when a SQL query specifies a TUMBLE window with a single width parameter. Constructed via TumblingAggregateWindowConstructor from an api::TumblingWindowAggregateOperator configuration.

Code Reference

Source Location

Signature

pub struct TumblingAggregatingWindowFunc<K: Copy> {
    width: Duration,
    binning_function: Arc<dyn PhysicalExpr>,
    partial_aggregation_plan: Arc<dyn ExecutionPlan>,
    partial_schema: ArroyoSchema,
    finish_execution_plan: Arc<dyn ExecutionPlan>,
    aggregate_with_timestamp_schema: SchemaRef,
    final_projection: Option<Arc<dyn ExecutionPlan>>,
    receiver: Arc<RwLock<Option<UnboundedReceiver<RecordBatch>>>>,
    final_batches_passer: Arc<RwLock<Vec<RecordBatch>>>,
    futures: Arc<Mutex<FuturesUnordered<NextBatchFuture<K>>>>,
    execs: BTreeMap<K, BinComputingHolder<K>>,
}

pub struct TumblingAggregateWindowConstructor;
impl OperatorConstructor for TumblingAggregateWindowConstructor {
    type ConfigT = api::TumblingWindowAggregateOperator;
    fn with_config(
        &self,
        config: Self::ConfigT,
        registry: Arc<Registry>,
    ) -> anyhow::Result<ConstructedOperator>;
}

Import

use arroyo_worker::arrow::tumbling_aggregating_window::{
    TumblingAggregatingWindowFunc, TumblingAggregateWindowConstructor,
};

I/O Contract

Inputs

Name | Type | Required | Description
batch | RecordBatch | Yes | Input records, partitioned by the binning function into fixed-width time bins
watermark | Watermark | Yes | Event-time watermark; triggers finalization of bins that fall before the watermark
checkpoint | CheckpointBarrier | Yes | Triggers draining of active bin executions and flushing of partial aggregates to state table "t"

Outputs

Name | Type | Description
window_result | RecordBatch | Final aggregation result for each completed tumbling window, with an appended timestamp column
watermark | Watermark | Forwarded watermark, emitted after all completed bins have been finalized and emitted

Usage Examples

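// Tumbling window is created from a SQL query like:
// SELECT key, AVG(value) FROM stream
// GROUP BY key, tumble(interval '10 seconds')
//
// `tumbling_config` is the api::TumblingWindowAggregateOperator planned from
// that query and `registry` is an Arc<Registry>; the `?` requires an
// enclosing function that returns a Result.
let constructor = TumblingAggregateWindowConstructor;
let operator = constructor.with_config(tumbling_config, registry)?;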
