Implementation:ArroyoSystems Arroyo Iceberg Sink

From Leeroopedia


Knowledge Sources
Domains Streaming, Connectors, File_Systems
Last Updated 2026-02-08 08:00 GMT

Overview

IcebergTable manages the lifecycle of an Iceberg table within the Arroyo filesystem sink: REST catalog interaction, table creation, schema management, storage provider construction, and idempotent transactional commits.

Description

The IcebergTable struct is the runtime representation of an Iceberg table backed by a REST catalog. It wraps the RestCatalog client from iceberg-catalog-rest and maintains the table identity, storage options, partitioning configuration, and cached table metadata.

Key capabilities:

  • new - Constructs the REST catalog client from an IcebergCatalog configuration, mapping S3 credential keys to the Iceberg REST catalog property names and building the TableIdent from dot-separated namespace and table name.
  • load_or_create - Creates the namespace and table if they do not exist (handling concurrent creation races), converting the Arrow schema to an Iceberg schema and applying the configured partition spec.
  • get_storage_provider - Derives the storage provider (with S3 credentials) from the catalog-returned table metadata, pointing at the {table_location}/data/ path.
  • commit - Performs a transactional fast-append of data files to the table. Uses a deterministic transaction ID derived from the job ID, operator ID, epoch number, and table UUID via SHA-256 to achieve exactly-once commit semantics. If the current snapshot already contains the expected transaction ID, the commit is skipped (idempotent recovery).
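The race handling described for load_or_create can be sketched with a mock catalog. This is an illustration only: MockCatalog, TableHandle, CatalogError, and the stale_loads field are hypothetical names invented here, not types from Arroyo or iceberg-rust, and the real method may order its catalog calls differently.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins; none of these types come from Arroyo or iceberg-rust.
#[derive(Debug, PartialEq)]
enum CatalogError {
    NotFound,
    AlreadyExists,
}

#[derive(Debug, Clone, PartialEq)]
struct TableHandle {
    name: String,
    created_by: String,
}

#[derive(Default)]
struct MockCatalog {
    tables: HashMap<String, TableHandle>,
    /// Number of upcoming `load` calls that should miss, simulating a peer
    /// task creating the table just after this task looked for it.
    stale_loads: u32,
}

impl MockCatalog {
    fn load(&mut self, name: &str) -> Result<TableHandle, CatalogError> {
        if self.stale_loads > 0 {
            self.stale_loads -= 1;
            return Err(CatalogError::NotFound);
        }
        self.tables.get(name).cloned().ok_or(CatalogError::NotFound)
    }

    fn create(&mut self, name: &str, creator: &str) -> Result<TableHandle, CatalogError> {
        if self.tables.contains_key(name) {
            return Err(CatalogError::AlreadyExists);
        }
        let handle = TableHandle {
            name: name.to_string(),
            created_by: creator.to_string(),
        };
        self.tables.insert(name.to_string(), handle.clone());
        Ok(handle)
    }
}

/// Load the table; create it if missing; if another writer created it in
/// between, fall back to loading the table that writer created.
fn load_or_create(
    catalog: &mut MockCatalog,
    name: &str,
    creator: &str,
) -> Result<TableHandle, CatalogError> {
    match catalog.load(name) {
        Ok(table) => Ok(table),
        Err(CatalogError::NotFound) => match catalog.create(name, creator) {
            Ok(table) => Ok(table),
            // Lost the creation race: the table exists now, so load it.
            Err(CatalogError::AlreadyExists) => catalog.load(name),
            Err(other) => Err(other),
        },
        Err(other) => Err(other),
    }
}
```

Treating "already exists" as success means several parallel sink tasks can start at once without coordinating which one creates the table.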

The module also provides map_iceberg_error to classify Iceberg errors into DataflowError categories with appropriate retry hints.
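As a rough picture of what classifying errors with retry hints can look like, the sketch below maps a small set of made-up error kinds to two categories. Every name in it (SketchIcebergErrorKind, SketchDataflowError, RetryHint, map_error_sketch) is hypothetical; the actual variants of DataflowError and the actual mapping in map_iceberg_error are not reproduced here.

```rust
// Hypothetical types for illustration only; the real DataflowError and the
// Iceberg error kinds are not reproduced here.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SketchIcebergErrorKind {
    CommitConflict,
    Unavailable,
    InvalidData,
    Unsupported,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum RetryHint {
    WithBackoff,
    NoRetry,
}

#[derive(Debug, PartialEq)]
enum SketchDataflowError {
    /// The external system failed in a way that may succeed later.
    External { message: String, retry: RetryHint },
    /// The job's configuration or data is wrong; retrying will not help.
    User { message: String, retry: RetryHint },
}

/// Classify an error kind into a category plus a retry hint.
fn map_error_sketch(kind: SketchIcebergErrorKind, message: &str) -> SketchDataflowError {
    match kind {
        SketchIcebergErrorKind::CommitConflict | SketchIcebergErrorKind::Unavailable => {
            SketchDataflowError::External {
                message: message.to_string(),
                retry: RetryHint::WithBackoff,
            }
        }
        SketchIcebergErrorKind::InvalidData | SketchIcebergErrorKind::Unsupported => {
            SketchDataflowError::User {
                message: message.to_string(),
                retry: RetryHint::NoRetry,
            }
        }
    }
}
```

Keeping the mapping in one exhaustive match means a newly added error kind fails to compile until someone decides how it should be retried.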

Usage

Used internally by the filesystem sink operator when the table format is Iceberg. The IcebergTable is created during operator construction and held in the CommitState::Iceberg variant throughout the sink lifecycle.

Code Reference

Source Location

Signature

#[derive(Debug)]
pub struct IcebergTable {
    pub task_info: Option<Arc<TaskInfo>>,
    pub catalog: RestCatalog,
    pub storage_options: HashMap<String, String>,
    pub table_ident: TableIdent,
    pub location_path: Option<String>,
    pub table: Option<Table>,
    pub manifest_files: Vec<ManifestFile>,
    pub partitioning: IcebergPartitioning,
}

impl IcebergTable {
    pub fn new(catalog: &IcebergCatalog, sink: &IcebergSink) -> anyhow::Result<Self>;

    pub async fn load_or_create(
        &mut self,
        task_info: Arc<TaskInfo>,
        schema: &Schema,
    ) -> Result<&Table, DataflowError>;

    pub async fn get_storage_provider(
        &mut self,
        task_info: Arc<TaskInfo>,
        schema: &Schema,
    ) -> Result<StorageProvider, DataflowError>;

    pub async fn commit(
        &mut self,
        epoch: u32,
        finished_files: &[FinishedFile],
    ) -> anyhow::Result<()>;
}

pub fn to_hex(bytes: &[u8]) -> String;
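The body of to_hex is not shown on this page. A minimal sketch consistent with the signature above, assuming lowercase output with two digits per byte (an assumption, not confirmed by the source), could look like this:

```rust
use std::fmt::Write;

/// Encode bytes as a hex string, two digits per byte.
/// Lowercase output is an assumption made for this sketch.
pub fn to_hex(bytes: &[u8]) -> String {
    let mut out = String::with_capacity(bytes.len() * 2);
    for byte in bytes {
        // Writing into a String cannot fail, so the expect never triggers.
        write!(out, "{:02x}", byte).expect("writing to a String cannot fail");
    }
    out
}
```

A helper like this is a natural fit for turning a hash digest into a printable transaction ID.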

Import

use arroyo_connectors::filesystem::sink::iceberg::IcebergTable;

I/O Contract

Inputs

Name | Type | Required | Description
catalog | IcebergCatalog | Yes | REST catalog configuration (URL, warehouse, token)
sink | IcebergSink | Yes | Namespace, table name, storage options, partitioning
task_info | Arc<TaskInfo> | Yes | Job and operator identity for idempotent commit IDs
schema | Arrow Schema | Yes | Arrow schema to convert to Iceberg schema on table creation
epoch | u32 | Yes | Checkpoint epoch for transactional commit tracking
finished_files | &[FinishedFile] | Yes | Files with metadata to commit as DataFile entries

Outputs

Name | Type | Description
Table | iceberg::table::Table | Loaded or created Iceberg table with current metadata
StorageProvider | arroyo_storage::StorageProvider | Configured object store client for the table data directory
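Two of the derivations in this contract are simple string operations: splitting the dot-separated namespace into its parts, and pointing the storage provider at {table_location}/data/. The sketch below illustrates both with hypothetical helpers (SketchTableIdent, table_ident, data_path); the real code builds an Iceberg TableIdent and may treat edge cases such as trailing slashes differently.

```rust
// Hypothetical stand-in for the Iceberg table identifier.
#[derive(Debug, PartialEq)]
struct SketchTableIdent {
    namespace: Vec<String>,
    name: String,
}

/// Split a dot-separated namespace such as "analytics.prod" into its parts
/// and pair it with the table name.
fn table_ident(namespace: &str, table: &str) -> SketchTableIdent {
    SketchTableIdent {
        namespace: namespace.split('.').map(String::from).collect(),
        name: table.to_string(),
    }
}

/// Derive the data directory under a table location.
/// Trimming a trailing slash first is an assumption of this sketch.
fn data_path(table_location: &str) -> String {
    format!("{}/data/", table_location.trim_end_matches('/'))
}
```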

Usage Examples

// Create and initialize an Iceberg table.
let mut iceberg_table = IcebergTable::new(&catalog_config, &sink_config)?;
// task_info is an Arc<TaskInfo>; clone it so it can be passed again below.
let table = iceberg_table.load_or_create(task_info.clone(), &arrow_schema).await?;

// Get the storage provider for writing data files.
let provider = iceberg_table.get_storage_provider(task_info, &arrow_schema).await?;

// Commit the finished files for a checkpoint epoch.
iceberg_table.commit(epoch, &finished_files).await?;
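The idempotent behavior of commit can be illustrated with a self-contained sketch. Note the substitutions: the source derives the transaction ID with SHA-256, while this sketch uses FNV-1a purely to stay dependency-free, and SketchTable, TXN_KEY, transaction_id, and commit_once are hypothetical names, not Arroyo or Iceberg APIs.

```rust
use std::collections::HashMap;

/// FNV-1a, used ONLY as a dependency-free stand-in for SHA-256 in this sketch.
fn fnv1a64(data: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325;
    for byte in data {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x100000001b3);
    }
    hash
}

/// The same (job, operator, epoch, table) always yields the same ID, so a
/// commit replayed after recovery can be recognized.
fn transaction_id(job_id: &str, operator_id: &str, epoch: u32, table_uuid: &str) -> String {
    let material = format!("{}\n{}\n{}\n{}", job_id, operator_id, epoch, table_uuid);
    format!("{:016x}", fnv1a64(material.as_bytes()))
}

// Hypothetical snapshot property name; the real key is not given on this page.
const TXN_KEY: &str = "sketch.transaction-id";

/// Hypothetical stand-in for a table with a current snapshot summary.
#[derive(Default)]
struct SketchTable {
    snapshot_summary: HashMap<String, String>,
    data_files: Vec<String>,
}

/// Append files unless the current snapshot already records this transaction.
/// Returns true if files were appended, false if the commit was skipped.
fn commit_once(table: &mut SketchTable, txn_id: &str, files: &[&str]) -> bool {
    if table.snapshot_summary.get(TXN_KEY).map(String::as_str) == Some(txn_id) {
        return false; // already committed; the replay is a no-op
    }
    table.data_files.extend(files.iter().map(|f| f.to_string()));
    table
        .snapshot_summary
        .insert(TXN_KEY.to_string(), txn_id.to_string());
    true
}
```

Because the ID depends only on stable inputs, a task that crashes after the catalog accepted its commit but before the checkpoint completed can retry the same epoch without appending the files twice.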
