Implementation:ArroyoSystems Arroyo Iceberg Connector
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors, File_Systems |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Implements the IcebergConnector struct that provides the top-level Arroyo connector interface for writing streaming data into Apache Iceberg tables via REST catalogs.
Description
The IcebergConnector implements the Connector trait. It binds an IcebergProfile (catalog endpoint and credentials) to an IcebergTable (namespace, table name, partitioning, and rolling policy) to produce a fully configured sink operator. It also validates that the output format is Parquet, tests connectivity to the REST catalog (including authentication and namespace listing), converts the Arrow schema to an Iceberg schema whose fields carry Parquet field IDs, and registers the Iceberg partition-transform UDFs (identity, day, month, year, hour, truncate, bucket). The make_operator method constructs a FileSystemSink operator backed by the Iceberg table format.
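The temporal transforms in that list have fixed semantics in the Iceberg table spec: each maps a timestamp to a whole number of hours, days, months, or years since the Unix epoch, flooring for values before 1970. The scalar sketch below illustrates those values for a timestamp stored in microseconds. It is not Arroyo's UDF code, which operates on Arrow arrays, and the function names are illustrative. The calendar conversion uses Howard Hinnant's civil_from_days algorithm.

```rust
// Illustrative scalar versions of Iceberg's temporal partition transforms,
// applied to a timestamp in microseconds since 1970-01-01T00:00:00 UTC.

const MICROS_PER_HOUR: i64 = 3_600_000_000;
const MICROS_PER_DAY: i64 = 86_400_000_000;

/// hour(ts): whole hours since the epoch (floors for negative values).
pub fn hour(micros: i64) -> i64 {
    micros.div_euclid(MICROS_PER_HOUR)
}

/// day(ts): whole days since 1970-01-01.
pub fn day(micros: i64) -> i64 {
    micros.div_euclid(MICROS_PER_DAY)
}

/// Converts days since the epoch to a proleptic Gregorian (year, month, day)
/// using Howard Hinnant's civil_from_days algorithm.
fn civil_from_days(days: i64) -> (i64, i64, i64) {
    let z = days + 719_468; // shift epoch to 0000-03-01
    let era = z.div_euclid(146_097); // 400-year era
    let doe = z.rem_euclid(146_097); // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // March-based day of year
    let mp = (5 * doy + 2) / 153; // March-based month index, [0, 11]
    let d = doy - (153 * mp + 2) / 5 + 1;
    let m = if mp < 10 { mp + 3 } else { mp - 9 };
    let y = yoe + era * 400 + if m <= 2 { 1 } else { 0 };
    (y, m, d)
}

/// month(ts): whole months since 1970-01.
pub fn month(micros: i64) -> i64 {
    let (y, m, _) = civil_from_days(day(micros));
    (y - 1970) * 12 + (m - 1)
}

/// year(ts): whole years since 1970.
pub fn year(micros: i64) -> i64 {
    civil_from_days(day(micros)).0 - 1970
}
```

For example, 2017-11-16T22:31:08 UTC (1_510_871_468_000_000 microseconds) falls in hour 419686, day 17486, month 574, and year 47. One microsecond before the epoch falls in month -1 and year -1.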
Usage
Use this connector to write the results of a streaming Arroyo SQL pipeline directly into an Apache Iceberg table managed by a REST catalog. Select it in a CREATE TABLE statement by setting the connector type to iceberg and the format to Parquet.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/filesystem/iceberg.rs
- Lines: 1-315
Signature
```rust
pub struct IcebergConnector {}

impl IcebergConnector {
    fn client(profile: &IcebergProfile) -> anyhow::Result<Client>;

    async fn validate(
        profile: &IcebergProfile,
        table: &IcebergTable,
        schema: ConnectionSchema,
    ) -> anyhow::Result<()>;
}

impl Connector for IcebergConnector {
    type ProfileT = IcebergProfile;
    type TableT = IcebergTable;

    fn name(&self) -> &'static str;
    fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector;
    fn test(&self, ...);
    fn table_type(&self, ...) -> ConnectionType;
    fn from_config(&self, ...) -> anyhow::Result<Connection>;
    fn from_options(&self, ...) -> anyhow::Result<Connection>;
    fn register_udfs(&self, registry: &mut dyn FunctionRegistry) -> anyhow::Result<()>;
    fn make_operator(&self, ...) -> anyhow::Result<ConstructedOperator>;
}
```
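The connectivity test and validation described above talk to the catalog over the Iceberg REST protocol. As a minimal sketch, assuming a profile with a base URL, an optional warehouse, and an optional bearer token, the helpers below build the request URLs and Authorization header defined by the Iceberg REST catalog OpenAPI spec: GET /v1/config first, then GET /v1/{prefix}/namespaces. The RestCatalogProfile struct and its field names are hypothetical, not Arroyo's IcebergProfile, and actually sending the requests is left to the client that IcebergConnector::client builds, which is not shown here.

```rust
/// Hypothetical subset of a REST catalog profile; field names are illustrative.
pub struct RestCatalogProfile {
    pub url: String,
    pub warehouse: Option<String>,
    pub token: Option<String>,
}

/// Percent-encodes every byte except RFC 3986 unreserved characters.
fn percent_encode(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for b in s.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(b as char)
            }
            _ => out.push_str(&format!("%{:02X}", b)),
        }
    }
    out
}

/// Base URL without a trailing slash, so paths can be appended safely.
fn base(p: &RestCatalogProfile) -> &str {
    p.url.trim_end_matches('/')
}

/// GET {url}/v1/config[?warehouse=...]: the first call a REST catalog client makes.
pub fn config_url(p: &RestCatalogProfile) -> String {
    match &p.warehouse {
        Some(w) => format!("{}/v1/config?warehouse={}", base(p), percent_encode(w)),
        None => format!("{}/v1/config", base(p)),
    }
}

/// GET {url}/v1/{prefix}/namespaces, omitting the prefix segment when none is set.
pub fn namespaces_url(p: &RestCatalogProfile, prefix: Option<&str>) -> String {
    match prefix {
        Some(pre) => format!("{}/v1/{}/namespaces", base(p), pre),
        None => format!("{}/v1/namespaces", base(p)),
    }
}

/// Bearer-token header, if the profile carries a token.
pub fn auth_header(p: &RestCatalogProfile) -> Option<(&'static str, String)> {
    p.token.as_ref().map(|t| ("Authorization", format!("Bearer {}", t)))
}
```

In the REST protocol the prefix, when a catalog uses one, is returned in the overrides of the /v1/config response, which is why the config call comes first.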
Import
```rust
use arroyo_connectors::filesystem::iceberg::IcebergConnector;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| profile | IcebergProfile | Yes | REST catalog URL, warehouse, bearer token |
| table | IcebergTable | Yes | Namespace, table name, partitioning, rolling policy, file naming |
| schema | ConnectionSchema | Yes | Arrow schema for output columns; must use Parquet format |
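Iceberg identifies columns by numeric field ID rather than by name, so every column written to Parquet needs one; arrow-rs's parquet crate reads such IDs from Arrow field metadata under the key PARQUET:field_id. The sketch below uses a simplified, hypothetical SimpleField type in place of Arrow's Field (column types omitted) and, as an assumption, numbers each struct's own fields before descending into nested ones. Arroyo's actual schema conversion may assign IDs in a different order.

```rust
use std::collections::HashMap;

/// Metadata key arrow-rs's parquet crate uses to carry Parquet field IDs.
pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";

/// Hypothetical, simplified stand-in for an Arrow field (types omitted).
#[derive(Debug, Clone)]
pub struct SimpleField {
    pub name: String,
    pub children: Vec<SimpleField>,
    pub metadata: HashMap<String, String>,
}

/// A field with no nested children.
pub fn leaf(name: &str) -> SimpleField {
    SimpleField { name: name.to_string(), children: Vec::new(), metadata: HashMap::new() }
}

/// A struct-like field with nested children.
pub fn node(name: &str, children: Vec<SimpleField>) -> SimpleField {
    SimpleField { name: name.to_string(), children, metadata: HashMap::new() }
}

/// Numbers every field at this level, then recurses into nested children.
/// On return, `next_id` holds the first unused ID.
pub fn assign_field_ids(fields: &mut [SimpleField], next_id: &mut i32) {
    for f in fields.iter_mut() {
        f.metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), next_id.to_string());
        *next_id += 1;
    }
    for f in fields.iter_mut() {
        assign_field_ids(&mut f.children, next_id);
    }
}

/// Reads back the ID stored in a field's metadata, if any.
pub fn field_id(f: &SimpleField) -> Option<i32> {
    f.metadata.get(PARQUET_FIELD_ID_META_KEY)?.parse().ok()
}
```

For a schema a, b { c, d }, e this sketch yields a = 1, b = 2, e = 3, c = 4, d = 5.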
Outputs
| Name | Type | Description |
|---|---|---|
| Connection | Connection | Fully configured sink connection with operator config and partition expressions |
| ConstructedOperator | ConstructedOperator | A FileSystemSink operator writing Parquet to an Iceberg table via REST catalog |
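The rolling policy listed among the table inputs decides when the FileSystemSink closes the Parquet file it is writing and starts a new one. A minimal sketch, assuming a size threshold and a time interval; RollingPolicy, OpenFile, and their fields are hypothetical names, not Arroyo's configuration keys:

```rust
use std::time::{Duration, Instant};

/// Hypothetical rolling-policy thresholds; `None` disables a threshold.
#[derive(Debug, Clone, Copy)]
pub struct RollingPolicy {
    pub max_file_bytes: Option<u64>,
    pub rollover_interval: Option<Duration>,
}

/// Hypothetical state for the file currently being written.
#[derive(Debug, Clone, Copy)]
pub struct OpenFile {
    pub bytes_written: u64,
    pub opened_at: Instant,
}

impl RollingPolicy {
    /// Returns true once any configured threshold has been reached.
    pub fn should_roll(&self, file: &OpenFile, now: Instant) -> bool {
        let too_big = self
            .max_file_bytes
            .map_or(false, |max| file.bytes_written >= max);
        let too_old = self
            .rollover_interval
            .map_or(false, |iv| now.saturating_duration_since(file.opened_at) >= iv);
        too_big || too_old
    }
}
```

Either threshold alone triggers a roll, and an unset threshold never fires.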
Usage Examples
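```rust
// Assumes `profile` (IcebergProfile), `sink_config`, `schema` (ConnectionSchema),
// and `registry` (a FunctionRegistry) were built elsewhere, that the Connector
// trait is in scope, and that this runs inside a function returning anyhow::Result.

// Register the connector and create a connection
let connector = IcebergConnector {};
let connection = connector.from_config(
    None,
    "my_iceberg_sink",
    profile,
    IcebergTable::Sink(sink_config),
    Some(&schema),
)?;

// The connector also registers Iceberg partition UDFs
connector.register_udfs(&mut registry)?;
```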
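register_udfs exposes the Iceberg partition transforms to SQL. For bucket and truncate, the Iceberg table spec fixes the exact arithmetic. bucket(N) hashes the value with 32-bit MurmurHash3 (x86 variant, seed 0), clears the sign bit, and takes the result modulo N; ints and longs are hashed as 8-byte little-endian longs and strings as their UTF-8 bytes. truncate(W) floors integers to a multiple of W and cuts strings to at most W code points. The scalar sketch below follows those rules; it is illustrative only and not Arroyo's Arrow-based UDF implementation.

```rust
/// 32-bit MurmurHash3 (x86 variant, seed 0), returned as a signed i32.
pub fn murmur3_32(data: &[u8]) -> i32 {
    const C1: u32 = 0xcc9e_2d51;
    const C2: u32 = 0x1b87_3593;
    let mut h: u32 = 0;

    // Body: mix each full 4-byte little-endian block into the state.
    let mut chunks = data.chunks_exact(4);
    for chunk in &mut chunks {
        let mut k = u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
        k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2);
        h ^= k;
        h = h.rotate_left(13).wrapping_mul(5).wrapping_add(0xe654_6b64);
    }

    // Tail: fold in the remaining 1 to 3 bytes, if any.
    let tail = chunks.remainder();
    if !tail.is_empty() {
        let mut k: u32 = 0;
        for (i, &b) in tail.iter().enumerate() {
            k |= (b as u32) << (8 * i);
        }
        k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2);
        h ^= k;
    }

    // Finalization: mix in the length, then avalanche (fmix32).
    h ^= data.len() as u32;
    h ^= h >> 16;
    h = h.wrapping_mul(0x85eb_ca6b);
    h ^= h >> 13;
    h = h.wrapping_mul(0xc2b2_ae35);
    h ^= h >> 16;
    h as i32
}

/// Iceberg hashes int and long values identically, as 8-byte little-endian longs.
pub fn hash_long(v: i64) -> i32 {
    murmur3_32(&v.to_le_bytes())
}

/// Strings are hashed over their UTF-8 bytes.
pub fn hash_str(s: &str) -> i32 {
    murmur3_32(s.as_bytes())
}

/// bucket(N): (hash & Integer.MAX_VALUE) % N.
pub fn bucket(hash: i32, n: i32) -> i32 {
    (hash & i32::MAX) % n
}

/// truncate(W) for integers: v - (((v % W) + W) % W), i.e. floor to a multiple of W.
pub fn truncate_long(v: i64, w: i64) -> i64 {
    v - v.rem_euclid(w)
}

/// truncate(L) for strings: keep at most L Unicode code points.
pub fn truncate_str(s: &str, l: usize) -> &str {
    match s.char_indices().nth(l) {
        Some((idx, _)) => &s[..idx],
        None => s,
    }
}
```

Per the spec's hash test vectors, hash_long(34) is 2017239379 and hash_str("iceberg") is 1210000089, so bucket(hash_long(34), 16) is 3. With W = 10, truncate maps 1 to 0 and -1 to -10.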