Implementation:ArroyoSystems Arroyo Iceberg Sink
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors, File_Systems |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Manages the lifecycle of an Iceberg table within the Arroyo filesystem sink, including REST catalog interaction, table creation, schema management, storage provider construction, and transactional commits with idempotency.
Description
The IcebergTable struct is the runtime representation of an Iceberg table backed by a REST catalog. It wraps the RestCatalog client from iceberg-catalog-rest and maintains the table identity, storage options, partitioning configuration, and cached table metadata.
Key capabilities:
- new - Constructs the REST catalog client from an IcebergCatalog configuration, mapping S3 credential keys to the Iceberg REST catalog property names and building the TableIdent from dot-separated namespace and table name.
- load_or_create - Loads the table, first creating the namespace and table if they do not exist (tolerating concurrent creation races) and converting the Arrow schema to an Iceberg schema with a partition spec.
- get_storage_provider - Derives the storage provider (with S3 credentials) from the catalog-returned table metadata, pointing at the {table_location}/data/ path.
- commit - Performs a transactional fast-append of data files to the table. Uses a deterministic transaction ID derived from the job ID, operator ID, epoch number, and table UUID via SHA-256 to achieve exactly-once commit semantics. If the current snapshot already contains the expected transaction ID, the commit is skipped (idempotent recovery).
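The idempotent commit described in the list above can be illustrated with a small, self-contained sketch. Everything here is an assumption made for illustration: MockTable, transaction_id, and the free function commit are hypothetical names, not Arroyo's or the iceberg crate's API, and the standard library's DefaultHasher stands in for SHA-256 so that the example needs no external crates. Where the real implementation records the transaction ID on the snapshot is not shown in this page; the sketch simply keeps it in a field.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for the real derivation, which uses SHA-256.
/// The same (job, operator, epoch, table UUID) inputs always yield the same ID.
fn transaction_id(job_id: &str, operator_id: &str, epoch: u32, table_uuid: &str) -> String {
    let mut hasher = DefaultHasher::new();
    job_id.hash(&mut hasher);
    operator_id.hash(&mut hasher);
    epoch.hash(&mut hasher);
    table_uuid.hash(&mut hasher);
    format!("{:016x}", hasher.finish())
}

/// Hypothetical in-memory model of a table's current snapshot.
#[derive(Default)]
struct MockTable {
    /// Transaction ID recorded by the most recent commit, if any.
    last_transaction_id: Option<String>,
    /// Data files appended so far.
    files: Vec<String>,
}

/// Appends `files` unless the current snapshot already carries `txn_id`.
/// Returns true if files were appended, false if the commit was skipped.
fn commit(table: &mut MockTable, txn_id: &str, files: &[&str]) -> bool {
    if table.last_transaction_id.as_deref() == Some(txn_id) {
        // A previous attempt for this epoch already succeeded; skip.
        return false;
    }
    table.files.extend(files.iter().map(|f| f.to_string()));
    table.last_transaction_id = Some(txn_id.to_string());
    true
}
```

In this model, replaying the commit for an epoch after a crash recomputes the same ID, finds it on the current snapshot, and appends nothing, which is the property that makes recovery safe.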
The module also provides map_iceberg_error to classify Iceberg errors into DataflowError categories with appropriate retry hints.
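The general shape of such a classification can be sketched as follows. This is not the actual map_iceberg_error: the FailureKind, Category, and RetryHint enums are hypothetical placeholders invented for this example, and the real DataflowError categories and the specific Iceberg error kinds they map from are not listed in this page.

```rust
/// Hypothetical failure kinds; NOT the iceberg crate's error kinds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FailureKind {
    Timeout,
    CommitConflict,
    PermissionDenied,
    InvalidSchema,
}

/// Hypothetical error categories; NOT Arroyo's DataflowError variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Category {
    /// The failure originated in an external system (catalog, object store).
    External,
    /// The failure is caused by user configuration or data.
    User,
}

/// Hypothetical retry hint attached to each classified error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RetryHint {
    WithBackoff,
    Never,
}

/// Maps a failure kind to a category plus a retry hint.
fn classify(kind: FailureKind) -> (Category, RetryHint) {
    match kind {
        // Transient conditions are worth retrying after a delay.
        FailureKind::Timeout | FailureKind::CommitConflict => {
            (Category::External, RetryHint::WithBackoff)
        }
        // Retrying cannot fix a bad credential or an incompatible schema.
        FailureKind::PermissionDenied | FailureKind::InvalidSchema => {
            (Category::User, RetryHint::Never)
        }
    }
}
```

Keeping the mapping in one exhaustive match means that adding a new failure kind forces a decision about its category and retry behavior at compile time.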
Usage
Used internally by the filesystem sink operator when the table format is Iceberg. The IcebergTable is created during operator construction and held in the CommitState::Iceberg variant throughout the sink lifecycle.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/filesystem/sink/iceberg/mod.rs
- Lines: 1-351
Signature
#[derive(Debug)]
pub struct IcebergTable {
pub task_info: Option<Arc<TaskInfo>>,
pub catalog: RestCatalog,
pub storage_options: HashMap<String, String>,
pub table_ident: TableIdent,
pub location_path: Option<String>,
pub table: Option<Table>,
pub manifest_files: Vec<ManifestFile>,
pub partitioning: IcebergPartitioning,
}
impl IcebergTable {
pub fn new(catalog: &IcebergCatalog, sink: &IcebergSink) -> anyhow::Result<Self>;
pub async fn load_or_create(
&mut self,
task_info: Arc<TaskInfo>,
schema: &Schema,
) -> Result<&Table, DataflowError>;
pub async fn get_storage_provider(
&mut self,
task_info: Arc<TaskInfo>,
schema: &Schema,
) -> Result<StorageProvider, DataflowError>;
pub async fn commit(
&mut self,
epoch: u32,
finished_files: &[FinishedFile],
) -> anyhow::Result<()>;
}
pub fn to_hex(bytes: &[u8]) -> String;
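Only the signature of to_hex is given above. A minimal sketch of what a helper with that signature could look like, assuming it emits two lowercase hexadecimal digits per byte (the casing and the actual implementation in Arroyo are not confirmed by this page):

```rust
use std::fmt::Write;

/// Hex-encodes a byte slice, two lowercase digits per byte.
/// Assumed behavior; the real helper may differ in casing or approach.
pub fn to_hex(bytes: &[u8]) -> String {
    let mut out = String::with_capacity(bytes.len() * 2);
    for b in bytes {
        // Formatting into a String never returns an error.
        write!(out, "{b:02x}").expect("writing to a String cannot fail");
    }
    out
}
```

A helper like this would be a natural way to turn a SHA-256 digest into the string form of the transaction ID.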
Import
use arroyo_connectors::filesystem::sink::iceberg::IcebergTable;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| catalog | IcebergCatalog | Yes | REST catalog configuration (URL, warehouse, token) |
| sink | IcebergSink | Yes | Namespace, table name, storage options, partitioning |
| task_info | Arc<TaskInfo> | Yes | Job and operator identity for idempotent commit IDs |
| schema | Arrow Schema | Yes | Arrow schema to convert to Iceberg schema on table creation |
| epoch | u32 | Yes | Checkpoint epoch for transactional commit tracking |
| finished_files | &[FinishedFile] | Yes | Files with metadata to commit as DataFile entries |
Outputs
| Name | Type | Description |
|---|---|---|
| Table | iceberg::table::Table | Loaded or created Iceberg table with current metadata |
| StorageProvider | arroyo_storage::StorageProvider | Configured object store client for the table data directory |
Usage Examples
// Create the table handle, then load (or create) the table in the catalog
let mut iceberg_table = IcebergTable::new(&catalog_config, &sink_config)?;
// task_info is an Arc<TaskInfo>; clone it because each call takes ownership
let table = iceberg_table.load_or_create(task_info.clone(), &arrow_schema).await?;
// Get storage provider for writing data files
let provider = iceberg_table.get_storage_provider(task_info, &arrow_schema).await?;
// Commit finished files for an epoch
iceberg_table.commit(epoch, &finished_files).await?;