Implementation:Risingwavelabs Risingwave IcebergSink New

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Streaming, Iceberg, Rust
Last Updated 2026-02-09 07:00 GMT

Overview

Concrete tool for creating and managing Iceberg sink pipelines provided by the RisingWave Rust connector module.

Description

IcebergSink is the Rust-side implementation of the Iceberg sink connector. It is configured via IcebergConfig (parsed from SQL WITH clause properties) and manages the lifecycle of IcebergSinkWriter (writes Parquet data files) and IcebergSinkCommitter (commits snapshots to the Iceberg catalog via JNI).

Usage

IcebergSink is instantiated when a user executes CREATE SINK ... WITH (connector = 'iceberg'). The sink supports both append-only and upsert write modes, multiple catalog types (storage, hive, glue, rest, jdbc), and a configurable commit interval measured in checkpoints.
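To make the commit interval concrete, here is a minimal sketch of committing once every N checkpoints. `CommitGate` and `commits_over` are hypothetical names invented for this illustration; they are not part of the RisingWave codebase, and the real committer may decide differently.

```rust
/// Hypothetical helper (not a RisingWave type): decides on which
/// checkpoints the buffered data files should be committed.
pub struct CommitGate {
    interval: u64,
    since_last_commit: u64,
}

impl CommitGate {
    pub fn new(interval: u64) -> Self {
        // Assumption of this sketch: an interval of 0 is treated as 1,
        // so the gate always makes progress.
        Self {
            interval: interval.max(1),
            since_last_commit: 0,
        }
    }

    /// Call once per checkpoint. Returns true when a commit is due.
    pub fn on_checkpoint(&mut self) -> bool {
        self.since_last_commit += 1;
        if self.since_last_commit >= self.interval {
            self.since_last_commit = 0;
            true
        } else {
            false
        }
    }
}

/// Counts how many commits are triggered over `checkpoints` checkpoints.
pub fn commits_over(interval: u64, checkpoints: u64) -> u64 {
    let mut gate = CommitGate::new(interval);
    (0..checkpoints).filter(|_| gate.on_checkpoint()).count() as u64
}
```

Under this model, a sink created with commit_checkpoint_interval = 10 commits twice in 25 checkpoints, while the default of 60 commits once per 60 checkpoints. A larger interval means fewer, larger snapshots at the cost of higher end-to-end latency.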

Code Reference

Source Location

  • Repository: risingwave
  • File: src/connector/src/sink/iceberg/mod.rs
  • Lines: L344-487 (IcebergConfig), L640-645 (IcebergSink), L1257-1270 (Writer), L2035-2109 (Committer)

Signature

pub struct IcebergConfig {
    pub connector: String,
    pub r#type: String,           // "upsert" or "append-only"
    pub warehouse_path: String,
    pub s3_endpoint: Option<String>,
    pub s3_access_key: Option<String>,
    pub s3_secret_key: Option<String>,
    pub s3_region: Option<String>,
    pub catalog_type: Option<String>,  // storage/hive/glue/rest/jdbc
    pub database_name: Option<String>,
    pub table_name: String,
    pub commit_checkpoint_interval: u64,  // default: 60
    // ... additional fields
}

impl IcebergSink {
    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self>;
}

Import

use risingwave_connector::sink::iceberg::{IcebergSink, IcebergConfig};

I/O Contract

Inputs

Name | Type | Required | Description
connector | String | Yes | Must be 'iceberg'
type | String | Yes | 'upsert' or 'append-only'
warehouse.path | String | Yes | S3/MinIO path for Iceberg warehouse
s3.endpoint | String | Yes (for MinIO) | Object storage endpoint URL
s3.access.key | String | Yes | Access key for object storage
s3.secret.key | String | Yes | Secret key for object storage
catalog.type | String | No | Catalog type: storage, hive, glue, rest, jdbc (default: storage)
database.name | String | Yes | Target database name
table.name | String | Yes | Target table name
commit_checkpoint_interval | u64 | No | Checkpoints between commits (default: 60)
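The mapping from WITH clause properties to configuration fields can be sketched as follows. This is an illustrative simplification, not the actual parsing code: `MiniIcebergConfig` and `parse_with_props` are hypothetical names, only a handful of properties are handled, and errors are plain strings.

```rust
use std::collections::HashMap;

/// Hypothetical, cut-down stand-in for a few `IcebergConfig` fields.
#[derive(Debug, PartialEq)]
pub struct MiniIcebergConfig {
    pub r#type: String,
    pub warehouse_path: String,
    pub catalog_type: Option<String>,
    pub table_name: String,
    pub commit_checkpoint_interval: u64,
}

/// Default taken from the inputs table above.
const DEFAULT_COMMIT_CHECKPOINT_INTERVAL: u64 = 60;

/// Builds the simplified config from WITH clause key/value pairs.
pub fn parse_with_props(
    props: &HashMap<String, String>,
) -> Result<MiniIcebergConfig, String> {
    // Look up a property that must be present.
    let required = |key: &str| -> Result<String, String> {
        props
            .get(key)
            .cloned()
            .ok_or_else(|| format!("missing required property `{key}`"))
    };

    let connector = required("connector")?;
    if connector != "iceberg" {
        return Err(format!("unexpected connector `{connector}`"));
    }

    let sink_type = required("type")?;
    if sink_type != "upsert" && sink_type != "append-only" {
        return Err(format!("unsupported type `{sink_type}`"));
    }

    // Optional numeric property: fall back to the documented default.
    let commit_checkpoint_interval = match props.get("commit_checkpoint_interval") {
        Some(raw) => raw
            .parse::<u64>()
            .map_err(|e| format!("invalid commit_checkpoint_interval `{raw}`: {e}"))?,
        None => DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
    };

    Ok(MiniIcebergConfig {
        r#type: sink_type,
        warehouse_path: required("warehouse.path")?,
        // Left as None when absent, mirroring the Option<String> field.
        catalog_type: props.get("catalog.type").cloned(),
        table_name: required("table.name")?,
        commit_checkpoint_interval,
    })
}
```

Missing required properties surface as errors before any sink is constructed, and an omitted commit_checkpoint_interval resolves to 60.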

Outputs

Name | Type | Description
Parquet files | Object storage files | Data written in Parquet format to warehouse path
Iceberg metadata | Catalog entries | Table metadata, manifests, and snapshot files

Usage Examples

Create Iceberg Sink with MinIO

CREATE SINK iceberg_sink FROM my_materialized_view
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 's3://hummock001/iceberg-data',
    s3.endpoint = 'http://minio:9301',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin',
    s3.region = 'us-east-1',
    catalog.type = 'storage',
    database.name = 'demo_db',
    table.name = 'demo_table'
);

Create Upsert Iceberg Sink

CREATE SINK iceberg_upsert_sink FROM user_updates
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'user_id',
    warehouse.path = 's3://my-bucket/iceberg',
    s3.region = 'us-west-2',
    catalog.type = 'glue',
    database.name = 'analytics',
    table.name = 'users',
    commit_checkpoint_interval = 10
);
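The two examples differ mainly in write mode, and the upsert example additionally names a primary key. One way to model that distinction is sketched below. `SinkMode` and `sink_mode` are hypothetical names, and both the rule that an upsert sink needs a non-empty primary_key and the comma-separated key format are assumptions of this sketch rather than behavior confirmed by this page.

```rust
/// Hypothetical model of the two write modes named by the `type` property.
#[derive(Debug, PartialEq)]
pub enum SinkMode {
    AppendOnly,
    Upsert { primary_key: Vec<String> },
}

/// Resolves the write mode from the `type` and `primary_key` properties.
/// Assumption: `primary_key` is a comma-separated list of column names.
pub fn sink_mode(sink_type: &str, primary_key: Option<&str>) -> Result<SinkMode, String> {
    match sink_type {
        "append-only" => Ok(SinkMode::AppendOnly),
        "upsert" => {
            // Split the key list, trimming whitespace and dropping empty entries.
            let cols: Vec<String> = primary_key
                .unwrap_or("")
                .split(',')
                .map(|c| c.trim().to_string())
                .filter(|c| !c.is_empty())
                .collect();
            if cols.is_empty() {
                Err("upsert sinks need a non-empty primary_key".to_string())
            } else {
                Ok(SinkMode::Upsert { primary_key: cols })
            }
        }
        other => Err(format!("unsupported sink type `{other}`")),
    }
}
```

With the upsert example above, `sink_mode("upsert", Some("user_id"))` yields an `Upsert` mode keyed on `user_id`, whereas omitting the key is rejected in this sketch.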

Related Pages

Implements Principle

Requires Environment
