Implementation:ArroyoSystems Arroyo Iceberg Transforms
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors, File_Systems |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Implements Iceberg partition transform functions (identity, day, month, year, hour, truncate, bucket) as Arrow compute kernels and registers them as DataFusion user-defined functions for use in SQL partition expressions.
Description
This module provides the transform_arrow function that applies an Iceberg partition transform to an Arrow array. The supported transforms are:
- Identity - Returns the array unchanged.
- Day - Extracts the day number from Date32 or Timestamp(Microsecond) columns as an Int32 (days since Unix epoch).
- Month - Computes months since Unix epoch from Date32 or Timestamp(Microsecond) columns using the formula 12 * (year - 1970) + month.
- Year - Computes years since Unix epoch (year - 1970) from Date32 or Timestamp(Microsecond) columns.
- Hour - Computes hours since Unix epoch from Timestamp(Microsecond) columns.
- Truncate - Truncates integer values (Int16, Int32, Int64) to the nearest lower multiple of a given width using rem_euclid.
- Bucket - Applies Murmur3 32-bit hashing followed by modulo to assign values into N buckets. Supports Int32, Int64, Date32, Time32(Millisecond), and Utf8 data types.
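The arithmetic behind the temporal and truncate transforms can be sketched on plain integers, without Arrow. This is a minimal illustration, not the module's actual kernels. The helper names are hypothetical, the month term in the formula is assumed to be zero-based (January = 0) so that 1970-01 maps to 0, and the calendar conversion uses a standard days-to-civil-date algorithm rather than whatever date library the module relies on.

```rust
const MICROS_PER_HOUR: i64 = 3_600_000_000;
const MICROS_PER_DAY: i64 = 24 * MICROS_PER_HOUR;

/// Day transform: whole days since 1970-01-01.
/// Floor division keeps pre-epoch timestamps negative.
fn days_from_micros(micros: i64) -> i32 {
    micros.div_euclid(MICROS_PER_DAY) as i32
}

/// Hour transform: whole hours since 1970-01-01T00:00.
fn hours_from_micros(micros: i64) -> i32 {
    micros.div_euclid(MICROS_PER_HOUR) as i32
}

/// Converts days since epoch to (year, zero-based month) in the
/// proleptic Gregorian calendar.
fn year_month_from_days(days: i32) -> (i32, i32) {
    let z = days as i64 + 719_468; // shift epoch to 0000-03-01
    let era = z.div_euclid(146_097); // 400-year cycles
    let doe = z.rem_euclid(146_097); // day of era
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year
    let mp = (5 * doy + 2) / 153; // March-based month, 0..=11
    let month0 = if mp < 10 { mp + 2 } else { mp - 10 }; // January = 0
    let year = yoe + era * 400 + if month0 < 2 { 1 } else { 0 };
    (year as i32, month0 as i32)
}

/// Year transform: year - 1970.
fn years_from_days(days: i32) -> i32 {
    year_month_from_days(days).0 - 1970
}

/// Month transform: 12 * (year - 1970) + month, with a zero-based month (assumption).
fn months_from_days(days: i32) -> i32 {
    let (year, month0) = year_month_from_days(days);
    12 * (year - 1970) + month0
}

/// Truncate transform: nearest lower multiple of `width`.
/// rem_euclid is never negative, so negative inputs round toward negative infinity.
fn truncate_i64(value: i64, width: i64) -> i64 {
    value - value.rem_euclid(width)
}
```

Using rem_euclid and div_euclid rather than % and / matters only for negative inputs: with a width of 10, the value -1 truncates to -10 rather than 0, and a timestamp one microsecond before the epoch falls in day -1 rather than day 0.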
Each transform is also wrapped as a DataFusion ScalarUDF via the make_transform_udf macro, producing functions named ice_identity, ice_day, ice_month, ice_year, ice_hour, ice_truncate, and ice_bucket. The register_all function registers all UDFs into a FunctionRegistry.
The fns submodule re-exports the UDF constructors using DataFusion's export_functions macro for convenient programmatic use.
Usage
The transforms are used during Iceberg sink configuration to evaluate partition expressions in SQL, and internally during metadata construction to compute partition values for data files. The UDFs are registered by IcebergConnector::register_udfs.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/filesystem/sink/iceberg/transforms.rs
- Lines: 1-726
Signature
/// Apply an Iceberg partition transform to an Arrow array
pub fn transform_arrow(array: ArrayRef, transform: Transform) -> Result<ArrayRef, ArrowError>;
/// Register all Iceberg transform UDFs into a DataFusion FunctionRegistry
pub fn register_all(registry: &mut dyn FunctionRegistry) -> DFResult<()>;
// DataFusion UDF constructors
pub mod fns {
    pub fn ice_identity(value) -> ...;
    pub fn ice_hour(timestamp) -> ...;
    pub fn ice_day(timestamp) -> ...;
    pub fn ice_month(timestamp) -> ...;
    pub fn ice_year(timestamp) -> ...;
    pub fn ice_truncate(num, len) -> ...;
    pub fn ice_bucket(value, count) -> ...;
}
Import
use arroyo_connectors::filesystem::sink::iceberg::transforms::{
    transform_arrow, register_all,
};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| array | ArrayRef | Yes | Arrow array to transform; the accepted types depend on the transform (Date32, Time32(Millisecond), Timestamp(Microsecond), Int16/32/64, Utf8) |
| transform | Transform | Yes | Iceberg transform variant (Identity, Day, Month, Year, Hour, Truncate{width}, Bucket{n}) |
Outputs
| Name | Type | Description |
|---|---|---|
| ArrayRef | Result<ArrayRef, ArrowError> | Transformed Arrow array on success (typically Int32 for temporal and bucket transforms, same type as the input for Identity and Truncate); an ArrowError otherwise |
Usage Examples
use arroyo_connectors::filesystem::sink::iceberg::transforms::transform_arrow;
use crate::filesystem::config::Transform;
// Apply a bucket transform to an Int64 column
let result = transform_arrow(int64_array, Transform::Bucket { arg0: 16 })?;
// Apply a day transform to a timestamp column
let days = transform_arrow(timestamp_array, Transform::Day)?;
// Apply truncate to an Int32 column with width 100
let truncated = transform_arrow(int32_array, Transform::Truncate { arg0: 100 })?;
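To make the bucket call above more concrete, the sketch below shows how a single value could be assigned to a bucket. It follows the bucket definition in the Iceberg specification (Murmur3 x86 32-bit with seed 0, integers hashed as 8-byte little-endian longs, strings hashed as their UTF-8 bytes, then `(hash & i32::MAX) % N`). Whether the Arroyo module matches this in every detail is not confirmed by this page, and the function names here are hypothetical.

```rust
/// Murmur3 x86 32-bit hash over a byte slice.
fn murmur3_32(data: &[u8], seed: u32) -> u32 {
    const C1: u32 = 0xcc9e_2d51;
    const C2: u32 = 0x1b87_3593;
    let mut h = seed;

    // Body: mix each complete 4-byte little-endian block.
    let mut chunks = data.chunks_exact(4);
    for chunk in &mut chunks {
        let mut k = u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
        k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2);
        h ^= k;
        h = h.rotate_left(13).wrapping_mul(5).wrapping_add(0xe654_6b64);
    }

    // Tail: fold in the remaining 1 to 3 bytes, if any.
    let tail = chunks.remainder();
    if !tail.is_empty() {
        let mut k: u32 = 0;
        for (i, byte) in tail.iter().enumerate() {
            k |= (*byte as u32) << (8 * i);
        }
        k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2);
        h ^= k;
    }

    // Finalization: mix in the length and avalanche the bits.
    h ^= data.len() as u32;
    h ^= h >> 16;
    h = h.wrapping_mul(0x85eb_ca6b);
    h ^= h >> 13;
    h = h.wrapping_mul(0xc2b2_ae35);
    h ^= h >> 16;
    h
}

/// Maps a 32-bit hash to a bucket in 0..n by clearing the sign bit first.
fn bucket_of_hash(hash: u32, n: u32) -> i32 {
    ((hash & i32::MAX as u32) % n) as i32
}

/// Buckets an integer; Int32 and Date32 values would be widened to i64 first.
fn bucket_i64(value: i64, n: u32) -> i32 {
    bucket_of_hash(murmur3_32(&value.to_le_bytes(), 0), n)
}

/// Buckets a string by hashing its UTF-8 bytes.
fn bucket_str(value: &str, n: u32) -> i32 {
    bucket_of_hash(murmur3_32(value.as_bytes(), 0), n)
}
```

Clearing the sign bit before the modulo keeps every bucket index in the range 0 to N - 1, even when the 32-bit hash would be negative as a signed integer. With this sketch, `bucket_i64(34, 16)` hashes the value 34 to 2017239379, the value the Iceberg specification lists for it in its hash test vectors.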