Implementation:ArroyoSystems Arroyo Operator Trait

From Leeroopedia


Knowledge Sources
Domains: Streaming, Operators, Dataflow
Last Updated: 2026-02-08 08:00 GMT

Overview

Defines the core operator traits ArrowOperator and SourceOperator, the OperatorNode execution infrastructure, the Registry for operator and UDF lookup, and the operator execution loop, which covers checkpoint handling, watermark propagation, and chained operator execution.

Description

This module is the heart of the Arroyo operator framework:

  • OperatorConstructor trait -- factory trait for creating operators from protobuf config and a UDF registry.
  • ConstructedOperator -- enum wrapping either a Source or Operator with name/display methods.
  • ArrowOperator trait -- the primary operator interface with lifecycle methods:
    • on_start -- initialization with OperatorContext.
    • process_batch / process_batch_index -- process incoming data batches.
    • on_close -- cleanup when the operator stops.
    • handle_checkpoint -- participate in the checkpoint protocol.
    • handle_commit -- handle the commit phase of a two-phase commit.
    • handle_watermark_int -- process watermark advancement.
    • tables -- declare state table requirements.
    • tick_interval / handle_tick -- periodic timer for time-based operations.
  • SourceOperator trait -- the source operator interface with run as the main entry point, producing data into a SourceContext.
  • OperatorNode -- wraps either a SourceNode or a ChainedOperator for execution.
  • ChainedOperator -- a sequence of ArrowOperators executed in a single task, connected via in-memory channels. The chain_processing function orchestrates the full lifecycle.
  • Registry -- central registry mapping operator names to constructors and managing UDF loading (Rust dylibs and Python UDFs). The load_dylib function handles downloading, caching, and dynamically loading shared libraries.
  • Execution loop -- the run_operator, run_source, and chain_processing functions implement the execution loop: input reading, checkpoint barrier alignment, watermark merging, state management, and graceful shutdown.
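
The chaining idea from the list above can be illustrated with a small, std-only sketch. It is a simplified model and not Arroyo's implementation: `BatchOperator`, `Filter`, `Double`, and `Chain` are hypothetical names, batches are plain `Vec<i64>` values instead of Arrow `RecordBatch`es, and a plain vector stands in for the collector that hands output to the next operator.

```rust
/// Hypothetical stand-in for an operator: consumes one batch and pushes
/// zero or more output batches into `out` (playing the collector's role).
pub trait BatchOperator {
    fn process_batch(&mut self, batch: Vec<i64>, out: &mut Vec<Vec<i64>>);
}

/// Keeps values greater than or equal to `min`; emits nothing for an empty result.
pub struct Filter {
    pub min: i64,
}

impl BatchOperator for Filter {
    fn process_batch(&mut self, batch: Vec<i64>, out: &mut Vec<Vec<i64>>) {
        let kept: Vec<i64> = batch.into_iter().filter(|v| *v >= self.min).collect();
        if !kept.is_empty() {
            out.push(kept);
        }
    }
}

/// Doubles every value in the batch.
pub struct Double;

impl BatchOperator for Double {
    fn process_batch(&mut self, batch: Vec<i64>, out: &mut Vec<Vec<i64>>) {
        out.push(batch.into_iter().map(|v| v * 2).collect());
    }
}

/// A sequence of operators run back to back in one task.
pub struct Chain {
    operators: Vec<Box<dyn BatchOperator>>,
}

impl Chain {
    pub fn new(operators: Vec<Box<dyn BatchOperator>>) -> Self {
        Chain { operators }
    }

    /// Feeds `batch` to the first operator, then passes each operator's
    /// output batches to the next one; returns what the last operator emitted.
    pub fn process_batch(&mut self, batch: Vec<i64>) -> Vec<Vec<i64>> {
        let mut pending = vec![batch];
        for op in self.operators.iter_mut() {
            let mut produced = Vec::new();
            for b in pending {
                op.process_batch(b, &mut produced);
            }
            pending = produced;
        }
        pending
    }
}
```

In this model an operator that emits nothing for a batch short-circuits the rest of the chain for that batch, since no downstream operator receives input.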

Usage

Implement ArrowOperator for new processing operators or SourceOperator for new data sources. Register them in the Registry for the engine to instantiate.
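
As a rough illustration of name-based registration, the following std-only sketch maps operator names to constructor closures. It is not the real `Registry` API: `MiniRegistry`, `Operator`, `Threshold`, `construct`, and `demo_registry` are hypothetical, and a string stands in for the protobuf config that the real `OperatorConstructor` receives.

```rust
use std::collections::HashMap;

/// Hypothetical minimal operator interface, used only for this sketch.
pub trait Operator {
    fn name(&self) -> String;
}

/// A constructor takes a raw config string and builds a boxed operator.
type Constructor = Box<dyn Fn(&str) -> Result<Box<dyn Operator>, String>>;

/// Maps operator names to the constructors that build them.
#[derive(Default)]
pub struct MiniRegistry {
    constructors: HashMap<String, Constructor>,
}

impl MiniRegistry {
    /// Registers `constructor` under `name`, replacing any earlier entry.
    pub fn add<F>(&mut self, name: &str, constructor: F)
    where
        F: Fn(&str) -> Result<Box<dyn Operator>, String> + 'static,
    {
        self.constructors.insert(name.to_string(), Box::new(constructor));
    }

    /// Looks up `name` and runs its constructor with `config`.
    pub fn construct(&self, name: &str, config: &str) -> Result<Box<dyn Operator>, String> {
        let ctor = self
            .constructors
            .get(name)
            .ok_or_else(|| format!("no operator registered as '{}'", name))?;
        ctor(config)
    }
}

/// Example operator whose whole config is a single threshold value.
struct Threshold {
    threshold: f64,
}

impl Operator for Threshold {
    fn name(&self) -> String {
        format!("Threshold({})", self.threshold)
    }
}

/// Builds a registry with one entry, the way an engine might at startup.
pub fn demo_registry() -> MiniRegistry {
    let mut registry = MiniRegistry::default();
    registry.add("threshold", |config| {
        let threshold: f64 = config
            .parse()
            .map_err(|e| format!("bad config: {}", e))?;
        Ok(Box::new(Threshold { threshold }) as Box<dyn Operator>)
    });
    registry
}
```

Keeping construction behind a name lookup lets the engine instantiate operators from a serialized plan without knowing their concrete types; unknown names and invalid configs surface as errors instead of panics.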

Code Reference

Source Location

Signature

pub trait OperatorConstructor: Send {
    type ConfigT: prost::Message + Default;
    fn with_config(&self, config: Self::ConfigT, registry: Arc<Registry>) -> anyhow::Result<ConstructedOperator>;
}

#[async_trait]
pub trait ArrowOperator: Send + 'static {
    fn name(&self) -> String;
    fn tables(&self) -> HashMap<String, TableConfig> { HashMap::new() }
    fn tick_interval(&self) -> Option<Duration> { None }
    async fn on_start(&mut self, ctx: &mut OperatorContext) {}
    async fn process_batch(&mut self, batch: RecordBatch, ctx: &mut OperatorContext, collector: &mut ArrowCollector) -> DataflowResult<()>;
    async fn process_batch_index(&mut self, index: usize, total_inputs: usize, batch: RecordBatch, ctx: &mut OperatorContext, collector: &mut ArrowCollector) -> DataflowResult<()>;
    async fn handle_checkpoint(&mut self, checkpoint_barrier: CheckpointBarrier, ctx: &mut OperatorContext) -> DataflowResult<()>;
    async fn handle_commit(&mut self, epoch: u32, commit_data: &HashMap<String, HashMap<u32, Vec<u8>>>, ctx: &mut OperatorContext) -> DataflowResult<()>;
    async fn handle_watermark_int(&mut self, watermark: Watermark, ctx: &mut OperatorContext, collector: &mut ArrowCollector) -> DataflowResult<Option<Watermark>>;
    async fn on_close(&mut self, final_message: &Option<SignalMessage>, ctx: &mut OperatorContext, collector: &mut ArrowCollector) -> DataflowResult<Option<SignalMessage>>;
    fn display(&self) -> DisplayableOperator<'_>;
}

#[async_trait]
pub trait SourceOperator: Send + 'static {
    fn name(&self) -> String;
    fn tables(&self) -> HashMap<String, TableConfig> { HashMap::new() }
    async fn on_start(&mut self, ctx: &mut SourceContext) -> DataflowResult<()>;
    async fn run(&mut self, ctx: &mut SourceContext) -> SourceFinishType;
    async fn on_close(&mut self, ctx: &mut SourceContext) -> DataflowResult<()>;
}

pub struct Registry {
    pub constructors: HashMap<String, Box<dyn ErasedConstructor>>,
}

impl Registry {
    pub fn new() -> Self;
    pub fn add<T: OperatorConstructor + 'static>(&mut self, name: &str, constructor: T);
}

Import

use arroyo_operator::operator::{
    ArrowOperator, SourceOperator, OperatorConstructor, ConstructedOperator,
    Registry, OperatorNode, DisplayableOperator,
};

I/O Contract

Inputs

Name | Type | Required | Description
batch | RecordBatch | Yes | Input data batch from upstream operators
checkpoint_barrier | CheckpointBarrier | Yes (during checkpoint) | Checkpoint epoch and metadata
watermark | Watermark | Yes (during watermark) | New watermark value to process
ctx | &mut OperatorContext | Yes | Runtime context with state and control access
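
For an operator with several inputs, a checkpoint barrier is usually acted on only after it has arrived on every input. The sketch below shows one common way to track that alignment. It is a simplified, std-only model and not Arroyo's code: `BarrierAligner`, `on_barrier`, and `is_blocked` are hypothetical names, and the epoch is a bare `u32` instead of a `CheckpointBarrier`.

```rust
use std::collections::HashSet;

/// Tracks which inputs have delivered the barrier for the epoch in progress.
pub struct BarrierAligner {
    total_inputs: usize,
    epoch: Option<u32>,
    seen: HashSet<usize>,
}

impl BarrierAligner {
    pub fn new(total_inputs: usize) -> Self {
        BarrierAligner {
            total_inputs,
            epoch: None,
            seen: HashSet::new(),
        }
    }

    /// Records a barrier for `epoch` arriving on `input`.
    /// Returns true once every input has delivered it, which is the point
    /// where an operator would snapshot state and forward the barrier.
    pub fn on_barrier(&mut self, input: usize, epoch: u32) -> bool {
        // A barrier for a different epoch starts a fresh alignment round.
        if self.epoch != Some(epoch) {
            self.epoch = Some(epoch);
            self.seen.clear();
        }
        self.seen.insert(input);
        if self.seen.len() == self.total_inputs {
            // Aligned: reset so the next epoch starts clean.
            self.epoch = None;
            self.seen.clear();
            true
        } else {
            false
        }
    }

    /// While alignment is in progress, inputs that already delivered the
    /// barrier should not be read, so post-barrier data stays out of the snapshot.
    pub fn is_blocked(&self, input: usize) -> bool {
        self.seen.contains(&input)
    }
}
```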

Outputs

Name | Type | Description
DataflowResult<()> | Result | Success or error from processing
Option<Watermark> | Option<Watermark> | Forwarded/modified watermark for downstream
Option<SignalMessage> | Option<SignalMessage> | Final signal to propagate on close
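
The `Option<Watermark>` output can be read as "forward this watermark" (`Some`) or "forward nothing yet" (`None`). A minimal sketch of merging watermarks from several inputs under that convention follows. It is an assumption-laden model, not Arroyo's implementation: the `Watermark` enum here is a local stand-in with `u64` timestamps, `WatermarkMerger` and `update` are hypothetical, and the merge rule (minimum over non-idle inputs) is a common approach that the source page does not spell out.

```rust
/// Local stand-in for a watermark: an event-time value or an idle marker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Watermark {
    EventTime(u64),
    Idle,
}

/// Remembers the latest watermark per input and the last merged value emitted.
pub struct WatermarkMerger {
    inputs: Vec<Option<Watermark>>,
    last_emitted: Option<Watermark>,
}

impl WatermarkMerger {
    pub fn new(total_inputs: usize) -> Self {
        WatermarkMerger {
            inputs: vec![None; total_inputs],
            last_emitted: None,
        }
    }

    /// Records `watermark` for `input`. Returns Some(merged) only when the
    /// merged watermark differs from the one last returned, else None.
    /// Note: this sketch does not enforce that the merged value only advances.
    pub fn update(&mut self, input: usize, watermark: Watermark) -> Option<Watermark> {
        self.inputs[input] = Some(watermark);
        let merged = self.merged()?;
        if self.last_emitted == Some(merged) {
            return None;
        }
        self.last_emitted = Some(merged);
        Some(merged)
    }

    /// Minimum event time over non-idle inputs; Idle if all inputs are idle;
    /// None while any input has not reported a watermark yet.
    fn merged(&self) -> Option<Watermark> {
        let mut min_time: Option<u64> = None;
        for slot in &self.inputs {
            match (*slot)? {
                Watermark::EventTime(t) => {
                    min_time = Some(min_time.map_or(t, |m| m.min(t)));
                }
                Watermark::Idle => {}
            }
        }
        Some(min_time.map_or(Watermark::Idle, Watermark::EventTime))
    }
}
```

Taking the minimum means the slowest active input bounds downstream event time, while idle inputs are ignored so they cannot stall progress.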

Usage Examples

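use arroyo_operator::operator::ArrowOperator;
use arroyo_operator::context::{OperatorContext, ArrowCollector};
use arrow::array::RecordBatch;
// DataflowResult must also be in scope; its import path is not shown on this page.

// An operator that filters rows against a fixed threshold.
struct MyFilter {
    threshold: f64,
}

#[async_trait::async_trait]
impl ArrowOperator for MyFilter {
    fn name(&self) -> String {
        "MyFilter".to_string()
    }

    // Only process_batch is shown; this assumes the remaining trait methods
    // have usable default implementations.
    async fn process_batch(
        &mut self,
        batch: RecordBatch,
        _ctx: &mut OperatorContext,
        collector: &mut ArrowCollector,
    ) -> DataflowResult<()> {
        // apply_filter is a placeholder for user-supplied filtering logic; it is not defined here.
        let filtered = apply_filter(&batch, self.threshold)?;
        // Emit the filtered batch to downstream operators.
        collector.collect(filtered).await
    }
}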
