Implementation:ArroyoSystems Arroyo Iceberg Metadata
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors, File_Systems |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Extracts Iceberg-compatible file metadata (row counts, column sizes, null counts, min/max bounds, split offsets) from Parquet file metadata and constructs Iceberg DataFile manifest entries.
Description
This module bridges Parquet file statistics into the Iceberg metadata model. The IcebergFileMetadata struct is a serializable (bincode-compatible) representation of per-file statistics including row count, split offsets, and per-column accumulators (ColumnAccum) containing compressed size, value count, null count, and min/max bounds encoded as Iceberg Datum byte arrays.
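As a rough illustration of what "encoded as Iceberg Datum byte arrays" means, the sketch below encodes a few primitive bounds following the Iceberg single-value binary serialization rules (little-endian fixed-width numbers, raw UTF-8 for strings). `BoundValue`, `encode_bound`, and `decode_int_bound` are hypothetical names invented for this example; the real module works with the iceberg crate's `Datum` type instead.

```rust
/// Hypothetical stand-in for a typed bound value (not the iceberg crate's `Datum`).
#[derive(Debug, Clone, PartialEq)]
pub enum BoundValue {
    Boolean(bool),
    Int(i32),
    Long(i64),
    Double(f64),
    String(String),
}

/// Encode a bound using the Iceberg single-value binary layout:
/// numbers are fixed-width little-endian, strings are UTF-8 without a length prefix.
pub fn encode_bound(value: &BoundValue) -> Vec<u8> {
    match value {
        BoundValue::Boolean(b) => vec![u8::from(*b)],
        BoundValue::Int(v) => v.to_le_bytes().to_vec(),
        BoundValue::Long(v) => v.to_le_bytes().to_vec(),
        BoundValue::Double(v) => v.to_le_bytes().to_vec(),
        BoundValue::String(s) => s.as_bytes().to_vec(),
    }
}

/// Decode an `int` bound; returns `None` unless the slice is exactly 4 bytes long.
pub fn decode_int_bound(bytes: &[u8]) -> Option<i32> {
    if bytes.len() != 4 {
        return None;
    }
    let mut arr = [0u8; 4];
    arr.copy_from_slice(bytes);
    Some(i32::from_le_bytes(arr))
}
```

Because the encoded form is little-endian, the bytes of two numeric bounds cannot be compared lexicographically; any min/max comparison has to happen on the typed values before encoding.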
The IcebergFileMetadata::from_parquet method walks the Iceberg schema with an IndexByParquetPathName visitor to map Parquet column paths to Iceberg field IDs. It then aggregates row-group-level statistics with a MinMaxColAggregator, which tracks exact min/max bounds for every Iceberg primitive type (boolean, int, long, float, double, string, date, time, timestamp, uuid, fixed, binary).
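The aggregation step can be sketched with the standard library alone. The example assumes the path-to-field-ID index has already been built (here a plain `HashMap<String, i32>`) and restricts bounds to `i64` for brevity. `RowGroupColumnStats`, `Accum`, and `aggregate` are hypothetical names for this illustration, and dropping bounds when any row group lacks statistics is a conservative choice made for the sketch, not necessarily what MinMaxColAggregator does.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified view of one column chunk's statistics in a row group.
#[derive(Debug, Clone)]
pub struct RowGroupColumnStats {
    pub path: String, // dotted Parquet column path, e.g. "user.id"
    pub compressed_size: u64,
    pub num_values: u64,
    pub null_count: u64,
    pub min: Option<i64>,
    pub max: Option<i64>,
}

/// Hypothetical file-level accumulator for one Iceberg field.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Accum {
    pub compressed_size: u64,
    pub num_values: u64,
    pub null_values: u64,
    pub min: Option<i64>,
    pub max: Option<i64>,
    /// Set once any row group lacks min/max, so bounds are never reported as exact by mistake.
    pub bounds_dropped: bool,
}

/// Fold per-row-group statistics into per-field accumulators keyed by Iceberg field ID.
pub fn aggregate(
    field_ids: &HashMap<String, i32>,
    row_groups: &[Vec<RowGroupColumnStats>],
) -> HashMap<i32, Accum> {
    let mut out: HashMap<i32, Accum> = HashMap::new();
    for group in row_groups {
        for col in group {
            // Columns without a matching Iceberg field ID are skipped.
            let id = match field_ids.get(&col.path) {
                Some(id) => *id,
                None => continue,
            };
            let acc = out.entry(id).or_default();
            acc.compressed_size += col.compressed_size;
            acc.num_values += col.num_values;
            acc.null_values += col.null_count;
            match (col.min, col.max) {
                (Some(lo), Some(hi)) => {
                    if !acc.bounds_dropped {
                        acc.min = Some(acc.min.map_or(lo, |m| m.min(lo)));
                        acc.max = Some(acc.max.map_or(hi, |m| m.max(hi)));
                    }
                }
                _ => {
                    // Missing statistics in one row group invalidate the file-level bounds.
                    acc.bounds_dropped = true;
                    acc.min = None;
                    acc.max = None;
                }
            }
        }
    }
    out
}
```

Sizes and counts are summed across row groups, while bounds are combined by taking the smallest minimum and the largest maximum of the typed values.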
The build_datafile_from_meta function converts an IcebergFileMetadata into a fully populated Iceberg DataFile entry, including partition values computed by applying Iceberg transform functions to the lower bounds of the partition fields.
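To make the partition-value step concrete, here is a minimal sketch of applying a transform to a column's lower bound. It covers only a hypothetical subset of Iceberg transforms and treats the bound as an `i64` (a plain integer for identity and truncate, microseconds since the Unix epoch for day and hour). The `Transform` enum and `partition_value` function are invented for this example and are not the iceberg crate's API. Using the lower bound presumably relies on every row in a file belonging to the same partition, so any value from the file yields the same result.

```rust
/// Hypothetical subset of Iceberg partition transforms, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Transform {
    Identity,
    Day,
    Hour,
    /// Truncate an integer to a multiple of the given width (assumed to be > 0).
    Truncate(i64),
}

const MICROS_PER_HOUR: i64 = 3_600_000_000;
const MICROS_PER_DAY: i64 = 24 * MICROS_PER_HOUR;

/// Apply a transform to a column's lower bound.
/// Day and Hour interpret the bound as microseconds since the Unix epoch.
pub fn partition_value(transform: Transform, lower_bound: i64) -> i64 {
    match transform {
        Transform::Identity => lower_bound,
        // Floor division so that pre-epoch timestamps land in negative days/hours.
        Transform::Day => lower_bound.div_euclid(MICROS_PER_DAY),
        Transform::Hour => lower_bound.div_euclid(MICROS_PER_HOUR),
        // Equivalent to v - (((v % W) + W) % W) from the Iceberg spec.
        Transform::Truncate(width) => lower_bound - lower_bound.rem_euclid(width),
    }
}
```

For instance, a timestamp lower bound of exactly one day after the epoch (86,400,000,000 microseconds) maps to day partition 1, and truncating -1 with width 10 gives -10.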
Usage
Used internally during Iceberg sink commits to produce the DataFile entries that are recorded in Iceberg manifest files. Called after each Parquet file is written and closed.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/filesystem/sink/iceberg/metadata.rs
- Lines: 1-709
Signature
#[derive(Debug, Decode, Encode, Clone, PartialEq, Eq, Default)]
pub struct BoundsEncoded {
    pub min: Option<Vec<u8>>,
    pub max: Option<Vec<u8>>,
}

#[derive(Debug, Decode, Encode, Clone, PartialEq, Eq, Default)]
pub struct ColumnAccum {
    pub compressed_size: u64,
    pub num_values: u64,
    pub null_values: u64,
    pub bounds: BoundsEncoded,
}

#[derive(Debug, Decode, Encode, Clone, PartialEq, Eq)]
pub struct IcebergFileMetadata {
    pub row_count: u64,
    pub split_offsets: Vec<i64>,
    pub columns: HashMap<i32, ColumnAccum>,
}

impl IcebergFileMetadata {
    pub fn from_parquet(meta: FileMetaData, iceberg_schema: &IcebergSchema) -> IcebergFileMetadata;
}

pub fn build_datafile_from_meta(
    ice_schema: &IceSchema,
    meta: &IcebergFileMetadata,
    partitioning: &IcebergPartitioning,
    file_path: String,
    file_size_bytes: u64,
    partition_spec_id: i32,
) -> anyhow::Result<DataFile>;

pub(crate) fn get_parquet_stat_min_as_datum(
    primitive_type: &PrimitiveType,
    stats: &Statistics,
) -> anyhow::Result<Option<Datum>>;

pub fn get_parquet_stat_max_as_datum(
    primitive_type: &PrimitiveType,
    stats: &Statistics,
) -> anyhow::Result<Option<Datum>>;
Import
use arroyo_connectors::filesystem::sink::iceberg::metadata::{
    IcebergFileMetadata, build_datafile_from_meta, ColumnAccum, BoundsEncoded,
};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| meta | parquet::format::FileMetaData | Yes | Raw Parquet file metadata from ArrowWriter::close() |
| iceberg_schema | IcebergSchema | Yes | Iceberg schema with field IDs for column mapping |
| partitioning | IcebergPartitioning | Yes | Partition spec fields and transform functions |
| file_path | String | Yes | Full storage path of the written Parquet file |
| file_size_bytes | u64 | Yes | Total byte size of the file on storage |
| partition_spec_id | i32 | Yes | ID of the Iceberg partition spec to associate with the data file |
Outputs
| Name | Type | Description |
|---|---|---|
| IcebergFileMetadata | IcebergFileMetadata | Serializable per-file statistics (row count, column sizes, bounds) |
| DataFile | iceberg::spec::DataFile | Complete Iceberg manifest entry with column stats, partition values, and split offsets |
Usage Examples
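// After writing a Parquet file, close the writer and extract Iceberg metadata.
// Both fallible calls propagate errors with `?`, so this must run inside a
// function that returns a compatible Result.
let parquet_metadata = arrow_writer.close()?;
let ice_meta = IcebergFileMetadata::from_parquet(parquet_metadata, &iceberg_schema);

// Build a DataFile entry for the Iceberg manifest.
let data_file = build_datafile_from_meta(
    &iceberg_schema,
    &ice_meta,
    &partitioning,
    "s3://bucket/data/file.parquet".to_string(), // file_path
    1234,                                        // file_size_bytes (placeholder value)
    0,                                           // partition_spec_id
)?;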