Implementation:ArroyoSystems Arroyo Iceberg Connector

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors, File_Systems
Last Updated: 2026-02-08 08:00 GMT

Overview

Implements the IcebergConnector struct that provides the top-level Arroyo connector interface for writing streaming data into Apache Iceberg tables via REST catalogs.

Description

The IcebergConnector implements the Connector trait, binding an IcebergProfile (catalog credentials and endpoint configuration) together with an IcebergTable (namespace, table name, partitioning, and rolling policies) to produce a fully configured sink operator. It validates that the output format is Parquet, tests connectivity to the REST catalog (including namespace listing and authentication checks), converts Arrow schemas to Iceberg schemas with Parquet field IDs, and registers Iceberg partition-transform UDFs (identity, day, month, year, hour, truncate, bucket). The make_operator method constructs a FileSystemSink operator backed by the Iceberg table format.
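
To make the schema-conversion step concrete, the sketch below shows one way Parquet field IDs can be attached to fields before they are mapped to an Iceberg schema. It is a minimal illustration, not Arroyo's code: `SketchField`, `assign_field_ids`, and `field_id` are hypothetical names, and numbering top-level fields sequentially from 1 is an assumption. The metadata key `PARQUET:field_id` is the one the Rust `parquet` crate uses for Arrow field metadata.

```rust
use std::collections::HashMap;

/// Metadata key under which the Rust `parquet` crate stores a Parquet field ID
/// on an Arrow field.
const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";

/// Hypothetical stand-in for an Arrow field: a name plus string metadata.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchField {
    pub name: String,
    pub metadata: HashMap<String, String>,
}

impl SketchField {
    pub fn new(name: &str) -> Self {
        Self {
            name: name.to_string(),
            metadata: HashMap::new(),
        }
    }
}

/// Assigns sequential IDs, starting at 1, to each top-level field.
/// The numbering scheme is an assumption made for this sketch.
pub fn assign_field_ids(fields: Vec<SketchField>) -> Vec<SketchField> {
    fields
        .into_iter()
        .enumerate()
        .map(|(i, mut field)| {
            field
                .metadata
                .insert(PARQUET_FIELD_ID_META_KEY.to_string(), (i + 1).to_string());
            field
        })
        .collect()
}

/// Reads the field ID back, returning `None` if it is absent or not an integer.
pub fn field_id(field: &SketchField) -> Option<i32> {
    field.metadata.get(PARQUET_FIELD_ID_META_KEY)?.parse().ok()
}
```

Iceberg tracks columns by ID rather than by name, which is why a schema written to Parquet for an Iceberg table needs stable field IDs.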

Usage

Use this connector when you need to write streaming SQL results from Arroyo directly into an Apache Iceberg table managed by a REST catalog. Select it by specifying the iceberg connector type in a CREATE TABLE statement whose format is Parquet.

Code Reference

Source Location

Signature

pub struct IcebergConnector {}

impl IcebergConnector {
    fn client(profile: &IcebergProfile) -> anyhow::Result<Client>;
    async fn validate(
        profile: &IcebergProfile,
        table: &IcebergTable,
        schema: ConnectionSchema,
    ) -> anyhow::Result<()>;
}

impl Connector for IcebergConnector {
    type ProfileT = IcebergProfile;
    type TableT = IcebergTable;

    fn name(&self) -> &'static str;
    fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector;
    fn test(&self, ...);
    fn table_type(&self, ...) -> ConnectionType;
    fn from_config(&self, ...) -> anyhow::Result<Connection>;
    fn from_options(&self, ...) -> anyhow::Result<Connection>;
    fn register_udfs(&self, registry: &mut dyn FunctionRegistry) -> anyhow::Result<()>;
    fn make_operator(&self, ...) -> anyhow::Result<ConstructedOperator>;
}
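
The partition transforms that register_udfs exposes are named after the transforms in the Apache Iceberg table specification. The sketch below illustrates what the temporal and truncate transforms compute according to that specification; it is not Arroyo's UDF code, all function names are hypothetical, and the bucket transform (which depends on a Murmur3 hash) is left out to keep the example short. Timestamps are assumed to be microseconds since the Unix epoch.

```rust
const MICROS_PER_HOUR: i64 = 3_600_000_000;
const MICROS_PER_DAY: i64 = 24 * MICROS_PER_HOUR;

/// Hours since 1970-01-01T00:00:00, rounding toward negative infinity.
pub fn hour_transform(micros: i64) -> i64 {
    micros.div_euclid(MICROS_PER_HOUR)
}

/// Days since 1970-01-01, rounding toward negative infinity.
pub fn day_transform(micros: i64) -> i64 {
    micros.div_euclid(MICROS_PER_DAY)
}

/// Converts days since the epoch to a (year, month) pair in the proleptic
/// Gregorian calendar, using the well-known "civil from days" arithmetic.
fn civil_year_month(days: i64) -> (i64, i64) {
    let z = days + 719_468; // shift the epoch to 0000-03-01
    let era = z.div_euclid(146_097); // 400-year cycles
    let doe = z.rem_euclid(146_097); // day within the cycle
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year
    let mp = (5 * doy + 2) / 153; // month index, March = 0
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    let year = yoe + era * 400 + if month <= 2 { 1 } else { 0 };
    (year, month)
}

/// Years since 1970.
pub fn year_transform(micros: i64) -> i64 {
    civil_year_month(day_transform(micros)).0 - 1970
}

/// Months since 1970-01.
pub fn month_transform(micros: i64) -> i64 {
    let (year, month) = civil_year_month(day_transform(micros));
    (year - 1970) * 12 + (month - 1)
}

/// Truncates an integer down to the nearest multiple of `width` (width > 0).
pub fn truncate_transform(width: i64, value: i64) -> i64 {
    value - value.rem_euclid(width)
}
```

Euclidean division is used so that values before the epoch round toward negative infinity, which keeps partition boundaries consistent on both sides of 1970.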

Import

use arroyo_connectors::filesystem::iceberg::IcebergConnector;

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| profile | IcebergProfile | Yes | REST catalog URL, warehouse, bearer token |
| table | IcebergTable | Yes | Namespace, table name, partitioning, rolling policy, file naming |
| schema | ConnectionSchema | Yes | Arrow schema for output columns; must use Parquet format |

Outputs

| Name | Type | Description |
|---|---|---|
| Connection | Connection | Fully configured sink connection with operator config and partition expressions |
| ConstructedOperator | ConstructedOperator | A FileSystemSink operator writing Parquet to an Iceberg table via REST catalog |
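
The contract above can be pictured as a trait with associated profile and table types whose from_config rejects any schema that is not Parquet. The following is a simplified, self-contained sketch of that pattern only. Every type here (`SketchConnector`, `SketchProfile`, `SketchTable`, `SketchSchema`, `SketchConnection`) is a hypothetical stand-in, the signatures are reduced compared with the real Connector trait, and errors are plain strings rather than anyhow::Result.

```rust
/// Hypothetical output formats; only Parquet is accepted by the sketch connector.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SketchFormat {
    Parquet,
    Json,
}

/// Hypothetical stand-in for catalog credentials and endpoint configuration.
pub struct SketchProfile {
    pub catalog_url: String,
}

/// Hypothetical stand-in for the table configuration.
pub struct SketchTable {
    pub namespace: String,
    pub table_name: String,
}

/// Hypothetical stand-in for the connection schema.
pub struct SketchSchema {
    pub format: Option<SketchFormat>,
}

#[derive(Debug, PartialEq)]
pub struct SketchConnection {
    pub name: String,
    pub description: String,
}

/// Reduced version of a connector trait: associated types bind a profile
/// and a table configuration to one connector implementation.
pub trait SketchConnector {
    type ProfileT;
    type TableT;

    fn name(&self) -> &'static str;

    fn from_config(
        &self,
        name: &str,
        profile: Self::ProfileT,
        table: Self::TableT,
        schema: Option<&SketchSchema>,
    ) -> Result<SketchConnection, String>;
}

pub struct SketchIcebergConnector;

impl SketchConnector for SketchIcebergConnector {
    type ProfileT = SketchProfile;
    type TableT = SketchTable;

    fn name(&self) -> &'static str {
        "iceberg"
    }

    fn from_config(
        &self,
        name: &str,
        profile: SketchProfile,
        table: SketchTable,
        schema: Option<&SketchSchema>,
    ) -> Result<SketchConnection, String> {
        // A schema is required, and its format must be Parquet.
        let schema = schema.ok_or_else(|| "a schema is required".to_string())?;
        if schema.format != Some(SketchFormat::Parquet) {
            return Err("the iceberg sink requires the Parquet format".to_string());
        }
        Ok(SketchConnection {
            name: name.to_string(),
            description: format!(
                "IcebergSink<{}.{} @ {}>",
                table.namespace, table.table_name, profile.catalog_url
            ),
        })
    }
}
```

Checking the format at configuration time means a misconfigured table fails when the connection is created rather than when the pipeline starts writing files.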

Usage Examples

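// Assumes `profile` (an IcebergProfile), `sink_config`, `schema` (a ConnectionSchema),
// and `registry` (a FunctionRegistry implementation) are already in scope,
// and that the enclosing function returns anyhow::Result.

// Instantiate the connector and build a sink connection.
let connector = IcebergConnector {};
let connection = connector.from_config(
    None,
    "my_iceberg_sink",
    profile,
    IcebergTable::Sink(sink_config),
    Some(&schema),
)?;

// Register the Iceberg partition-transform UDFs.
connector.register_udfs(&mut registry)?;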
