Implementation:ArroyoSystems Arroyo Tumbling Window
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Windowing, Aggregation |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
TumblingAggregatingWindowFunc implements tumbling (fixed-size, non-overlapping) window aggregation using a two-phase partial/final aggregation strategy with asynchronous DataFusion execution plans.
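The two-phase strategy can be illustrated with a minimal, std-only sketch for an AVG aggregate. This is not Arroyo's code: the real operator delegates both phases to DataFusion execution plans, and the names `PartialAvg`, `from_batch`, `merge`, `finish`, and `final_avg` are hypothetical. The idea is that the partial phase reduces each batch to a small mergeable state, and the final phase merges those states once the window closes.

```rust
/// Hypothetical partial state for AVG: small, and mergeable across batches.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub struct PartialAvg {
    pub sum: f64,
    pub count: u64,
}

impl PartialAvg {
    /// Partial phase: fold one batch of raw values into a partial state.
    pub fn from_batch(values: &[f64]) -> Self {
        PartialAvg {
            sum: values.iter().sum(),
            count: values.len() as u64,
        }
    }

    /// Combine two partial states belonging to the same window.
    pub fn merge(self, other: Self) -> Self {
        PartialAvg {
            sum: self.sum + other.sum,
            count: self.count + other.count,
        }
    }

    /// Final phase: turn the merged state into the aggregate value.
    /// Returns None for an empty window.
    pub fn finish(self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}

/// Merge every partial state collected for a window, then finish it.
pub fn final_avg(partials: &[PartialAvg]) -> Option<f64> {
    partials
        .iter()
        .copied()
        .fold(PartialAvg::default(), PartialAvg::merge)
        .finish()
}
```

Because only the partial states need to be retained between batches, they are also what makes sense to persist at a checkpoint, rather than the raw input rows.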
Description
The TumblingAggregatingWindowFunc operator implements the ArrowOperator trait to compute aggregations over non-overlapping, fixed-width time windows. Each incoming batch is binned by a binning_function (a DataFusion PhysicalExpr that rounds timestamps down to the window boundary). Binned data is streamed into per-bin BinComputingHolder instances through unbounded channels feeding a partial_aggregation_plan.
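As a rough illustration of what the binning step computes, the sketch below rounds timestamps down to the window boundary using plain integer arithmetic and groups row indices by bin. It assumes timestamps are nanoseconds since the Unix epoch held in a `u64`; the function names `bin_start_nanos` and `group_rows_by_bin` are hypothetical, and the real operator evaluates a DataFusion `PhysicalExpr` over Arrow arrays instead.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Round a timestamp (assumed: nanoseconds since the Unix epoch) down to
/// the start of the fixed-width window that contains it.
pub fn bin_start_nanos(timestamp_nanos: u64, width: Duration) -> u64 {
    let width_nanos = width.as_nanos() as u64;
    assert!(width_nanos > 0, "window width must be non-zero");
    timestamp_nanos - timestamp_nanos % width_nanos
}

/// Group row indices by the bin their timestamp falls into.
/// The BTreeMap keeps bins ordered by start time.
pub fn group_rows_by_bin(
    timestamps_nanos: &[u64],
    width: Duration,
) -> BTreeMap<u64, Vec<usize>> {
    let mut bins: BTreeMap<u64, Vec<usize>> = BTreeMap::new();
    for (row, &ts) in timestamps_nanos.iter().enumerate() {
        bins.entry(bin_start_nanos(ts, width)).or_default().push(row);
    }
    bins
}
```

With a 10-second width, a timestamp at 25 s lands in the bin starting at 20 s, and a timestamp exactly on a boundary (30 s) starts its own bin.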
When the watermark advances past a bin boundary, the operator:
- Drains any active partial aggregation execution for that bin
- Passes finished partial batches to the finish_execution_plan via a shared RwLock<Vec<RecordBatch>>
- Optionally runs a final_projection plan on the aggregated results
- Emits the final results with an appended timestamp column set to the bin start time
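The steps above can be sketched with an ordered map of open bins that is drained from the oldest bin forward. This is a simplified, synchronous model rather than the operator's implementation: `OpenBins` and its methods are hypothetical, a plain sum stands in for the partial and final plans, timestamps are unitless integers, and it assumes a bin is complete once the watermark reaches the bin's end (start + width).

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the operator's per-bin state (`execs`).
pub struct OpenBins {
    width: u64,
    open: BTreeMap<u64, Vec<i64>>, // bin start -> buffered values
}

impl OpenBins {
    pub fn new(width: u64) -> Self {
        assert!(width > 0, "window width must be non-zero");
        OpenBins { width, open: BTreeMap::new() }
    }

    /// Route a value to the bin that contains its timestamp.
    pub fn insert(&mut self, timestamp: u64, value: i64) {
        let start = timestamp - timestamp % self.width;
        self.open.entry(start).or_default().push(value);
    }

    /// Finalize every bin whose end is at or before the watermark
    /// (assumption), oldest first. Each result is tagged with its
    /// bin start time, mirroring the appended timestamp column.
    pub fn advance_watermark(&mut self, watermark: u64) -> Vec<(u64, i64)> {
        let mut finished = Vec::new();
        while let Some(entry) = self.open.first_entry() {
            let start = *entry.key();
            if start + self.width > watermark {
                break; // the oldest remaining bin is still open
            }
            let values = entry.remove();
            let total: i64 = values.iter().sum();
            finished.push((start, total));
        }
        finished
    }

    pub fn open_count(&self) -> usize {
        self.open.len()
    }
}
```

Keeping bins in a `BTreeMap` keyed by start time, as the `execs` field does, means the scan can stop at the first bin that is still open.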
Active partial aggregation futures are tracked via an Arc<Mutex<FuturesUnordered>> and polled through the future_to_poll / handle_future_result mechanism, allowing the operator to make progress on multiple bin computations concurrently.
State is checkpointed to an expiring time-keyed table "t" that stores partial aggregate batches with their bin timestamps.
Usage
This operator is used when a SQL query specifies a TUMBLE window with a single width parameter. It is constructed via TumblingAggregateWindowConstructor from an api::TumblingWindowAggregateOperator configuration.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-worker/src/arrow/tumbling_aggregating_window.rs
Signature
pub struct TumblingAggregatingWindowFunc<K: Copy> {
    width: Duration,
    binning_function: Arc<dyn PhysicalExpr>,
    partial_aggregation_plan: Arc<dyn ExecutionPlan>,
    partial_schema: ArroyoSchema,
    finish_execution_plan: Arc<dyn ExecutionPlan>,
    aggregate_with_timestamp_schema: SchemaRef,
    final_projection: Option<Arc<dyn ExecutionPlan>>,
    receiver: Arc<RwLock<Option<UnboundedReceiver<RecordBatch>>>>,
    final_batches_passer: Arc<RwLock<Vec<RecordBatch>>>,
    futures: Arc<Mutex<FuturesUnordered<NextBatchFuture<K>>>>,
    execs: BTreeMap<K, BinComputingHolder<K>>,
}

pub struct TumblingAggregateWindowConstructor;

impl OperatorConstructor for TumblingAggregateWindowConstructor {
    type ConfigT = api::TumblingWindowAggregateOperator;
    fn with_config(
        &self,
        config: Self::ConfigT,
        registry: Arc<Registry>,
    ) -> anyhow::Result<ConstructedOperator>;
}
Import
use arroyo_worker::arrow::tumbling_aggregating_window::{
    TumblingAggregatingWindowFunc, TumblingAggregateWindowConstructor,
};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| batch | RecordBatch | Yes | Input records partitioned by the binning function into fixed-width time bins |
| watermark | Watermark | Yes | Event-time watermark triggering finalization of bins that fall before the watermark |
| checkpoint | CheckpointBarrier | Yes | Triggers draining of active bin executions and flushing partial aggregates to state table "t" |
Outputs
| Name | Type | Description |
|---|---|---|
| window_result | RecordBatch | Final aggregation result for each completed tumbling window, with an appended timestamp column set to the bin start time |
| watermark | Watermark | Forwarded watermark after all completed bins have been finalized and emitted |
Usage Examples
// A tumbling window operator is created from a SQL query like:
// SELECT key, AVG(value) FROM stream
// GROUP BY key, TUMBLE(INTERVAL '10' SECOND)
//
// `tumbling_config` (an api::TumblingWindowAggregateOperator) and
// `registry` (an Arc<Registry>) are assumed to be in scope.
let constructor = TumblingAggregateWindowConstructor;
let operator = constructor.with_config(tumbling_config, registry)?;