

Implementation:ArroyoSystems Arroyo Iceberg Schema

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors, File_Systems
Last Updated: 2026-02-08 08:00 GMT

Overview

Provides utilities for annotating Arrow schemas with Parquet field IDs required by Iceberg, synchronizing field IDs with catalog-assigned schemas, and retagging nested record batch data to match the annotated schema.

Description

The Iceberg specification requires each field in a Parquet file to carry a unique integer field ID in its metadata (under the key PARQUET:field_id). This module provides three key functions:

  • add_parquet_field_ids - Recursively traverses an Arrow Schema and assigns sequential field IDs to every field, including nested children within Struct, List, LargeList, FixedSizeList, Map, and Union data types. This is the initial annotation used when creating a new Iceberg table from an Arrow schema.
  • update_field_ids_to_iceberg - After a table is created or loaded from a catalog, the catalog may reassign field IDs (as described in apache/iceberg#13164). This function remaps the Arrow schema field IDs to match those in the actual Iceberg schema returned by the catalog, handling list elements that may be renamed to "element" by certain catalogs.
  • normalize_batch_to_schema - Before writing a RecordBatch to Parquet, nested arrays (Struct, List) must have their Arrow field metadata updated to include the correct Parquet field IDs. This function recursively rebuilds nested arrays (StructArray, ListArray) with the field metadata from a target schema.
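The ID assignment done by add_parquet_field_ids can be pictured with a small, std-only Rust sketch. It assumes depth-first, pre-order numbering starting at 1 (a parent gets its ID before its children); the order Arroyo actually uses is not stated on this page. Field, DataType, and add_field_ids are hypothetical, simplified stand-ins rather than arrow-rs types: Map holds key and value directly (Arrow wraps them in an entries struct), and LargeList, FixedSizeList, and Union are omitted.

```rust
use std::collections::HashMap;

/// Metadata key named in the Description.
pub const FIELD_ID_KEY: &str = "PARQUET:field_id";

/// Hypothetical, simplified stand-in for an Arrow field (not arrow-rs `Field`).
#[derive(Debug, Clone)]
pub struct Field {
    pub name: String,
    pub data_type: DataType,
    pub metadata: HashMap<String, String>,
}

/// Only the shapes this sketch needs; LargeList, FixedSizeList and Union are
/// omitted, and Map holds key/value directly instead of an entries struct.
#[derive(Debug, Clone)]
pub enum DataType {
    Int64,
    Utf8,
    Struct(Vec<Field>),
    List(Box<Field>),
    Map { key: Box<Field>, value: Box<Field> },
}

impl Field {
    pub fn new(name: &str, data_type: DataType) -> Self {
        Field { name: name.to_string(), data_type, metadata: HashMap::new() }
    }
}

/// Tags `field` with the next ID, then recurses into its children
/// (assumed depth-first, pre-order numbering).
fn assign(field: &Field, next_id: &mut i32) -> Field {
    let mut out = field.clone();
    out.metadata.insert(FIELD_ID_KEY.to_string(), next_id.to_string());
    *next_id += 1;
    out.data_type = match &field.data_type {
        DataType::Struct(children) => {
            DataType::Struct(children.iter().map(|c| assign(c, next_id)).collect())
        }
        DataType::List(item) => DataType::List(Box::new(assign(item, next_id))),
        DataType::Map { key, value } => {
            let key = Box::new(assign(key, next_id));
            let value = Box::new(assign(value, next_id));
            DataType::Map { key, value }
        }
        primitive => primitive.clone(),
    };
    out
}

/// Returns a copy of `fields` in which every field, nested ones included,
/// carries a PARQUET:field_id entry, numbered from 1.
pub fn add_field_ids(fields: &[Field]) -> Vec<Field> {
    let mut next_id = 1;
    fields.iter().map(|f| assign(f, &mut next_id)).collect()
}
```

Whatever order a writer chooses, the catalog may still renumber the fields when the table is created, which is the situation update_field_ids_to_iceberg handles.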

Usage

These functions are called during Iceberg sink initialization (to prepare the writer schema) and before each batch write (to normalize nested metadata). Correct field IDs are essential for query engines such as Spark and Trino, because Iceberg readers map Parquet columns to table columns by field ID rather than by name.

Code Reference

Source Location

Signature

```rust
/// Add PARQUET:field_id metadata to every field in the schema
pub fn add_parquet_field_ids(schema: &Schema) -> Schema;

/// Remap field IDs in an Arrow schema to match the Iceberg catalog schema
pub fn update_field_ids_to_iceberg(
    schema: &Schema,
    iceberg: &iceberg::spec::Schema,
) -> anyhow::Result<Schema>;

/// Rebuild nested arrays in a RecordBatch to match the target schema metadata
pub fn normalize_batch_to_schema(
    batch: &RecordBatch,
    target: &Schema,
) -> anyhow::Result<RecordBatch>;
```
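To illustrate what update_field_ids_to_iceberg has to do, here is a simplified, std-only sketch that copies catalog-assigned IDs onto an Arrow-like field tree. It assumes struct members are matched by name and that a list's single element is matched by position, so an Arrow element named `item` still lines up with an Iceberg element renamed to `element`. ArrowField, IcebergField, and update_ids are hypothetical names, not the arrow-rs or iceberg-rust APIs, and errors are plain strings rather than anyhow::Error.

```rust
use std::collections::HashMap;

/// Metadata key named in the Description.
pub const FIELD_ID_KEY: &str = "PARQUET:field_id";

/// Hypothetical Arrow-side field: struct members, or the single list
/// element, live in `children`.
#[derive(Debug, Clone)]
pub struct ArrowField {
    pub name: String,
    pub is_list: bool,
    pub children: Vec<ArrowField>,
    pub metadata: HashMap<String, String>,
}

/// Hypothetical catalog-side field carrying the ID the catalog assigned.
#[derive(Debug, Clone)]
pub struct IcebergField {
    pub name: String,
    pub id: i32,
    pub children: Vec<IcebergField>,
}

/// Overwrites the field ID of `field` (and, recursively, of its children)
/// with the IDs from the matching catalog field.
fn remap(field: &ArrowField, iceberg: &IcebergField) -> Result<ArrowField, String> {
    let mut out = field.clone();
    out.metadata.insert(FIELD_ID_KEY.to_string(), iceberg.id.to_string());
    out.children = field
        .children
        .iter()
        .map(|child| {
            let matched = if field.is_list {
                // Some catalogs rename the list element to "element", so the
                // single element is matched by position rather than by name.
                iceberg.children.first()
            } else {
                iceberg.children.iter().find(|c| c.name == child.name)
            };
            let matched = matched
                .ok_or_else(|| format!("no Iceberg field for `{}`", child.name))?;
            remap(child, matched)
        })
        .collect::<Result<Vec<_>, String>>()?;
    Ok(out)
}

/// Matches each top-level field by name, then recurses.
pub fn update_ids(
    fields: &[ArrowField],
    iceberg: &[IcebergField],
) -> Result<Vec<ArrowField>, String> {
    fields
        .iter()
        .map(|f| {
            let matched = iceberg
                .iter()
                .find(|c| c.name == f.name)
                .ok_or_else(|| format!("no Iceberg field for `{}`", f.name))?;
            remap(f, matched)
        })
        .collect()
}
```

Only the metadata changes; Arrow-side names such as `item` are kept, so the schema still describes the batches the sink already produces.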

Import

```rust
use arroyo_connectors::filesystem::sink::iceberg::schema::{
    add_parquet_field_ids,
    update_field_ids_to_iceberg,
    normalize_batch_to_schema,
};
```

I/O Contract

Inputs

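  • schema (Arrow Schema; used by add_parquet_field_ids and update_field_ids_to_iceberg): the Arrow schema to annotate with Parquet field IDs.
  • iceberg (iceberg::spec::Schema; used by update_field_ids_to_iceberg): the Iceberg schema returned by the catalog, whose field IDs the Arrow schema is remapped to.
  • batch (RecordBatch; used by normalize_batch_to_schema): the record batch whose nested fields need their metadata retagged.
  • target (Arrow Schema; used by normalize_batch_to_schema): the annotated schema used as the retagging target.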

Outputs

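  • Schema (Arrow Schema): returned by add_parquet_field_ids, and by update_field_ids_to_iceberg inside anyhow::Result; carries PARQUET:field_id metadata on every field, including nested ones.
  • RecordBatch: returned by normalize_batch_to_schema inside anyhow::Result; its nested array metadata matches the target schema.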
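The Schema output contract can be checked mechanically. The sketch below is a hypothetical validator, not part of Arroyo, that walks a simplified field tree and reports the first field that lacks a PARQUET:field_id entry, carries a non-integer value, or reuses an ID already seen, since the Description requires each field ID to be a unique integer. The Field type and check_field_ids are illustrative assumptions.

```rust
use std::collections::{HashMap, HashSet};

/// Metadata key named in the Description.
pub const FIELD_ID_KEY: &str = "PARQUET:field_id";

/// Hypothetical simplified field: name, string metadata, nested children.
#[derive(Debug, Clone)]
pub struct Field {
    pub name: String,
    pub metadata: HashMap<String, String>,
    pub children: Vec<Field>,
}

/// Returns Ok(()) when every field, nested ones included, carries a unique
/// integer PARQUET:field_id; otherwise describes the first problem found.
pub fn check_field_ids(fields: &[Field]) -> Result<(), String> {
    let mut seen: HashSet<i32> = HashSet::new();
    // Explicit stack of (dotted path, field) pairs instead of recursion.
    let mut stack: Vec<(String, &Field)> =
        fields.iter().map(|f| (f.name.clone(), f)).collect();
    while let Some((path, field)) = stack.pop() {
        let raw = field
            .metadata
            .get(FIELD_ID_KEY)
            .ok_or_else(|| format!("`{}` has no {} entry", path, FIELD_ID_KEY))?;
        let id: i32 = raw
            .parse()
            .map_err(|_| format!("`{}` has non-integer field id `{}`", path, raw))?;
        if !seen.insert(id) {
            return Err(format!("field id {} reused at `{}`", id, path));
        }
        for child in &field.children {
            stack.push((format!("{}.{}", path, child.name), child));
        }
    }
    Ok(())
}
```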

Usage Examples

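```rust
use arrow::{datatypes::Schema, record_batch::RecordBatch};
use arroyo_connectors::filesystem::sink::iceberg::schema::{
    add_parquet_field_ids, normalize_batch_to_schema, update_field_ids_to_iceberg,
};
use iceberg::table::Table;

// `prepare_batch` is an illustrative wrapper, not part of Arroyo
fn prepare_batch(arrow_schema: &Schema, table: &Table, batch: &RecordBatch) -> anyhow::Result<RecordBatch> {
    // Annotate schema with sequential field IDs
    let writer_schema = add_parquet_field_ids(arrow_schema);
    // After loading from the catalog, remap to the catalog-assigned IDs
    let catalog_schema = table.metadata().current_schema();
    let updated_schema = update_field_ids_to_iceberg(&writer_schema, catalog_schema)?;
    // Before writing, normalize nested batch metadata
    normalize_batch_to_schema(batch, &updated_schema)
}
```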
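The last step, normalize_batch_to_schema, changes metadata but not values. The std-only sketch below models that retagging under stated assumptions: each nested column stores the field descriptors of its children next to the child data (as an Arrow StructArray does), child columns are matched to target fields by position, and every nested level is rebuilt with the target's descriptors. Column, Field, retag, and normalize are hypothetical names, not arrow-rs types.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified field descriptor (not arrow-rs `Field`).
#[derive(Debug, Clone)]
pub struct Field {
    pub name: String,
    pub metadata: HashMap<String, String>,
    pub children: Vec<Field>,
}

/// Hypothetical nested column. A `Nested` column keeps the descriptors of its
/// children next to the child data, the way an Arrow StructArray does.
#[derive(Debug, Clone)]
pub enum Column {
    Leaf(Vec<i64>),
    Nested { child_fields: Vec<Field>, children: Vec<Column> },
}

/// Rebuilds `column` so every nested level carries the descriptors (and thus
/// the PARQUET:field_id metadata) from `target`. Values are cloned here for
/// simplicity; only the descriptors change.
fn retag(column: &Column, target: &Field) -> Result<Column, String> {
    match column {
        Column::Leaf(values) => Ok(Column::Leaf(values.clone())),
        Column::Nested { child_fields, children } => {
            if child_fields.len() != target.children.len()
                || children.len() != target.children.len()
            {
                return Err(format!("`{}`: nested field count mismatch", target.name));
            }
            // Recurse first so deeper levels are retagged as well.
            let new_children = children
                .iter()
                .zip(&target.children)
                .map(|(child, child_target)| retag(child, child_target))
                .collect::<Result<Vec<_>, _>>()?;
            Ok(Column::Nested { child_fields: target.children.clone(), children: new_children })
        }
    }
}

/// Retags every top-level column against the target field at the same position.
pub fn normalize(batch: &[Column], target: &[Field]) -> Result<Vec<Column>, String> {
    if batch.len() != target.len() {
        return Err(format!("batch has {} columns, target has {}", batch.len(), target.len()));
    }
    batch.iter().zip(target).map(|(c, f)| retag(c, f)).collect()
}
```

In arrow-rs, child arrays are reference-counted (ArrayRef is an Arc), so rebuilding a StructArray or ListArray around existing children typically does not copy the underlying buffers; the clone in the sketch is only for simplicity.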
