Implementation:ArroyoSystems Arroyo Operator Context
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Operators, Dataflow |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Provides the OperatorContext, SourceContext, and supporting types (WatermarkHolder, BatchSender, BatchReceiver, ArrowCollector, SourceCollector) that form the runtime environment available to all Arroyo operators during execution.
Description
This module implements the runtime context and data transport layer for operators:
- WatermarkHolder -- manages the minimum watermark across all input partitions. Tracks both the current watermark (which may be Idle) and the last non-idle watermark time. Updates are computed by folding all partition watermarks.
- BatchSender / BatchReceiver -- a row-bounded channel wrapper around an UnboundedSender<QueueItem>. BatchSender tracks the queued message count and byte size via AtomicU32/AtomicU64, and uses a Notify to apply backpressure when the queue exceeds its row capacity.
- ArrowCollector -- the output collector for non-source operators. Distributes RecordBatches to downstream operators based on key hashing (for keyed outputs) or broadcasting (for unkeyed). Implements Collector trait with collect and broadcast methods. Handles hash computation, partition assignment via server_for_hash_array, and late data filtering via RateLimiter.
- SourceCollector -- the output collector for source operators. Similar to ArrowCollector but also handles the deserialization pipeline through ArrowDeserializer with format/framing/bad_data configuration and schema resolution.
- OperatorContext -- the central context struct holding task_info, control_tx (for sending ControlResp), table_manager (for state access), watermark, and scheduling information. Provides methods for state access and control plane communication.
- SourceContext -- extends OperatorContext with source-specific functionality including deserialization, output collection, and idle timeout management.
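The fold that WatermarkHolder performs over its partitions can be sketched as follows. This is a minimal, dependency-free approximation rather than Arroyo's source: the `Watermark` enum is a simplified stand-in with `EventTime(SystemTime)` and `Idle` variants, and the combining rule (a partition that has not reported yet makes the combined value `None`, idle partitions are skipped, all-idle inputs yield `Idle`) is an assumption based on the description above. The return value of `set` (`None` for an out-of-range index, otherwise the recomputed combined watermark) is likewise assumed.

```rust
use std::time::SystemTime;

/// Simplified stand-in for Arroyo's `Watermark` (assumed shape).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Watermark {
    EventTime(SystemTime),
    Idle,
}

pub struct WatermarkHolder {
    last_present_watermark: Option<SystemTime>,
    cur_watermark: Option<Watermark>,
    watermarks: Vec<Option<Watermark>>,
}

impl WatermarkHolder {
    pub fn new(watermarks: Vec<Option<Watermark>>) -> Self {
        let mut holder = WatermarkHolder {
            last_present_watermark: None,
            cur_watermark: None,
            watermarks,
        };
        holder.update_watermark();
        holder
    }

    pub fn watermark(&self) -> Option<Watermark> {
        self.cur_watermark
    }

    pub fn last_present_watermark(&self) -> Option<SystemTime> {
        self.last_present_watermark
    }

    /// Records partition `idx`'s watermark and returns the recomputed combined
    /// watermark, or `None` if `idx` is out of range (assumed semantics).
    pub fn set(&mut self, idx: usize, watermark: Watermark) -> Option<Option<Watermark>> {
        *self.watermarks.get_mut(idx)? = Some(watermark);
        self.update_watermark();
        Some(self.cur_watermark)
    }

    fn update_watermark(&mut self) {
        // Fold starting from Idle: a partition with no watermark yet (None)
        // short-circuits to None, idle partitions are skipped, and event
        // times combine by taking the minimum.
        self.cur_watermark = self
            .watermarks
            .iter()
            .fold(Some(Watermark::Idle), |acc, next| match (acc?, (*next)?) {
                (Watermark::EventTime(a), Watermark::EventTime(b)) => {
                    Some(Watermark::EventTime(a.min(b)))
                }
                (Watermark::Idle, other) | (other, Watermark::Idle) => Some(other),
            });
        // Remember the most recent non-idle combined watermark.
        if let Some(Watermark::EventTime(t)) = self.cur_watermark {
            self.last_present_watermark = Some(t);
        }
    }
}
```

Because idle partitions are skipped, a stalled input no longer holds back the combined watermark, while `last_present_watermark` still remembers the last real event time after every input has gone idle.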
Usage
Every operator receives an OperatorContext (or SourceContext for sources) during initialization. Use it to access state, emit output records, read watermarks, and communicate with the control plane.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-operator/src/context.rs
Signature
```rust
pub type QueueItem = ArrowMessage;

pub struct WatermarkHolder {
    last_present_watermark: Option<SystemTime>,
    cur_watermark: Option<Watermark>,
    watermarks: Vec<Option<Watermark>>,
}

impl WatermarkHolder {
    pub fn new(watermarks: Vec<Option<Watermark>>) -> Self;
    pub fn watermark(&self) -> Option<Watermark>;
    pub fn last_present_watermark(&self) -> Option<SystemTime>;
    pub fn set(&mut self, idx: usize, watermark: Watermark) -> Option<Option<Watermark>>;
}

pub struct BatchSender { ... }

impl BatchSender {
    pub async fn send(&self, item: QueueItem) -> Result<(), SendError<QueueItem>>;
}

pub struct BatchReceiver { ... }

impl BatchReceiver {
    pub async fn recv(&mut self) -> Option<QueueItem>;
}

pub struct ArrowCollector { ... }

impl ArrowCollector {
    pub async fn collect(&mut self, batch: RecordBatch) -> DataflowResult<()>;
    pub async fn broadcast(&mut self, message: ArrowMessage) -> DataflowResult<()>;
}

pub struct OperatorContext { ... }
pub struct SourceContext { ... }
```
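The row-count backpressure that BatchSender implements with atomics and a tokio `Notify` can be approximated with standard-library primitives. The sketch below swaps in a `Mutex` plus two `Condvar`s and blocking threads so that it stays dependency-free; `row_bounded_channel`, `Item`, `RowBoundedSender`, `RowBoundedReceiver`, and `try_send` are hypothetical names, and the admission rule (accept an item whenever the queue holds fewer rows than the capacity, so one large batch may overshoot the bound) is an assumption, not Arroyo's documented behavior.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical queue item: stands in for a RecordBatch and records only its row count.
#[derive(Debug, PartialEq)]
pub struct Item {
    pub rows: u32,
}

struct State {
    queue: VecDeque<Item>,
    queued_rows: u32,
    closed: bool,
}

struct Shared {
    state: Mutex<State>,
    capacity_rows: u32,
    not_full: Condvar,  // signalled when rows drain (the role Notify plays in Arroyo)
    not_empty: Condvar, // signalled when an item arrives or the sender is dropped
}

pub struct RowBoundedSender { shared: Arc<Shared> }
pub struct RowBoundedReceiver { shared: Arc<Shared> }

pub fn row_bounded_channel(capacity_rows: u32) -> (RowBoundedSender, RowBoundedReceiver) {
    let shared = Arc::new(Shared {
        state: Mutex::new(State { queue: VecDeque::new(), queued_rows: 0, closed: false }),
        capacity_rows,
        not_full: Condvar::new(),
        not_empty: Condvar::new(),
    });
    (RowBoundedSender { shared: Arc::clone(&shared) }, RowBoundedReceiver { shared })
}

impl RowBoundedSender {
    /// Blocks while the queue already holds at least `capacity_rows` rows.
    pub fn send(&self, item: Item) {
        let mut st = self.shared.state.lock().unwrap();
        while st.queued_rows >= self.shared.capacity_rows {
            st = self.shared.not_full.wait(st).unwrap();
        }
        st.queued_rows += item.rows;
        st.queue.push_back(item);
        self.shared.not_empty.notify_one();
    }

    /// Non-blocking variant: hands the item back instead of waiting.
    pub fn try_send(&self, item: Item) -> Result<(), Item> {
        let mut st = self.shared.state.lock().unwrap();
        if st.queued_rows >= self.shared.capacity_rows {
            return Err(item);
        }
        st.queued_rows += item.rows;
        st.queue.push_back(item);
        self.shared.not_empty.notify_one();
        Ok(())
    }
}

impl Drop for RowBoundedSender {
    fn drop(&mut self) {
        // Mark the channel closed so a waiting receiver can return None.
        let mut st = self.shared.state.lock().unwrap();
        st.closed = true;
        self.shared.not_empty.notify_all();
    }
}

impl RowBoundedReceiver {
    /// Blocks until an item is available; returns None once the sender is
    /// gone and the queue is drained.
    pub fn recv(&self) -> Option<Item> {
        let mut st = self.shared.state.lock().unwrap();
        loop {
            if let Some(item) = st.queue.pop_front() {
                st.queued_rows -= item.rows;
                self.shared.not_full.notify_all(); // wake a blocked sender
                return Some(item);
            }
            if st.closed {
                return None;
            }
            st = self.shared.not_empty.wait(st).unwrap();
        }
    }

    pub fn queued_rows(&self) -> u32 {
        self.shared.state.lock().unwrap().queued_rows
    }
}
```

Counting rows rather than messages means a queue holding a few large batches applies backpressure as readily as one holding many small batches.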
Import
```rust
use arroyo_operator::context::{
    OperatorContext, SourceContext, ArrowCollector, BatchSender, BatchReceiver,
    WatermarkHolder, QueueItem,
};
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| batch | RecordBatch | Yes | Arrow record batch to emit downstream |
| watermarks | Vec<Option<Watermark>> | Yes | Initial watermark state per input partition |
| out_schema | ArroyoSchema | Yes | Output schema with key/routing indices for partitioning |
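The key-based routing that ArrowCollector performs with the key indices in `out_schema` reduces to mapping each row's key hash to a downstream partition. The sketch below uses range-based assignment, splitting the `u64` hash space into `n` equal contiguous ranges. It is assumed to mirror the `server_for_hash` logic that `server_for_hash_array` applies element-wise, but the exact formula is an assumption; `partition_for_hash` and `partition_rows` are hypothetical helpers, and the per-row hashes are taken as given rather than computed from Arrow key columns.

```rust
/// Maps a 64-bit hash to one of `n` partitions (n >= 1) by splitting the u64
/// space into `n` contiguous ranges of equal size.
pub fn partition_for_hash(hash: u64, n: usize) -> usize {
    assert!(n > 0, "need at least one partition");
    if n == 1 {
        // Special case: u64::MAX / 1 + 1 would overflow.
        0
    } else {
        let range_size = u64::MAX / (n as u64) + 1;
        (hash / range_size) as usize
    }
}

/// Hypothetical helper: groups row indices by the partition their key hash maps to.
pub fn partition_rows(hashes: &[u64], n: usize) -> Vec<Vec<usize>> {
    let mut out: Vec<Vec<usize>> = vec![Vec::new(); n];
    for (row, &hash) in hashes.iter().enumerate() {
        out[partition_for_hash(hash, n)].push(row);
    }
    out
}
```

Since the assignment depends only on the hash, rows with equal keys always land on the same downstream partition, which is what keyed state relies on.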
Outputs
| Name | Type | Description |
|---|---|---|
| QueueItem | ArrowMessage | Data or signal message sent to downstream operators via BatchSender |
| ControlResp | ControlResp | Control plane messages (checkpoint events, errors) |
Usage Examples
```rust
// Inside an ArrowOperator implementation
async fn process_batch(
    &mut self,
    batch: RecordBatch,
    ctx: &mut OperatorContext,
    collector: &mut ArrowCollector,
) -> DataflowResult<()> {
    // `transform` is a hypothetical helper standing in for the operator's logic;
    // its error type must convert into DataflowResult's error for `?` to apply.
    let result = transform(batch)?;
    // Emit the result downstream (partitioned by key hash for keyed outputs)
    collector.collect(result).await?;
    Ok(())
}
```
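Because the operator reaches its output only through the collector, its logic can be exercised against a recording collector in tests. The sketch below illustrates that pattern in a synchronous, dependency-free form: `BatchCollector`, `RecordingCollector`, and `DoublePositive` are hypothetical names, and a `Vec<i64>` stands in for a `RecordBatch`, whereas the collector methods in the signature above are async and operate on Arrow data.

```rust
/// Hypothetical, synchronous stand-in for a collector; a "batch" is a Vec<i64>.
pub trait BatchCollector {
    fn collect(&mut self, batch: Vec<i64>) -> Result<(), String>;
}

/// Records every batch it receives so tests can assert on the output.
#[derive(Default)]
pub struct RecordingCollector {
    pub batches: Vec<Vec<i64>>,
}

impl BatchCollector for RecordingCollector {
    fn collect(&mut self, batch: Vec<i64>) -> Result<(), String> {
        self.batches.push(batch);
        Ok(())
    }
}

/// Hypothetical operator: drops negative values and doubles the rest.
pub struct DoublePositive;

impl DoublePositive {
    pub fn process_batch(
        &mut self,
        batch: Vec<i64>,
        collector: &mut dyn BatchCollector,
    ) -> Result<(), String> {
        let out: Vec<i64> = batch.into_iter().filter(|v| *v >= 0).map(|v| v * 2).collect();
        // Skip emitting empty batches downstream.
        if !out.is_empty() {
            collector.collect(out)?;
        }
        Ok(())
    }
}
```

Taking the collector as a trait object keeps the operator independent of how its output is routed, so the same code can run against a real partitioning collector or a test double.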