
Implementation:ArroyoSystems Arroyo Datafusion Types

From Leeroopedia


Domains: Streaming, Schema, Arrow
Last Updated: 2026-02-08 08:00 GMT

Overview

Defines ArroyoSchema, the core wrapper around an Apache Arrow schema. It adds the streaming-specific metadata needed for partitioned dataflow execution: the timestamp index, key indices, and routing key indices.

Description

ArroyoSchema wraps an Arrow Schema with additional indices that the streaming engine needs:

  • timestamp_index -- the column index of the event-time timestamp (always a Timestamp(Nanosecond) column named _timestamp).
  • key_indices -- optional column indices used for stateful key-based operations (joins, aggregations).
  • routing_key_indices -- optional column indices used for determining which subtask processes each record (may differ from key_indices).
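The split between key_indices and routing_key_indices can be illustrated with a small std-only model. This is a sketch, not Arroyo's code. The IndexMeta type is hypothetical. The fallback rule, where routing uses key_indices when no routing_key_indices are set, is an assumption about how routing_keys() behaves.

```rust
/// Hypothetical, simplified stand-in for the index metadata carried by ArroyoSchema.
struct IndexMeta {
    timestamp_index: usize,
    key_indices: Option<Vec<usize>>,
    routing_key_indices: Option<Vec<usize>>,
}

impl IndexMeta {
    /// Keys used for keyed state (joins, aggregations).
    fn storage_keys(&self) -> Option<&Vec<usize>> {
        self.key_indices.as_ref()
    }

    /// Keys used to pick a subtask. Assumption: fall back to the storage keys
    /// when no separate routing keys were supplied.
    fn routing_keys(&self) -> Option<&Vec<usize>> {
        self.routing_key_indices
            .as_ref()
            .or(self.key_indices.as_ref())
    }
}

fn main() {
    // Columns: [user_id, region, value, _timestamp]
    // State is keyed by (user_id, region), but records are routed by region only.
    let meta = IndexMeta {
        timestamp_index: 3,
        key_indices: Some(vec![0, 1]),
        routing_key_indices: Some(vec![1]),
    };
    assert_eq!(meta.storage_keys(), Some(&vec![0, 1]));
    assert_eq!(meta.routing_keys(), Some(&vec![1]));

    // Without explicit routing keys, routing follows the storage keys.
    let keyed = IndexMeta {
        timestamp_index: 3,
        key_indices: Some(vec![0]),
        routing_key_indices: None,
    };
    assert_eq!(keyed.routing_keys(), Some(&vec![0]));
    println!("timestamp column: {}", meta.timestamp_index);
}
```

Routing by a subset of the storage keys is safe. All rows that share the full key also share the routing key, so they still reach the same subtask.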

The module provides:

  • Constructors -- new, new_unkeyed, new_keyed, from_fields, from_schema_unkeyed, from_schema_keys.
  • Schema manipulation -- schema_without_timestamp, schema_without_keys, with_fields, with_additional_fields.
  • Batch operations -- filter_by_time (late-data filtering), sort, partition (group by keys), unkeyed_batch.
  • Converter helpers -- converter, value_converter, sort_columns, sort_fields for row-based encoding.
  • Protobuf conversion -- TryFrom<api::ArroyoSchema> and Into<api::ArroyoSchema> for gRPC transport.
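The name-based constructors must locate the _timestamp column. This sketch shows that lookup, as a from_schema_unkeyed-style constructor would perform it. It also appends the column when it is missing, which is an assumption about what from_fields does. Fields are modelled by name only. The helpers timestamp_index_of and ensure_timestamp are hypothetical.

```rust
const TIMESTAMP_FIELD: &str = "_timestamp";

/// Return the index of the `_timestamp` column, or an error if it is absent.
fn timestamp_index_of(fields: &[&str]) -> Result<usize, String> {
    fields
        .iter()
        .position(|name| *name == TIMESTAMP_FIELD)
        .ok_or_else(|| format!("no {} field in schema", TIMESTAMP_FIELD))
}

/// Append a `_timestamp` column if the caller did not supply one
/// (assumed from_fields behaviour).
fn ensure_timestamp(mut fields: Vec<&'static str>) -> Vec<&'static str> {
    if !fields.contains(&TIMESTAMP_FIELD) {
        fields.push(TIMESTAMP_FIELD);
    }
    fields
}

fn main() {
    let fields = ensure_timestamp(vec!["user_id", "value"]);
    assert_eq!(fields, vec!["user_id", "value", "_timestamp"]);
    assert_eq!(timestamp_index_of(&fields), Ok(2));
    assert!(timestamp_index_of(&["user_id"]).is_err());
}
```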

The helper function server_for_hash_array maps a UInt64 hash array to partition indices for data shuffling.
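server_for_hash_array itself operates on Arrow arrays. The plain-Rust sketch below shows one way to map hashes to n partitions: split the u64 hash space into n equal contiguous ranges, so every result falls in 0..n. This range-splitting scheme is an assumption about the implementation. The helpers server_for_hash and server_for_hash_slice are hypothetical.

```rust
/// Map a 64-bit hash to one of `n` servers by splitting the u64 space into
/// `n` equal, contiguous ranges (range partitioning of the hash space).
fn server_for_hash(hash: u64, n: usize) -> usize {
    assert!(n > 0, "need at least one server");
    if n == 1 {
        // u64::MAX / 1 + 1 would overflow, and every hash goes to server 0 anyway.
        return 0;
    }
    let range_size = u64::MAX / (n as u64) + 1;
    (hash / range_size) as usize
}

/// Array form: one server index per input hash, returned as u64 like a UInt64 array.
fn server_for_hash_slice(hashes: &[u64], n: usize) -> Vec<u64> {
    hashes.iter().map(|&h| server_for_hash(h, n) as u64).collect()
}

fn main() {
    let hashes = [0, u64::MAX / 2, u64::MAX];
    assert_eq!(server_for_hash_slice(&hashes, 4), vec![0, 1, 3]);
}
```

Every subtask computes the same mapping. As a result, all records whose routing keys hash to the same value land on the same subtask.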

Usage

Use ArroyoSchema whenever you need to operate on Arrow RecordBatches within the streaming engine. It is the canonical representation of a stream's schema throughout the planner, operators, and state management layers.

Code Reference

Source Location

Signature

pub type ArroyoSchemaRef = Arc<ArroyoSchema>;

pub struct ArroyoSchema {
    pub schema: Arc<Schema>,
    pub timestamp_index: usize,
    key_indices: Option<Vec<usize>>,
    routing_key_indices: Option<Vec<usize>>,
}

impl ArroyoSchema {
    pub fn new(schema: Arc<Schema>, timestamp_index: usize,
               key_indices: Option<Vec<usize>>,
               routing_key_indices: Option<Vec<usize>>) -> Self;
    pub fn new_unkeyed(schema: Arc<Schema>, timestamp_index: usize) -> Self;
    pub fn new_keyed(schema: Arc<Schema>, timestamp_index: usize,
                     key_indices: Vec<usize>) -> Self;
    pub fn from_fields(fields: Vec<Field>) -> Self;
    pub fn from_schema_unkeyed(schema: Arc<Schema>) -> DFResult<Self>;
    pub fn from_schema_keys(schema: Arc<Schema>, key_indices: Vec<usize>) -> DFResult<Self>;
    pub fn filter_by_time(&self, batch: RecordBatch,
                          cutoff: Option<SystemTime>) -> Result<RecordBatch, ArrowError>;
    pub fn sort(&self, batch: RecordBatch, with_timestamp: bool) -> Result<RecordBatch, ArrowError>;
    pub fn partition(&self, batch: &RecordBatch,
                     with_timestamp: bool) -> Result<Vec<Range<usize>>, ArrowError>;
    pub fn routing_keys(&self) -> Option<&Vec<usize>>;
    pub fn storage_keys(&self) -> Option<&Vec<usize>>;
}

pub fn server_for_hash_array(hash: &PrimitiveArray<UInt64Type>, n: usize)
    -> Result<PrimitiveArray<UInt64Type>, ArrowError>;
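The filter_by_time contract can be modelled on a plain slice of nanosecond timestamps. In this sketch, a row counts as late when its timestamp is strictly before the cutoff, so rows at or after the cutoff survive. A None cutoff passes everything through. Both the boundary choice and the None behaviour are assumptions, not details taken from the signature above, and the to_nanos helper is hypothetical.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Convert a SystemTime to nanoseconds since the Unix epoch (panics before 1970).
fn to_nanos(t: SystemTime) -> i64 {
    t.duration_since(UNIX_EPOCH)
        .expect("time before epoch")
        .as_nanos() as i64
}

/// Keep only rows whose event time is at or after the cutoff; no cutoff keeps everything.
fn filter_by_time(timestamps: &[i64], cutoff: Option<SystemTime>) -> Vec<i64> {
    match cutoff {
        None => timestamps.to_vec(),
        Some(c) => {
            let cutoff_ns = to_nanos(c);
            timestamps.iter().copied().filter(|&ts| ts >= cutoff_ns).collect()
        }
    }
}

fn main() {
    let ts = [10, 20, 30];
    let watermark = UNIX_EPOCH + Duration::from_nanos(15);
    assert_eq!(filter_by_time(&ts, Some(watermark)), vec![20, 30]);
    assert_eq!(filter_by_time(&ts, None), vec![10, 20, 30]);
}
```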

Import

use arroyo_rpc::df::{ArroyoSchema, ArroyoSchemaRef, server_for_hash_array};

I/O Contract

Inputs

  • schema (Arc<Schema>, required) -- Arrow schema with a _timestamp column.
  • timestamp_index (usize, required) -- index of the timestamp column.
  • key_indices (Option<Vec<usize>>, optional) -- column indices for stateful keying.
  • routing_key_indices (Option<Vec<usize>>, optional) -- column indices for shuffle routing.

Outputs

  • ArroyoSchema -- schema wrapper with streaming metadata, returned by the constructors.
  • RecordBatch -- filtered or sorted batch, returned by filter_by_time and sort.
  • Vec<Range<usize>> -- row ranges for key-grouped batches, returned by partition.
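The Vec<Range<usize>> output describes runs of rows that share a key. The sketch below computes such ranges over one key column in plain Rust. It assumes the rows are already sorted by key, for example by calling sort first, because it only detects where consecutive keys change. partition_ranges is a hypothetical helper, not part of arroyo_rpc.

```rust
use std::ops::Range;

/// Split `keys` into contiguous runs of equal values.
/// Assumes `keys` is already sorted, so each run is exactly one key group.
fn partition_ranges<K: PartialEq>(keys: &[K]) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    for i in 1..=keys.len() {
        // Close the current run at the end of the slice or when the key changes.
        if i == keys.len() || keys[i] != keys[start] {
            ranges.push(start..i);
            start = i;
        }
    }
    ranges
}

fn main() {
    // user_id column after sorting by key
    let user_id = [1u64, 1, 2, 3, 3];
    assert_eq!(partition_ranges(&user_id), vec![0..2, 2..3, 3..5]);
}
```

Multiple key columns can be modelled by passing tuples, such as (user_id, region), as the key type.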

Usage Examples

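use arroyo_rpc::df::ArroyoSchema;
use arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

fn example() -> Result<(), ArrowError> {
    // Create a keyed schema: key = user_id (column 0), event time = _timestamp (column 2)
    let schema = Arc::new(Schema::new(vec![
        Field::new("user_id", DataType::UInt64, false),
        Field::new("value", DataType::Utf8, true),
        Field::new("_timestamp", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
    ]));
    let arroyo_schema = ArroyoSchema::new_keyed(schema.clone(), 2, vec![0]);

    // Build a small batch matching the schema
    let columns: Vec<ArrayRef> = vec![
        Arc::new(UInt64Array::from(vec![2u64, 1, 2])),
        Arc::new(StringArray::from(vec![Some("a"), Some("b"), None])),
        Arc::new(TimestampNanosecondArray::from(vec![30i64, 10, 20])),
    ];
    let batch = RecordBatch::try_new(schema, columns)?;

    // Filter late data relative to the watermark (filter_by_time consumes the batch)
    let watermark = SystemTime::UNIX_EPOCH + Duration::from_nanos(15);
    let filtered = arroyo_schema.filter_by_time(batch, Some(watermark))?;

    // Sort the remaining rows by key, then by timestamp
    let sorted = arroyo_schema.sort(filtered, true)?;
    println!("{} rows after filtering and sorting", sorted.num_rows());
    Ok(())
}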
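sort(batch, true) orders rows by the key columns and then by _timestamp. The plain-Rust sketch below shows the idea with a single u64 key column. It computes a sort permutation over (key, timestamp) tuples, then reorders each column with that permutation. The permutation-plus-take approach is modelled on Arrow's lexsort_to_indices and take kernels, and is an assumption about how sort works internally. sort_indices and take here are hypothetical helpers.

```rust
/// Compute the row order that sorts by key first and timestamp second,
/// i.e. a lexicographic sort over (key, _timestamp).
fn sort_indices(keys: &[u64], timestamps: &[i64]) -> Vec<usize> {
    assert_eq!(keys.len(), timestamps.len());
    let mut idx: Vec<usize> = (0..keys.len()).collect();
    // Comparing (key, timestamp) tuples gives lexicographic order.
    idx.sort_by_key(|&i| (keys[i], timestamps[i]));
    idx
}

/// Reorder a column by the computed indices (the equivalent of an Arrow `take`).
fn take<T: Clone>(column: &[T], indices: &[usize]) -> Vec<T> {
    indices.iter().map(|&i| column[i].clone()).collect()
}

fn main() {
    let user_id = [2u64, 1, 2, 1];
    let ts = [30i64, 20, 10, 5];
    let order = sort_indices(&user_id, &ts);
    assert_eq!(order, vec![3, 1, 2, 0]);
    assert_eq!(take(&user_id, &order), vec![1, 1, 2, 2]);
    assert_eq!(take(&ts, &order), vec![5, 20, 10, 30]);
}
```

Computing the permutation once and applying it to every column keeps all columns aligned. Sorting each column independently would break that alignment.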
