

Implementation:ArroyoSystems Arroyo Operator Context

From Leeroopedia


Knowledge Sources
Domains: Streaming, Operators, Dataflow
Last Updated: 2026-02-08 08:00 GMT

Overview

Provides the OperatorContext, SourceContext, and supporting types (WatermarkHolder, BatchSender, BatchReceiver, ArrowCollector, SourceCollector) that form the runtime environment available to all Arroyo operators during execution.

Description

This module implements the runtime context and data transport layer for operators:

  • WatermarkHolder -- manages the minimum watermark across all input partitions. Tracks both the current watermark (which may be Idle) and the last non-idle watermark time. Updates are computed by folding all partition watermarks.
  • BatchSender / BatchReceiver -- bounded-by-rows channel wrapper around an UnboundedSender<QueueItem>. BatchSender tracks queued message count and byte size via AtomicU32/AtomicU64, and uses a Notify to implement backpressure when the queue exceeds its row capacity.
  • ArrowCollector -- the output collector for non-source operators. Distributes RecordBatches to downstream operators by key hashing (for keyed outputs) or by broadcasting (for unkeyed outputs). Implements the Collector trait with collect and broadcast methods. Handles hash computation, partition assignment via server_for_hash_array, and late-data filtering via RateLimiter.
  • SourceCollector -- the output collector for source operators. Similar to ArrowCollector but also handles the deserialization pipeline through ArrowDeserializer with format/framing/bad_data configuration and schema resolution.
  • OperatorContext -- the central context struct holding task_info, control_tx (for sending ControlResp), table_manager (for state access), watermark, and scheduling information. Provides methods for state access and control plane communication.
  • SourceContext -- extends OperatorContext with source-specific functionality including deserialization, output collection, and idle timeout management.
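The watermark folding described for WatermarkHolder can be sketched in plain Rust. This is a minimal, std-only illustration, not Arroyo's source. The `Watermark` enum (`EventTime(SystemTime)` or `Idle`) is a stand-in defined here. The folding rules are also assumptions: a partition with no watermark yet (`None`) keeps the combined watermark at `None`, and idle partitions are skipped unless every partition is idle. These rules were chosen to match the "minimum across partitions, possibly Idle" behavior described above.

```rust
use std::time::SystemTime;

/// Stand-in for Arroyo's watermark type (assumed shape): either an
/// event-time watermark or a marker that the input is idle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Watermark {
    EventTime(SystemTime),
    Idle,
}

/// Simplified holder with one watermark slot per input partition.
pub struct WatermarkHolder {
    last_present_watermark: Option<SystemTime>,
    cur_watermark: Option<Watermark>,
    watermarks: Vec<Option<Watermark>>,
}

impl WatermarkHolder {
    pub fn new(watermarks: Vec<Option<Watermark>>) -> Self {
        let mut holder = Self {
            last_present_watermark: None,
            cur_watermark: None,
            watermarks,
        };
        holder.recompute();
        holder
    }

    pub fn watermark(&self) -> Option<Watermark> {
        self.cur_watermark
    }

    pub fn last_present_watermark(&self) -> Option<SystemTime> {
        self.last_present_watermark
    }

    /// Records a new watermark for partition `idx`. Returns `None` if `idx`
    /// is out of range, otherwise `Some` of the recomputed combined watermark.
    pub fn set(&mut self, idx: usize, watermark: Watermark) -> Option<Option<Watermark>> {
        *self.watermarks.get_mut(idx)? = Some(watermark);
        self.recompute();
        Some(self.cur_watermark)
    }

    /// Folds all partition watermarks into one: the minimum event time,
    /// ignoring idle partitions; `Idle` only if every partition is idle;
    /// `None` if any partition has not reported a watermark yet.
    fn recompute(&mut self) {
        self.cur_watermark = self.watermarks.iter().try_fold(Watermark::Idle, |acc, w| {
            let w = (*w)?; // a missing watermark short-circuits to None
            Some(match (acc, w) {
                (Watermark::EventTime(a), Watermark::EventTime(b)) => Watermark::EventTime(a.min(b)),
                (Watermark::EventTime(t), Watermark::Idle)
                | (Watermark::Idle, Watermark::EventTime(t)) => Watermark::EventTime(t),
                (Watermark::Idle, Watermark::Idle) => Watermark::Idle,
            })
        });
        // Remember the last real timestamp so it survives periods of idleness.
        if let Some(Watermark::EventTime(t)) = self.cur_watermark {
            self.last_present_watermark = Some(t);
        }
    }
}
```

In this sketch, `last_present_watermark` changes only when the fold yields an event time. It therefore still holds the last real timestamp after every input has gone idle, which is why the holder tracks it separately from the current (possibly Idle) watermark.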

Usage

Every operator receives an OperatorContext (or SourceContext for sources) during initialization. Use it to access state, emit output records, read watermarks, and communicate with the control plane.

Code Reference

Source Location

Signature

pub type QueueItem = ArrowMessage;

pub struct WatermarkHolder {
    last_present_watermark: Option<SystemTime>,
    cur_watermark: Option<Watermark>,
    watermarks: Vec<Option<Watermark>>,
}

impl WatermarkHolder {
    pub fn new(watermarks: Vec<Option<Watermark>>) -> Self;
    pub fn watermark(&self) -> Option<Watermark>;
    pub fn last_present_watermark(&self) -> Option<SystemTime>;
    pub fn set(&mut self, idx: usize, watermark: Watermark) -> Option<Option<Watermark>>;
}

pub struct BatchSender { ... }
impl BatchSender {
    pub async fn send(&self, item: QueueItem) -> Result<(), SendError<QueueItem>>;
}

pub struct BatchReceiver { ... }
impl BatchReceiver {
    pub async fn recv(&mut self) -> Option<QueueItem>;
}

pub struct ArrowCollector { ... }
impl ArrowCollector {
    pub async fn collect(&mut self, batch: RecordBatch) -> DataflowResult<()>;
    pub async fn broadcast(&mut self, message: ArrowMessage) -> DataflowResult<()>;
}

pub struct OperatorContext { ... }
pub struct SourceContext { ... }
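The row-bounded backpressure behind BatchSender can be illustrated with a synchronous analogue. As described above, the real type is async, wraps an UnboundedSender, and uses atomic counters plus a Notify. This sketch instead guards a VecDeque and a queued-row counter with a std Mutex and two Condvars, so it runs without dependencies. `Item`, `batch_channel`, and `queued_rows` are hypothetical. So is the choice to charge an oversized batch at most `capacity` rows, which lets it enter an empty queue rather than blocking forever.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical queue item: only its row count matters for backpressure.
#[derive(Debug, Clone, PartialEq)]
pub struct Item {
    pub rows: u32,
}

struct Shared {
    /// Queued items plus the number of rows currently charged to the queue.
    state: Mutex<(VecDeque<Item>, u32)>,
    space_freed: Condvar,
    item_ready: Condvar,
    capacity: u32,
}

pub struct BatchSender {
    shared: Arc<Shared>,
}

pub struct BatchReceiver {
    shared: Arc<Shared>,
}

/// Creates a channel that admits at most `capacity` queued rows.
pub fn batch_channel(capacity: u32) -> (BatchSender, BatchReceiver) {
    let shared = Arc::new(Shared {
        state: Mutex::new((VecDeque::new(), 0)),
        space_freed: Condvar::new(),
        item_ready: Condvar::new(),
        capacity,
    });
    (BatchSender { shared: Arc::clone(&shared) }, BatchReceiver { shared })
}

/// Rows charged for an item, capped at capacity so an oversized batch can
/// still enter an empty queue instead of blocking forever.
fn charge(item: &Item, capacity: u32) -> u32 {
    item.rows.min(capacity)
}

impl BatchSender {
    /// Blocks while adding this item would exceed the row capacity.
    pub fn send(&self, item: Item) {
        let cost = charge(&item, self.shared.capacity);
        let mut state = self.shared.state.lock().unwrap();
        while state.1 + cost > self.shared.capacity {
            state = self.shared.space_freed.wait(state).unwrap();
        }
        state.1 += cost;
        state.0.push_back(item);
        self.shared.item_ready.notify_one();
    }

    pub fn queued_rows(&self) -> u32 {
        self.shared.state.lock().unwrap().1
    }
}

impl BatchReceiver {
    /// Blocks until an item is available, then releases its rows.
    pub fn recv(&self) -> Item {
        let mut state = self.shared.state.lock().unwrap();
        loop {
            if let Some(item) = state.0.pop_front() {
                state.1 -= charge(&item, self.shared.capacity);
                self.shared.space_freed.notify_all();
                return item;
            }
            state = self.shared.item_ready.wait(state).unwrap();
        }
    }
}
```

Any `send` that would push the queued row count past `capacity` waits until `recv` frees space. This is how a slow downstream consumer slows its upstream producer.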

Import

use arroyo_operator::context::{
    OperatorContext, SourceContext, ArrowCollector, BatchSender, BatchReceiver,
    WatermarkHolder, QueueItem,
};

I/O Contract

Inputs

Name | Type | Required | Description
batch | RecordBatch | Yes | Arrow record batch to emit downstream
watermarks | Vec<Option<Watermark>> | Yes | Initial watermark state per input partition
out_schema | ArroyoSchema | Yes | Output schema with key/routing indices for partitioning

Outputs

Name | Type | Description
QueueItem | ArrowMessage | Data or signal message sent to downstream operators via BatchSender
ControlResp | ControlResp | Control-plane messages (checkpoint events, errors)
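For keyed outputs, ArrowCollector hashes each row's key and assigns the row to a downstream partition via server_for_hash_array. The sketch below shows one way such an assignment can work. It assumes the u64 hash space is split into `n` equal, contiguous ranges. `partition_for_hash`, `hash_key`, and `route_rows` are hypothetical names, and std's DefaultHasher stands in for whatever hash Arroyo computes over the key columns.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Maps a 64-bit hash to one of `n` partitions by splitting the u64 space
/// into `n` contiguous, equal-width ranges (an assumed scheme).
pub fn partition_for_hash(hash: u64, n: usize) -> usize {
    assert!(n > 0, "need at least one downstream partition");
    if n == 1 {
        return 0; // u64::MAX / 1 + 1 would overflow
    }
    let range_size = u64::MAX / n as u64 + 1;
    (hash / range_size) as usize
}

/// Hashes a key with std's DefaultHasher (a stand-in for the real key hash).
pub fn hash_key<K: Hash>(key: &K) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

/// Groups row indices by destination partition, so rows with equal keys
/// always land in the same partition.
pub fn route_rows<K: Hash>(keys: &[K], n: usize) -> Vec<Vec<usize>> {
    let mut routed = vec![Vec::new(); n];
    for (row, key) in keys.iter().enumerate() {
        routed[partition_for_hash(hash_key(key), n)].push(row);
    }
    routed
}
```

Range-based assignment always yields an index below `n`, because `(u64::MAX / n + 1) * n` exceeds `u64::MAX`. DefaultHasher's algorithm is not guaranteed to stay the same across Rust releases, so a system that persists keyed state would need a stable hash instead.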

Usage Examples

// Inside an ArrowOperator implementation
async fn process_batch(
    &mut self,
    batch: RecordBatch,
    ctx: &mut OperatorContext,
    collector: &mut ArrowCollector,
) -> DataflowResult<()> {
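    // Process the batch. `transform` is a placeholder for the operator's own
    // logic; it is not part of the context API.
    let result = transform(batch)?;

    // Emit the result downstream; for keyed outputs the collector routes
    // each row to a partition by key hash.
    collector.collect(result).await?;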

    Ok(())
}
