
Implementation:ArroyoSystems Arroyo Core Types

From Leeroopedia


Knowledge Sources
Domains: Streaming, Types, Arrow
Last Updated: 2026-02-08 08:00 GMT

Overview

Defines the foundational types shared across the entire Arroyo engine: task identity, message types, time utilities, CDC types, hash partitioning functions, and Arrow extension type metadata.

Description

This module is the lowest-level type library in Arroyo; all other crates depend on it. It provides:

  • Identity types -- WorkerId(u64), MachineId(Arc<String>), TaskInfo (job_id, node_id, operator_name/id, task_index, parallelism, key_range), ChainInfo (for chained operators).
  • Time utilities -- to_millis, to_micros, to_nanos, from_millis, from_micros, from_nanos, print_time, days_since_epoch for SystemTime conversions.
  • Trait bounds -- Key (Debug + Clone + Encode + Decode<()> + Hash + PartialEq + Eq + Send + 'static) and Data (Debug + Clone + Encode + Decode<()> + Send + 'static), method-less traits provided through blanket implementations.
  • Message types -- Watermark (EventTime or Idle), ArrowMessage (Data or Signal), SignalMessage (Barrier, Watermark, Stop, EndOfData), CheckpointBarrier (epoch, min_epoch, timestamp, then_stop).
  • CDC types -- Debezium<T> with before/after/op fields and validation, DebeziumOp (Create/Update/Delete with single-char serde), UpdatingData<T> (Retract/Update/Append).
  • Join support -- JoinType enum (Inner, Left, Right, Full).
  • Hash partitioning -- server_for_hash(x, n) and range_for_server(i, n) for consistent hash-based key range assignment.
  • Arrow extensions -- ArroyoExtensionType::JSON for embedding JSON type metadata in Arrow schemas, RecordBatchBuilder trait, DisplayAsSql for SQL type rendering.
  • Date/time enums -- DatePart and DateTruncPrecision with TryFrom<&str> conversions.
  • Constants -- JOB_ID_ENV, RUN_ID_ENV, TELEMETRY_KEY, HASH_SEEDS, LOOKUP_KEY_INDEX_FIELD, and Prometheus metric name constants (MESSAGES_RECV, BYTES_SENT, etc.).
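
The hash-partitioning entry in the list above can be made concrete with a small sketch. It assumes the u64 hash space is split into n contiguous, equally sized ranges, with the last range absorbing any remainder. That scheme is an assumption that fits the signatures on this page, not a copy of the Arroyo source; the n == 1 special case and the assertions are additions for the sketch.

```rust
use std::ops::RangeInclusive;

/// Sketch only: maps a hash to a partition index in 0..n,
/// assuming equally sized contiguous ranges over the u64 space.
pub fn server_for_hash(x: u64, n: usize) -> usize {
    assert!(n > 0, "need at least one partition");
    if n == 1 {
        return 0; // a single partition owns everything
    }
    let range_size = u64::MAX / (n as u64) + 1;
    (x / range_size) as usize
}

/// Sketch only: returns the inclusive hash range owned by partition i of n.
pub fn range_for_server(i: usize, n: usize) -> RangeInclusive<u64> {
    assert!(i < n, "partition index out of bounds");
    if n == 1 {
        return 0..=u64::MAX;
    }
    let range_size = u64::MAX / (n as u64) + 1;
    let start = range_size * (i as u64);
    // The last partition extends to u64::MAX to cover the remainder.
    let end = if i + 1 == n { u64::MAX } else { start + range_size - 1 };
    start..=end
}

fn main() {
    let n = 8;
    let hash: u64 = 0xDEAD_BEEF_0000_0000; // example hash value
    let owner = server_for_hash(hash, n);
    let range = range_for_server(owner, n);
    // The two functions agree: the owner's range contains the hash.
    assert!(range.contains(&hash));
    println!("hash {hash:#x} -> partition {owner}, range {range:?}");
}
```

Under this scheme the two functions are inverses in the sense that matters for routing: the range returned for the owning partition always contains the hash, so a task holding a key_range can check ownership locally.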

Usage

Import from arroyo_types for any foundational type used across the Arroyo codebase. TaskInfo and CheckpointBarrier are particularly pervasive.

Code Reference

Source Location

Signature

pub struct WorkerId(pub u64);
pub struct MachineId(pub Arc<String>);

pub struct TaskInfo {
    pub job_id: String,
    pub node_id: u32,
    pub operator_name: String,
    pub operator_id: String,
    pub task_index: u32,
    pub parallelism: u32,
    pub key_range: RangeInclusive<u64>,
}

pub enum Watermark { EventTime(SystemTime), Idle }
pub enum ArrowMessage { Data(RecordBatch), Signal(SignalMessage) }
pub enum SignalMessage { Barrier(CheckpointBarrier), Watermark(Watermark), Stop, EndOfData }

pub struct CheckpointBarrier {
    pub epoch: u32,
    pub min_epoch: u32,
    pub timestamp: SystemTime,
    pub then_stop: bool,
}

pub trait Key: Debug + Clone + Encode + Decode<()> + Hash + PartialEq + Eq + Send + 'static {}
pub trait Data: Debug + Clone + Encode + Decode<()> + Send + 'static {}

pub fn to_millis(time: SystemTime) -> u64;
pub fn from_millis(ts: u64) -> SystemTime;
pub fn server_for_hash(x: u64, n: usize) -> usize;
pub fn range_for_server(i: usize, n: usize) -> RangeInclusive<u64>;
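
The Key and Data traits in the signature have empty bodies, and the Description says they come with blanket implementations. A minimal sketch of that pattern follows. It omits the Encode and Decode<()> bounds so that it compiles with the standard library alone, and count_by_key is a hypothetical helper introduced only to show the bounds in use.

```rust
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;

// Sketch only: the Encode / Decode<()> bounds from the real traits are omitted.
pub trait Key: Debug + Clone + Hash + PartialEq + Eq + Send + 'static {}
// Blanket implementation: every type meeting the bounds is a Key.
impl<T: Debug + Clone + Hash + PartialEq + Eq + Send + 'static> Key for T {}

pub trait Data: Debug + Clone + Send + 'static {}
// Blanket implementation: every type meeting the bounds is Data.
impl<T: Debug + Clone + Send + 'static> Data for T {}

/// Hypothetical helper: counts how many rows share each key.
pub fn count_by_key<K: Key, D: Data>(rows: &[(K, D)]) -> HashMap<K, usize> {
    let mut counts = HashMap::new();
    for (key, _value) in rows {
        *counts.entry(key.clone()).or_insert(0) += 1;
    }
    counts
}

fn main() {
    // String satisfies Key; f64 satisfies Data but not Key (no Hash or Eq).
    let rows = vec![
        ("a".to_string(), 1.5_f64),
        ("b".to_string(), 2.0),
        ("a".to_string(), 3.0),
    ];
    let counts = count_by_key(&rows);
    println!("{counts:?}");
}
```

With this pattern, generic code can write K: Key instead of repeating the full bound list, and no type ever needs an explicit impl.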

Import

use arroyo_types::{
    TaskInfo, WorkerId, MachineId, Watermark, ArrowMessage, SignalMessage,
    CheckpointBarrier, Key, Data, to_millis, from_millis,
    server_for_hash, range_for_server,
};

I/O Contract

Inputs

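Name   Type         Required   Description
x      u64          Yes        Hash value for partition assignment (server_for_hash)
i      usize        Yes        Partition/server index (range_for_server)
n      usize        Yes        Number of partitions/servers
time   SystemTime   Yes        Timestamp to convert (to_millis and related functions)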

Outputs

Function               Type                  Description
server_for_hash        usize                 Partition index for a hash value
range_for_server       RangeInclusive<u64>   Key range owned by a partition
to_millis, to_micros   u64                   Millisecond/microsecond timestamp
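
The timestamp conversions in this contract can be illustrated with a standard-library sketch. It assumes the u64 values count from the Unix epoch and that times before the epoch are not expected (the sketch panics on them). Both points are assumptions rather than documented Arroyo behavior.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Sketch only: milliseconds since the Unix epoch, truncating finer precision.
pub fn to_millis(time: SystemTime) -> u64 {
    time.duration_since(UNIX_EPOCH)
        .expect("time is before the Unix epoch")
        .as_millis() as u64
}

/// Sketch only: inverse of to_millis at millisecond precision.
pub fn from_millis(ts: u64) -> SystemTime {
    UNIX_EPOCH + Duration::from_millis(ts)
}

fn main() {
    let now = SystemTime::now();
    let millis = to_millis(now);
    let restored = from_millis(millis);
    // The round trip drops sub-millisecond precision, so restored <= now.
    assert!(restored <= now);
    println!("{millis} ms since the Unix epoch");
}
```

The round trip is exact only at millisecond granularity, so code that compares a restored SystemTime with the original should compare the converted integers instead.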

Usage Examples

use arroyo_types::{TaskInfo, server_for_hash, range_for_server, to_millis};
use std::time::SystemTime;

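// Determine which of 8 partitions owns a key hash.
// The value below is a placeholder; in practice it is the hash of the record's key.
let my_key_hash: u64 = 0x1234_5678_9abc_def0;
let partition = server_for_hash(my_key_hash, 8);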

// Get the key range for partition 3 of 8
let range = range_for_server(3, 8);

// Convert a timestamp
let millis = to_millis(SystemTime::now());

// Create test task info
let task_info = TaskInfo::for_test("job-1", "operator-1");
