Implementation:ArroyoSystems Arroyo Iceberg Schema
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors, File_Systems |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Provides utilities for annotating Arrow schemas with Parquet field IDs required by Iceberg, synchronizing field IDs with catalog-assigned schemas, and retagging nested record batch data to match the annotated schema.
Description
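The Iceberg specification requires every field written to a Parquet data file, including nested fields, to carry a unique integer field ID. On the Arrow side, this ID is stored in each field's metadata under the key PARQUET:field_id, and the Parquet writer uses it to set the field ID in the file schema. This module provides three key functions: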
- add_parquet_field_ids - Recursively traverses an Arrow Schema and assigns sequential field IDs to every field, including nested children within Struct, List, LargeList, FixedSizeList, Map, and Union data types. This is the initial annotation used when creating a new Iceberg table from an Arrow schema.
- update_field_ids_to_iceberg - When a table is created in or loaded from a catalog, the catalog may assign field IDs that differ from the ones requested (see apache/iceberg#13164). This function rewrites the Arrow schema's field IDs to match the Iceberg schema the catalog actually returns, and tolerates list element fields that some catalogs rename to "element".
- normalize_batch_to_schema - Before writing a RecordBatch to Parquet, nested arrays (Struct, List) must have their Arrow field metadata updated to include the correct Parquet field IDs. This function recursively rebuilds nested arrays (StructArray, ListArray) with the field metadata from a target schema.
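The sequential annotation performed by add_parquet_field_ids can be illustrated with a small, dependency-free sketch. It is not the module's code: the Field type and assign_sequential_ids are hypothetical simplifications of arrow::datatypes::Field, every nested type (Struct, List, Map, ...) is reduced to "a field with children", and both the starting ID of 1 and the pre-order depth-first numbering are assumptions.

```rust
use std::collections::HashMap;

/// Metadata key that carries the Parquet field ID on an Arrow field.
const FIELD_ID_KEY: &str = "PARQUET:field_id";

/// Hypothetical stand-in for an Arrow field. Every nested type
/// (Struct, List, Map, ...) is reduced to "a field with children".
#[derive(Debug, Clone)]
pub struct Field {
    pub name: String,
    pub children: Vec<Field>,
    pub metadata: HashMap<String, String>,
}

impl Field {
    pub fn new(name: &str, children: Vec<Field>) -> Self {
        Field { name: name.to_string(), children, metadata: HashMap::new() }
    }

    /// Parses the field ID back out of the metadata, if present.
    pub fn field_id(&self) -> Option<i32> {
        self.metadata.get(FIELD_ID_KEY).and_then(|v| v.parse().ok())
    }
}

/// Tags `field` with the next ID, then recurses into its children
/// (pre-order, depth-first), sharing one counter across the whole schema.
fn tag(field: &Field, next_id: &mut i32) -> Field {
    let mut metadata = field.metadata.clone();
    metadata.insert(FIELD_ID_KEY.to_string(), next_id.to_string());
    *next_id += 1;
    let children = field.children.iter().map(|c| tag(c, next_id)).collect();
    Field { name: field.name.clone(), children, metadata }
}

/// Hypothetical helper: assigns IDs 1, 2, 3, ... to every field, nested ones included.
pub fn assign_sequential_ids(fields: &[Field]) -> Vec<Field> {
    let mut next_id = 1;
    fields.iter().map(|f| tag(f, &mut next_id)).collect()
}
```

For a schema `id, point { x, y }, ts`, this numbering yields id=1, point=2, x=3, y=4, ts=5. In real Arrow, child fields of List or Map live inside the parent's DataType, so a real traversal also has to rebuild those DataTypes around the tagged children.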
Usage
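add_parquet_field_ids and update_field_ids_to_iceberg are called when the Iceberg sink initializes, to prepare the writer schema. normalize_batch_to_schema is called before each batch write, to retag nested metadata. Correct field IDs are what allow Iceberg readers such as Spark and Trino to map Parquet columns to table columns.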
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/filesystem/sink/iceberg/schema.rs
- Lines: 1-738
Signature
```rust
/// Add PARQUET:field_id metadata to every field in the schema
pub fn add_parquet_field_ids(schema: &Schema) -> Schema;

/// Remap field IDs in an Arrow schema to match the Iceberg catalog schema
pub fn update_field_ids_to_iceberg(
    schema: &Schema,
    iceberg: &iceberg::spec::Schema,
) -> anyhow::Result<Schema>;

/// Rebuild nested arrays in a RecordBatch to match the target schema metadata
pub fn normalize_batch_to_schema(
    batch: &RecordBatch,
    target: &Schema,
) -> anyhow::Result<RecordBatch>;
```
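The remapping done by update_field_ids_to_iceberg can be pictured as a parallel walk over the local schema and the catalog's schema. The sketch below assumes that fields are paired by name, with a fallback to a child named "element" under list fields. ArrowField, CatalogField, the is_list flag and remap_to_catalog_ids are all hypothetical simplifications, not the module's types or API.

```rust
use std::collections::HashMap;

const FIELD_ID_KEY: &str = "PARQUET:field_id";

/// Hypothetical simplified Arrow-side field.
#[derive(Debug, Clone)]
pub struct ArrowField {
    pub name: String,
    /// True when this field is a list and its single child is the list element.
    pub is_list: bool,
    pub children: Vec<ArrowField>,
    pub metadata: HashMap<String, String>,
}

impl ArrowField {
    pub fn new(name: &str, is_list: bool, children: Vec<ArrowField>) -> Self {
        ArrowField { name: name.to_string(), is_list, children, metadata: HashMap::new() }
    }

    pub fn field_id(&self) -> Option<&str> {
        self.metadata.get(FIELD_ID_KEY).map(String::as_str)
    }
}

/// Hypothetical simplified catalog-side field carrying the catalog-assigned ID.
#[derive(Debug, Clone)]
pub struct CatalogField {
    pub name: String,
    pub id: i32,
    pub children: Vec<CatalogField>,
}

impl CatalogField {
    pub fn new(name: &str, id: i32, children: Vec<CatalogField>) -> Self {
        CatalogField { name: name.to_string(), id, children }
    }
}

/// Finds the catalog field for `name`; under a list, falls back to "element"
/// because some catalogs rename the list element field.
fn find_match<'a>(name: &str, parent_is_list: bool, candidates: &'a [CatalogField]) -> Option<&'a CatalogField> {
    candidates.iter().find(|c| c.name == name).or_else(|| {
        if parent_is_list {
            candidates.iter().find(|c| c.name == "element")
        } else {
            None
        }
    })
}

fn remap(fields: &[ArrowField], catalog: &[CatalogField], parent_is_list: bool) -> Result<Vec<ArrowField>, String> {
    fields
        .iter()
        .map(|f| -> Result<ArrowField, String> {
            let m = find_match(&f.name, parent_is_list, catalog)
                .ok_or_else(|| format!("field '{}' not found in catalog schema", f.name))?;
            // Overwrite any locally assigned ID with the catalog's ID.
            let mut metadata = f.metadata.clone();
            metadata.insert(FIELD_ID_KEY.to_string(), m.id.to_string());
            Ok(ArrowField {
                name: f.name.clone(),
                is_list: f.is_list,
                children: remap(&f.children, &m.children, f.is_list)?,
                metadata,
            })
        })
        .collect()
}

/// Hypothetical entry point: top-level fields are never list elements.
pub fn remap_to_catalog_ids(fields: &[ArrowField], catalog: &[CatalogField]) -> Result<Vec<ArrowField>, String> {
    remap(fields, catalog, false)
}
```

With a local list field `tags` whose element is named "item" and a catalog schema that calls it "element" with ID 9, the element still receives ID 9. A field that is missing from the catalog produces an error instead of silently keeping its old ID.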
Import
```rust
use arroyo_connectors::filesystem::sink::iceberg::schema::{
    add_parquet_field_ids,
    update_field_ids_to_iceberg,
    normalize_batch_to_schema,
};
```
I/O Contract
Inputs
| Name | Type | Used by | Description |
|---|---|---|---|
| schema | Arrow Schema | add_parquet_field_ids, update_field_ids_to_iceberg | The Arrow schema to annotate with Parquet field IDs, or whose field IDs are remapped |
| iceberg | iceberg::spec::Schema | update_field_ids_to_iceberg | Iceberg schema returned by the catalog, used as the source of field IDs |
| batch | RecordBatch | normalize_batch_to_schema | Record batch whose nested fields need metadata retagging |
| target | Arrow Schema | normalize_batch_to_schema | Annotated schema to use as the retagging target |
Outputs
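| Name | Type | Returned by | Description |
|---|---|---|---|
| Schema | Arrow Schema (wrapped in anyhow::Result by update_field_ids_to_iceberg) | add_parquet_field_ids, update_field_ids_to_iceberg | Schema with PARQUET:field_id metadata on every field, including nested fields |
| RecordBatch | anyhow::Result<RecordBatch> | normalize_batch_to_schema | Batch with nested array metadata matching the target schema |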
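The retagging performed by normalize_batch_to_schema can be sketched with a toy array model in which, as with Arrow's StructArray and ListArray, nested arrays embed the field definition of each child. Everything here (Field, Array, Batch, normalize_batch) is hypothetical. Children are assumed to be paired positionally, and the sketch uses Rc where Arrow uses Arc.

```rust
use std::collections::HashMap;
use std::rc::Rc;

const FIELD_ID_KEY: &str = "PARQUET:field_id";

/// Hypothetical simplified field: name, metadata, and nested child fields.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub metadata: HashMap<String, String>,
    pub children: Vec<Field>,
}

impl Field {
    pub fn new(name: &str, id: Option<i32>, children: Vec<Field>) -> Self {
        let mut metadata = HashMap::new();
        if let Some(id) = id {
            metadata.insert(FIELD_ID_KEY.to_string(), id.to_string());
        }
        Field { name: name.to_string(), metadata, children }
    }

    pub fn field_id(&self) -> Option<&str> {
        self.metadata.get(FIELD_ID_KEY).map(String::as_str)
    }
}

/// Hypothetical arrays: nested variants embed each child's Field;
/// leaf and offset buffers sit behind Rc so retagging shares them.
#[derive(Debug, Clone)]
pub enum Array {
    Int64(Rc<Vec<i64>>),
    Struct(Vec<(Field, Array)>),
    List(Box<(Field, Array)>, Rc<Vec<i32>>), // (element field, values), offsets
}

/// Hypothetical batch: top-level fields plus one column per field.
#[derive(Debug, Clone)]
pub struct Batch {
    pub fields: Vec<Field>,
    pub columns: Vec<Array>,
}

/// Rebuilds `array` so every embedded child field is taken from `target`.
fn retag(array: &Array, target: &Field) -> Result<Array, String> {
    match array {
        Array::Int64(values) => Ok(Array::Int64(Rc::clone(values))),
        Array::Struct(children) => {
            if children.len() != target.children.len() {
                return Err(format!("struct '{}': child count mismatch", target.name));
            }
            let rebuilt = children
                .iter()
                .zip(&target.children)
                .map(|((_, child), child_target)| -> Result<(Field, Array), String> {
                    Ok((child_target.clone(), retag(child, child_target)?))
                })
                .collect::<Result<Vec<_>, String>>()?;
            Ok(Array::Struct(rebuilt))
        }
        Array::List(element, offsets) => {
            let element_target = target
                .children
                .first()
                .ok_or_else(|| format!("list '{}': target has no element field", target.name))?;
            let values = retag(&element.1, element_target)?;
            Ok(Array::List(Box::new((element_target.clone(), values)), Rc::clone(offsets)))
        }
    }
}

/// Hypothetical counterpart of normalize_batch_to_schema.
pub fn normalize_batch(batch: &Batch, target: &[Field]) -> Result<Batch, String> {
    if batch.columns.len() != target.len() {
        return Err("column count does not match target schema".to_string());
    }
    let columns = batch
        .columns
        .iter()
        .zip(target)
        .map(|(column, field)| retag(column, field))
        .collect::<Result<Vec<_>, String>>()?;
    Ok(Batch { fields: target.to_vec(), columns })
}
```

The design point the sketch illustrates is that only the nested wrappers are rebuilt: leaf values and list offsets are shared, not copied. With arrow-rs, the analogous step would construct new StructArray or ListArray values around the existing child arrays, but treat that as an expectation rather than a description of the module's code.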
Usage Examples
```rust
// Assumes `arrow_schema` (an Arrow Schema), `table` (an Iceberg table loaded from
// the catalog) and `batch` (a RecordBatch) are in scope, inside a function
// that returns anyhow::Result<_>.

// Annotate schema with sequential field IDs
let writer_schema = add_parquet_field_ids(&arrow_schema);

// After loading from catalog, remap to catalog-assigned IDs
let catalog_schema = table.metadata().current_schema();
let updated_schema = update_field_ids_to_iceberg(&writer_schema, catalog_schema)?;

// Before writing, normalize nested batch metadata
let normalized = normalize_batch_to_schema(&batch, &updated_schema)?;
```
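Because Iceberg requires field IDs to be unique across the whole schema, a caller might also check that invariant on the final writer schema before opening a Parquet writer. The sketch below is not part of the module. check_field_ids and the simplified Field type are hypothetical, and errors are reported with a dotted path to the offending field.

```rust
use std::collections::{HashMap, HashSet};

const FIELD_ID_KEY: &str = "PARQUET:field_id";

/// Hypothetical simplified field with nested children.
#[derive(Debug, Clone)]
pub struct Field {
    pub name: String,
    pub metadata: HashMap<String, String>,
    pub children: Vec<Field>,
}

impl Field {
    pub fn new(name: &str, id: Option<i32>, children: Vec<Field>) -> Self {
        let mut metadata = HashMap::new();
        if let Some(id) = id {
            metadata.insert(FIELD_ID_KEY.to_string(), id.to_string());
        }
        Field { name: name.to_string(), metadata, children }
    }
}

/// Depth-first check that `field` and all descendants carry a parseable,
/// not-yet-seen field ID.
fn visit(field: &Field, path: &str, seen: &mut HashSet<i32>) -> Result<(), String> {
    let full = if path.is_empty() { field.name.clone() } else { format!("{}.{}", path, field.name) };
    let raw = field
        .metadata
        .get(FIELD_ID_KEY)
        .ok_or_else(|| format!("{}: missing {}", full, FIELD_ID_KEY))?;
    let id: i32 = raw
        .parse()
        .map_err(|_| format!("{}: non-integer field id '{}'", full, raw))?;
    if !seen.insert(id) {
        return Err(format!("{}: duplicate field id {}", full, id));
    }
    for child in &field.children {
        visit(child, &full, seen)?;
    }
    Ok(())
}

/// Hypothetical validator: every field, nested ones included, has a unique ID.
pub fn check_field_ids(fields: &[Field]) -> Result<(), String> {
    let mut seen = HashSet::new();
    for field in fields {
        visit(field, "", &mut seen)?;
    }
    Ok(())
}
```

For example, a nested field `p.x` that reuses the ID of a top-level field is rejected with the message `p.x: duplicate field id 1`.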