Implementation:ArroyoSystems Arroyo Datafusion Types
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Schema, Arrow |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Defines ArroyoSchema, the core schema wrapper around Apache Arrow schemas that adds streaming-specific metadata including timestamp index, key indices, and routing key indices for partitioned dataflow execution.
Description
ArroyoSchema wraps an Arrow Schema with additional indices that the streaming engine needs:
- timestamp_index -- the column index of the event-time timestamp (always a Timestamp(Nanosecond) column named _timestamp).
- key_indices -- optional column indices used for stateful key-based operations (joins, aggregations).
- routing_key_indices -- optional column indices used for determining which subtask processes each record (may differ from key_indices).
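As a rough illustration of how timestamp_index relates to the schema, the sketch below locates the _timestamp column by name in a list of field names. It uses plain standard-library types instead of Arrow. The function name find_timestamp_index and the String error type are hypothetical and are not part of arroyo_rpc::df.

```rust
/// Name of the event-time column, as stated in the description above.
const TIMESTAMP_FIELD: &str = "_timestamp";

/// Hypothetical helper (not the library's API): returns the position of the
/// `_timestamp` column, or an error if the schema has no such column.
fn find_timestamp_index(field_names: &[&str]) -> Result<usize, String> {
    field_names
        .iter()
        .position(|name| *name == TIMESTAMP_FIELD)
        .ok_or_else(|| format!("no {} field in schema", TIMESTAMP_FIELD))
}
```

For a schema with the fields user_id, value, and _timestamp, this sketch yields index 2, which is the value that would be passed as timestamp_index.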
The module provides:
- Constructors -- new, new_unkeyed, new_keyed, from_fields, from_schema_unkeyed, from_schema_keys.
- Schema manipulation -- schema_without_timestamp, schema_without_keys, with_fields, with_additional_fields.
- Batch operations -- filter_by_time (late-data filtering), sort, partition (group by keys), unkeyed_batch.
- Converter helpers -- converter, value_converter, sort_columns, sort_fields for row-based encoding.
- Protobuf conversion -- TryFrom<api::ArroyoSchema> and Into<api::ArroyoSchema> for gRPC transport.
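The late-data filtering listed under batch operations can be pictured with a simplified, standard-library-only sketch. It works on a slice of nanosecond timestamps in place of a RecordBatch, and it assumes that rows at or after the cutoff are kept and that a missing cutoff keeps everything. Both the comparison direction and the helper to_nanos are assumptions made for illustration, not confirmed behavior of ArroyoSchema::filter_by_time.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Converts a `SystemTime` to nanoseconds since the Unix epoch.
/// Times before the epoch are clamped to 0 in this sketch.
fn to_nanos(t: SystemTime) -> i64 {
    t.duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos() as i64)
        .unwrap_or(0)
}

/// Hypothetical stand-in for late-data filtering: keeps timestamps at or
/// after the cutoff (an assumption) and keeps all rows when cutoff is None.
fn filter_by_time(timestamps: &[i64], cutoff: Option<SystemTime>) -> Vec<i64> {
    match cutoff {
        None => timestamps.to_vec(),
        Some(c) => {
            let cutoff_nanos = to_nanos(c);
            timestamps
                .iter()
                .copied()
                .filter(|ts| *ts >= cutoff_nanos)
                .collect()
        }
    }
}
```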
The helper function server_for_hash_array maps a UInt64 hash array to partition indices for data shuffling.
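One way to map hashes to partitions is range partitioning: split the u64 hash space into n equal contiguous ranges and use the range index as the partition. The sketch below is a scalar, standard-library-only analogue under that assumption. The names server_for_hash and servers_for_hashes are hypothetical, and the actual server_for_hash_array operates on Arrow arrays and may differ in detail.

```rust
/// Hypothetical scalar analogue of hash-based routing: splits the u64 hash
/// space into `n` equal contiguous ranges and returns the range index.
fn server_for_hash(hash: u64, n: usize) -> usize {
    assert!(n > 0, "need at least one partition");
    if n == 1 {
        return 0; // avoids overflow in the range-size computation below
    }
    // For n >= 2 the range size fits in a u64 and the result is below n.
    let range_size = u64::MAX / (n as u64) + 1;
    (hash / range_size) as usize
}

/// Applies the scalar mapping to every hash in a slice.
fn servers_for_hashes(hashes: &[u64], n: usize) -> Vec<usize> {
    hashes.iter().map(|h| server_for_hash(*h, n)).collect()
}
```

Because the mapping depends only on the hash and n, records with equal routing keys always land on the same subtask as long as n stays fixed.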
Usage
Use ArroyoSchema whenever you need to operate on Arrow RecordBatches within the streaming engine. It is the canonical representation of a stream's schema throughout the planner, operators, and state management layers.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-rpc/src/df.rs
Signature
pub type ArroyoSchemaRef = Arc<ArroyoSchema>;
pub struct ArroyoSchema {
    pub schema: Arc<Schema>,
    pub timestamp_index: usize,
    key_indices: Option<Vec<usize>>,
    routing_key_indices: Option<Vec<usize>>,
}
impl ArroyoSchema {
    pub fn new(schema: Arc<Schema>, timestamp_index: usize,
               key_indices: Option<Vec<usize>>,
               routing_key_indices: Option<Vec<usize>>) -> Self;
    pub fn new_unkeyed(schema: Arc<Schema>, timestamp_index: usize) -> Self;
    pub fn new_keyed(schema: Arc<Schema>, timestamp_index: usize,
                     key_indices: Vec<usize>) -> Self;
    pub fn from_fields(fields: Vec<Field>) -> Self;
    pub fn from_schema_unkeyed(schema: Arc<Schema>) -> DFResult<Self>;
    pub fn from_schema_keys(schema: Arc<Schema>, key_indices: Vec<usize>) -> DFResult<Self>;
    pub fn filter_by_time(&self, batch: RecordBatch,
                          cutoff: Option<SystemTime>) -> Result<RecordBatch, ArrowError>;
    pub fn sort(&self, batch: RecordBatch, with_timestamp: bool) -> Result<RecordBatch, ArrowError>;
    pub fn partition(&self, batch: &RecordBatch,
                     with_timestamp: bool) -> Result<Vec<Range<usize>>, ArrowError>;
    pub fn routing_keys(&self) -> Option<&Vec<usize>>;
    pub fn storage_keys(&self) -> Option<&Vec<usize>>;
}
pub fn server_for_hash_array(hash: &PrimitiveArray<UInt64Type>, n: usize)
    -> Result<PrimitiveArray<UInt64Type>, ArrowError>;
Import
use arroyo_rpc::df::{ArroyoSchema, ArroyoSchemaRef, server_for_hash_array};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| schema | Arc<Schema> | Yes | Arrow schema with a _timestamp column |
| timestamp_index | usize | Yes | Index of the timestamp column |
| key_indices | Option<Vec<usize>> | No | Column indices for stateful keying |
| routing_key_indices | Option<Vec<usize>> | No | Column indices for shuffle routing |
Outputs
| Name | Type | Description |
|---|---|---|
| ArroyoSchema | ArroyoSchema | Schema wrapper with streaming metadata |
| RecordBatch | RecordBatch | Filtered or sorted batch returned by filter_by_time and sort |
| Vec<Range<usize>> | Vec<Range<usize>> | Partition ranges for key-grouped batches |
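To show what partition ranges look like, here is a minimal standard-library sketch that computes the ranges of consecutive rows sharing a key. It assumes the keys are already sorted, so equal keys are adjacent, and it works on a plain slice in place of a RecordBatch. The function name partition_ranges is hypothetical and is not the library's implementation.

```rust
use std::ops::Range;

/// Hypothetical stand-in for key-based partitioning: given keys that are
/// already sorted, returns the ranges of consecutive rows sharing one key.
fn partition_ranges<K: PartialEq>(sorted_keys: &[K]) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    for i in 1..=sorted_keys.len() {
        // Close the current range at the end of input or when the key changes.
        if i == sorted_keys.len() || sorted_keys[i] != sorted_keys[start] {
            ranges.push(start..i);
            start = i;
        }
    }
    ranges
}
```

Each returned range can then be used to slice the sorted batch into one group per key.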
Usage Examples
use arroyo_rpc::df::ArroyoSchema;
use arrow::datatypes::{Schema, Field, DataType, TimeUnit};
use std::sync::Arc;
// Create a keyed schema
let schema = Arc::new(Schema::new(vec![
    Field::new("user_id", DataType::UInt64, false),
    Field::new("value", DataType::Utf8, true),
    Field::new("_timestamp", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
]));
let arroyo_schema = ArroyoSchema::new_keyed(schema, 2, vec![0]);
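// Filter late data from a batch.
// `batch` (a RecordBatch) and `watermark` (a SystemTime) are assumed to be in scope.
let filtered = arroyo_schema.filter_by_time(batch, Some(watermark))?;
// Sort by keys and timestamp. `batch` was moved into filter_by_time above,
// so the filtered result is what gets sorted.
let sorted = arroyo_schema.sort(filtered, true)?;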