Implementation:ArroyoSystems Arroyo Core Types
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Types, Arrow |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Defines the foundational types shared across the entire Arroyo engine: task identity, message types, time utilities, CDC types, hash partitioning functions, and Arrow extension type metadata.
Description
This module is the lowest-level type library in Arroyo, depended upon by all other crates:
- Identity types -- WorkerId(u64), MachineId(Arc<String>), TaskInfo (job_id, node_id, operator_name/id, task_index, parallelism, key_range), ChainInfo (for chained operators).
- Time utilities -- to_millis, to_micros, to_nanos, from_millis, from_micros, from_nanos, print_time, days_since_epoch for SystemTime conversions.
- Trait bounds -- Key (Debug + Clone + Encode + Decode<()> + Hash + PartialEq + Eq + Send + 'static) and Data (Debug + Clone + Encode + Decode<()> + Send + 'static), marker traits with blanket implementations for every type that meets those bounds.
- Message types -- Watermark (EventTime or Idle), ArrowMessage (Data or Signal), SignalMessage (Barrier, Watermark, Stop, EndOfData), CheckpointBarrier (epoch, min_epoch, timestamp, then_stop).
- CDC types -- Debezium<T> with before/after/op fields and validation, DebeziumOp (Create/Update/Delete with single-char serde), UpdatingData<T> (Retract/Update/Append).
- Join support -- JoinType enum (Inner, Left, Right, Full).
- Hash partitioning -- server_for_hash(x, n) and range_for_server(i, n) for consistent hash-based key range assignment.
- Arrow extensions -- ArroyoExtensionType::JSON for embedding JSON type metadata in Arrow schemas, RecordBatchBuilder trait, DisplayAsSql for SQL type rendering.
- Date/time enums -- DatePart and DateTruncPrecision with TryFrom<&str> conversions.
- Constants -- JOB_ID_ENV, RUN_ID_ENV, TELEMETRY_KEY, HASH_SEEDS, LOOKUP_KEY_INDEX_FIELD, and Prometheus metric name constants (MESSAGES_RECV, BYTES_SENT, etc.).
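To make the time utilities concrete, here is a minimal sketch of a millisecond round trip using only the standard library. The `sketch_` functions are hypothetical stand-ins written for illustration, not the crate's code; they assume the timestamp is not earlier than the Unix epoch and that sub-millisecond precision is truncated.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical stand-in for `to_millis`: whole milliseconds since the Unix epoch.
/// Assumption: `time` is at or after the epoch; this sketch panics otherwise.
fn sketch_to_millis(time: SystemTime) -> u64 {
    time.duration_since(UNIX_EPOCH)
        .expect("time is before the Unix epoch")
        .as_millis() as u64
}

/// Hypothetical stand-in for `from_millis`: the inverse conversion.
fn sketch_from_millis(ts: u64) -> SystemTime {
    UNIX_EPOCH + Duration::from_millis(ts)
}
```

Converting a millisecond value to a SystemTime and back returns the same value, while converting a SystemTime that carries microseconds drops the sub-millisecond part.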
Usage
Import from arroyo_types for any foundational type used across the Arroyo codebase. TaskInfo and CheckpointBarrier are particularly pervasive.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-types/src/lib.rs
Signature
pub struct WorkerId(pub u64);
pub struct MachineId(pub Arc<String>);
pub struct TaskInfo {
    pub job_id: String,
    pub node_id: u32,
    pub operator_name: String,
    pub operator_id: String,
    pub task_index: u32,
    pub parallelism: u32,
    pub key_range: RangeInclusive<u64>,
}
pub enum Watermark { EventTime(SystemTime), Idle }
pub enum ArrowMessage { Data(RecordBatch), Signal(SignalMessage) }
pub enum SignalMessage { Barrier(CheckpointBarrier), Watermark(Watermark), Stop, EndOfData }
pub struct CheckpointBarrier {
    pub epoch: u32,
    pub min_epoch: u32,
    pub timestamp: SystemTime,
    pub then_stop: bool,
}
pub trait Key: Debug + Clone + Encode + Decode<()> + Hash + PartialEq + Eq + Send + 'static {}
pub trait Data: Debug + Clone + Encode + Decode<()> + Send + 'static {}
pub fn to_millis(time: SystemTime) -> u64;
pub fn from_millis(ts: u64) -> SystemTime;
pub fn server_for_hash(x: u64, n: usize) -> usize;
pub fn range_for_server(i: usize, n: usize) -> RangeInclusive<u64>;
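One way the two partitioning functions can fit together is sketched below: the u64 hash space is split into n contiguous ranges of equal width, with the last range absorbing the remainder. This is an illustration under that assumption, not the crate's source; the `sketch_` names are hypothetical and the real implementation may differ in detail.

```rust
use std::ops::RangeInclusive;

/// Illustrative stand-in for `server_for_hash` (not the crate's code):
/// maps hash `x` to one of `n` partitions of the u64 space.
fn sketch_server_for_hash(x: u64, n: usize) -> usize {
    assert!(n > 0, "need at least one partition");
    if n == 1 {
        return 0;
    }
    let range_size = u64::MAX / (n as u64);
    // Clamp so hashes past the last full-width range land on partition n - 1.
    ((x / range_size) as usize).min(n - 1)
}

/// Illustrative stand-in for `range_for_server`: the inclusive key range
/// owned by partition `i` out of `n`.
fn sketch_range_for_server(i: usize, n: usize) -> RangeInclusive<u64> {
    assert!(i < n, "partition index out of bounds");
    if n == 1 {
        return 0..=u64::MAX;
    }
    let range_size = u64::MAX / (n as u64);
    let start = range_size * (i as u64);
    // The last partition extends to u64::MAX to cover the remainder.
    let end = if i + 1 == n { u64::MAX } else { start + range_size - 1 };
    start..=end
}
```

The property that matters is consistency between the two: every hash in the range returned for partition i maps back to i, and the ranges tile the whole u64 space without gaps or overlap.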
Import
use arroyo_types::{
    TaskInfo, WorkerId, MachineId, Watermark, ArrowMessage, SignalMessage,
    CheckpointBarrier, Key, Data, to_millis, from_millis,
    server_for_hash, range_for_server,
};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
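| x | u64 | Yes | Hash value for partition assignment (server_for_hash) |
| i | usize | Yes | Partition index (range_for_server) |
| n | usize | Yes | Number of partitions/servers |
| time | SystemTime | Yes | Timestamp to convert (to_millis) |
| ts | u64 | Yes | Millisecond timestamp to convert back (from_millis) |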
Outputs
| Name | Type | Description |
|---|---|---|
| server_for_hash | usize | Index of the partition that owns a hash value |
| range_for_server | RangeInclusive<u64> | Key range owned by a partition |
| to_millis / to_micros | u64 | Timestamp in milliseconds or microseconds |
Usage Examples
use arroyo_types::{TaskInfo, server_for_hash, range_for_server, to_millis};
use std::time::SystemTime;
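// Placeholder hash for illustration; in practice it comes from hashing a record's key
let my_key_hash: u64 = 42;
// Determine which of 8 partitions owns the key hash
let partition = server_for_hash(my_key_hash, 8);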
// Get the key range for partition 3 of 8
let range = range_for_server(3, 8);
// Convert a timestamp
let millis = to_millis(SystemTime::now());
// Create test task info
let task_info = TaskInfo::for_test("job-1", "operator-1");