

Implementation:ArroyoSystems Arroyo Iceberg Metadata

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors, File_Systems
Last Updated: 2026-02-08 08:00 GMT

Overview

Extracts Iceberg-compatible file metadata (row counts, column sizes, null counts, min/max bounds, split offsets) from Parquet file metadata and constructs Iceberg DataFile manifest entries.

Description

This module bridges Parquet file statistics into the Iceberg metadata model. The IcebergFileMetadata struct is a serializable representation of per-file statistics (it derives bincode's Encode and Decode). It holds the row count, the split offsets, and a map from Iceberg field ID to a per-column accumulator (ColumnAccum). Each ColumnAccum records the compressed size, value count, null count, and min/max bounds, with the bounds stored as Iceberg Datum byte arrays.
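To show what "bounds encoded as Iceberg Datum byte arrays" means in practice, here is a minimal std-only sketch of Iceberg's binary single-value serialization, which is the byte layout Iceberg manifests use for lower and upper bounds. The BoundValue enum and the encode_bound and bounds_from helpers are hypothetical stand-ins for iceberg::spec::Datum; the BoundsEncoded mirror drops the bincode derives so the example needs no external crates.

```rust
/// Hypothetical, simplified stand-in for an Iceberg primitive value.
/// The real code works with `iceberg::spec::Datum`; this enum only
/// illustrates the byte layout Iceberg prescribes for bounds.
#[derive(Debug, Clone, PartialEq)]
pub enum BoundValue {
    Boolean(bool),
    Int(i32),  // also the representation of `date` (days since 1970-01-01)
    Long(i64), // also the representation of `time` and `timestamp` (microseconds)
    Float(f32),
    Double(f64),
    String(String),
    Binary(Vec<u8>),
}

/// Encode one bound with Iceberg's single-value serialization:
/// fixed-width numbers are little-endian, strings are raw UTF-8 bytes
/// with no length prefix, and booleans are a single 0x00/0x01 byte.
pub fn encode_bound(value: &BoundValue) -> Vec<u8> {
    match value {
        BoundValue::Boolean(b) => vec![u8::from(*b)],
        BoundValue::Int(v) => v.to_le_bytes().to_vec(),
        BoundValue::Long(v) => v.to_le_bytes().to_vec(),
        BoundValue::Float(v) => v.to_le_bytes().to_vec(),
        BoundValue::Double(v) => v.to_le_bytes().to_vec(),
        BoundValue::String(s) => s.as_bytes().to_vec(),
        BoundValue::Binary(b) => b.clone(),
    }
}

/// Std-only mirror of the documented `BoundsEncoded` struct
/// (the bincode `Encode`/`Decode` derives are omitted here).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BoundsEncoded {
    pub min: Option<Vec<u8>>,
    pub max: Option<Vec<u8>>,
}

/// Build encoded bounds; a missing side stays `None`.
pub fn bounds_from(min: Option<&BoundValue>, max: Option<&BoundValue>) -> BoundsEncoded {
    BoundsEncoded {
        min: min.map(encode_bound),
        max: max.map(encode_bound),
    }
}
```

Storing bounds as opaque bytes keeps ColumnAccum independent of the column type, so one accumulator type can serve every column.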

The IcebergFileMetadata::from_parquet method traverses the Iceberg schema with an IndexByParquetPathName visitor to map Parquet column paths to Iceberg field IDs. It then aggregates row-group-level statistics with a MinMaxColAggregator, which maintains exact min/max bounds across row groups for all Iceberg primitive types (boolean, int, long, float, double, string, date, time, timestamp, uuid, fixed, binary).
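The aggregation step can be pictured with the following sketch. It assumes each row group yields one statistics record per column chunk and that a path-to-field-ID map (the role IndexByParquetPathName plays) has already been built. ChunkStats, ColumnAccumSketch, and aggregate are hypothetical names, and bounds are plain i64 values rather than typed Datums to keep the example short. It also assumes a missing min/max means the chunk had no non-null values; a writer that may omit statistics would have to drop the file-level bound instead.

```rust
use std::collections::HashMap;

/// Hypothetical per-row-group column chunk statistics.
pub struct ChunkStats {
    pub path: String, // Parquet column path, e.g. "user.id"
    pub compressed_size: u64,
    pub num_values: u64,
    pub null_count: u64,
    pub min: Option<i64>,
    pub max: Option<i64>,
}

/// Simplified counterpart of ColumnAccum with unencoded i64 bounds.
#[derive(Debug, Default, PartialEq)]
pub struct ColumnAccumSketch {
    pub compressed_size: u64,
    pub num_values: u64,
    pub null_values: u64,
    pub min: Option<i64>,
    pub max: Option<i64>,
}

/// Keep the smaller lower bound; `None` contributes nothing.
fn merge_min(a: Option<i64>, b: Option<i64>) -> Option<i64> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x.min(y)),
        (x, None) | (None, x) => x,
    }
}

/// Keep the larger upper bound; `None` contributes nothing.
fn merge_max(a: Option<i64>, b: Option<i64>) -> Option<i64> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x.max(y)),
        (x, None) | (None, x) => x,
    }
}

/// Fold every row group's chunk statistics into per-field accumulators
/// keyed by Iceberg field ID. Chunks whose path has no field ID are skipped.
pub fn aggregate(
    row_groups: &[Vec<ChunkStats>],
    path_to_id: &HashMap<String, i32>,
) -> HashMap<i32, ColumnAccumSketch> {
    let mut columns: HashMap<i32, ColumnAccumSketch> = HashMap::new();
    for group in row_groups {
        for chunk in group {
            let Some(&field_id) = path_to_id.get(&chunk.path) else {
                continue;
            };
            let acc = columns.entry(field_id).or_default();
            // Sizes and counts are additive across row groups.
            acc.compressed_size += chunk.compressed_size;
            acc.num_values += chunk.num_values;
            acc.null_values += chunk.null_count;
            // Bounds widen to cover every row group.
            acc.min = merge_min(acc.min, chunk.min);
            acc.max = merge_max(acc.max, chunk.max);
        }
    }
    columns
}
```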

The build_datafile_from_meta function converts an IcebergFileMetadata into a fully populated Iceberg DataFile entry. Partition values are computed by applying each partition field's Iceberg transform function to the lower bound of that field's source column.
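A minimal sketch of applying a transform to a lower bound is shown below. Deriving the partition value from the lower bound yields the file's partition only if every row in the file maps to the same partition value; the sketch assumes the writer guarantees this. Transform, Value, and apply are hypothetical names covering a subset of the transforms in the Iceberg spec (identity, truncate, day, void); bucket (Murmur3-based) and the year/month/hour transforms are omitted.

```rust
const MICROS_PER_DAY: i64 = 86_400_000_000;

/// Hypothetical subset of Iceberg partition transforms.
#[derive(Debug, Clone, Copy)]
pub enum Transform {
    Identity,
    Truncate(i64), // width W
    Day,           // input: timestamp in microseconds since the epoch
    Void,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Long(i64),
    Str(String),
    Date(i32), // days since 1970-01-01
}

/// Apply `transform` to a column's lower bound to get the partition value.
pub fn apply(transform: Transform, lower: &Value) -> Result<Option<Value>, String> {
    match (transform, lower) {
        // `void` always produces a null partition value.
        (Transform::Void, _) => Ok(None),
        (Transform::Identity, v) => Ok(Some(v.clone())),
        (Transform::Truncate(w), _) if w <= 0 => {
            Err(format!("truncate width must be positive, got {w}"))
        }
        // Spec formula v - (((v % W) + W) % W): rounds down, so -1 -> -10 for W = 10.
        (Transform::Truncate(w), Value::Long(v)) => {
            let v = *v;
            Ok(Some(Value::Long(v - (((v % w) + w) % w))))
        }
        // Strings keep at most W Unicode code points.
        (Transform::Truncate(w), Value::Str(s)) => {
            Ok(Some(Value::Str(s.chars().take(w as usize).collect())))
        }
        // Floor division so pre-epoch timestamps land on the previous day.
        (Transform::Day, Value::Long(micros)) => {
            let days = micros.div_euclid(MICROS_PER_DAY);
            if days < i64::from(i32::MIN) || days > i64::from(i32::MAX) {
                return Err(format!("day {days} is out of i32 range"));
            }
            Ok(Some(Value::Date(days as i32)))
        }
        (t, v) => Err(format!("transform {t:?} is not supported for {v:?}")),
    }
}
```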

Usage

Used internally during Iceberg sink commits to produce the DataFile entries that are recorded in Iceberg manifest files. Called after each Parquet file is written and closed.

Code Reference

Source Location

Signature

#[derive(Debug, Decode, Encode, Clone, PartialEq, Eq, Default)]
pub struct BoundsEncoded {
    pub min: Option<Vec<u8>>,
    pub max: Option<Vec<u8>>,
}

#[derive(Debug, Decode, Encode, Clone, PartialEq, Eq, Default)]
pub struct ColumnAccum {
    pub compressed_size: u64,
    pub num_values: u64,
    pub null_values: u64,
    pub bounds: BoundsEncoded,
}

#[derive(Debug, Decode, Encode, Clone, PartialEq, Eq)]
pub struct IcebergFileMetadata {
    pub row_count: u64,
    pub split_offsets: Vec<i64>,
    pub columns: HashMap<i32, ColumnAccum>,
}

impl IcebergFileMetadata {
    pub fn from_parquet(meta: FileMetaData, iceberg_schema: &IcebergSchema) -> IcebergFileMetadata;
}

pub fn build_datafile_from_meta(
    ice_schema: &IceSchema,
    meta: &IcebergFileMetadata,
    partitioning: &IcebergPartitioning,
    file_path: String,
    file_size_bytes: u64,
    partition_spec_id: i32,
) -> anyhow::Result<DataFile>;

pub(crate) fn get_parquet_stat_min_as_datum(
    primitive_type: &PrimitiveType,
    stats: &Statistics,
) -> anyhow::Result<Option<Datum>>;

pub fn get_parquet_stat_max_as_datum(
    primitive_type: &PrimitiveType,
    stats: &Statistics,
) -> anyhow::Result<Option<Datum>>;
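The get_parquet_stat_min_as_datum and get_parquet_stat_max_as_datum signatures return a Result wrapping an Option: the Option is empty when a statistic is absent, and an error signals that the Parquet statistics do not fit the Iceberg type. The sketch below illustrates that contract for the min side only. ParquetStats, IcebergType, TypedDatum, and stat_min_as_datum are hypothetical simplifications of parquet's Statistics and iceberg's PrimitiveType and Datum. The physical-type pairings follow Parquet's conventions: DATE is stored as INT32, microsecond TIMESTAMP as INT64, and strings as UTF-8 BYTE_ARRAY.

```rust
/// Hypothetical Parquet statistics: physical type plus optional min value.
#[derive(Debug)]
pub enum ParquetStats {
    Int32 { min: Option<i32> },
    Int64 { min: Option<i64> },
    ByteArray { min: Option<Vec<u8>> },
}

/// Hypothetical subset of Iceberg primitive types.
#[derive(Debug, Clone, Copy)]
pub enum IcebergType {
    Int,
    Date,
    Long,
    Timestamp,
    String,
}

#[derive(Debug, PartialEq)]
pub enum TypedDatum {
    Int(i32),
    Date(i32),      // days since 1970-01-01
    Long(i64),
    Timestamp(i64), // microseconds since the epoch
    String(String),
}

/// Ok(None): no min recorded. Err: stats physical type does not match
/// the Iceberg type, or string bytes are not valid UTF-8.
pub fn stat_min_as_datum(
    ty: IcebergType,
    stats: &ParquetStats,
) -> Result<Option<TypedDatum>, String> {
    let datum = match (ty, stats) {
        (IcebergType::Int, ParquetStats::Int32 { min }) => min.map(TypedDatum::Int),
        (IcebergType::Date, ParquetStats::Int32 { min }) => min.map(TypedDatum::Date),
        (IcebergType::Long, ParquetStats::Int64 { min }) => min.map(TypedDatum::Long),
        (IcebergType::Timestamp, ParquetStats::Int64 { min }) => min.map(TypedDatum::Timestamp),
        (IcebergType::String, ParquetStats::ByteArray { min: Some(bytes) }) => {
            let s = std::str::from_utf8(bytes)
                .map_err(|e| format!("invalid UTF-8 in string statistic: {e}"))?;
            Some(TypedDatum::String(s.to_string()))
        }
        (IcebergType::String, ParquetStats::ByteArray { min: None }) => None,
        (ty, stats) => {
            return Err(format!("Parquet stats {stats:?} do not match Iceberg type {ty:?}"))
        }
    };
    Ok(datum)
}
```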

Import

use arroyo_connectors::filesystem::sink::iceberg::metadata::{
    IcebergFileMetadata, build_datafile_from_meta, ColumnAccum, BoundsEncoded,
};

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| meta | parquet::format::FileMetaData | Yes | Raw Parquet file metadata returned by ArrowWriter::close(); passed to from_parquet (build_datafile_from_meta's meta is the resulting &IcebergFileMetadata) |
| iceberg_schema | IcebergSchema | Yes | Iceberg schema with field IDs for column mapping (named ice_schema in build_datafile_from_meta) |
| partitioning | IcebergPartitioning | Yes | Partition spec fields and transform functions |
| file_path | String | Yes | Full storage path of the written Parquet file |
| file_size_bytes | u64 | Yes | Total size of the file on storage, in bytes |
| partition_spec_id | i32 | Yes | ID of the Iceberg partition spec to associate with the data file |

Outputs

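| Name | Type | Description |
|---|---|---|
| IcebergFileMetadata | IcebergFileMetadata | Returned by from_parquet: serializable per-file statistics (row count, column sizes, bounds) |
| DataFile | iceberg::spec::DataFile | Returned by build_datafile_from_meta (wrapped in anyhow::Result): complete Iceberg manifest entry with column stats, partition values, and split offsets |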

Usage Examples

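// After writing a Parquet file, extract the Iceberg metadata.
// ArrowWriter::close() returns the Parquet FileMetaData; propagate errors
// with `?` instead of panicking (the caller returns anyhow::Result).
let parquet_metadata = arrow_writer.close()?;
let ice_meta = IcebergFileMetadata::from_parquet(parquet_metadata, &iceberg_schema);

// Build a DataFile entry for the Iceberg manifest
let data_file = build_datafile_from_meta(
    &iceberg_schema,
    &ice_meta,
    &partitioning,
    "s3://bucket/data/file.parquet".to_string(), // file_path
    1234,                                         // file_size_bytes (placeholder)
    0,                                            // partition_spec_id
)?;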
