Implementation:Risingwavelabs Risingwave IcebergSink New
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Streaming, Iceberg, Rust |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
A concrete tool, provided by the RisingWave Rust connector module, for creating and managing Iceberg sink pipelines.
Description
IcebergSink is the Rust-side implementation of the Iceberg sink connector. It is configured via IcebergConfig (parsed from SQL WITH clause properties) and manages the lifecycle of IcebergSinkWriter (writes Parquet data files) and IcebergSinkCommitter (commits snapshots to the Iceberg catalog via JNI).
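The division of labor between the writer and the committer can be pictured with a small, self-contained sketch. It is not the RisingWave code: `DataFile`, `SketchWriter`, and `SketchCommitter` are hypothetical stand-ins, no Parquet is encoded, and no catalog is contacted. It only illustrates the handoff, assuming the writer produces file metadata and the committer publishes a group of files as one snapshot.

```rust
/// Hypothetical stand-in for the metadata of one written Parquet file.
#[derive(Debug, Clone, PartialEq)]
pub struct DataFile {
    pub path: String,
    pub row_count: usize,
}

/// Hypothetical writer: counts rows and "closes" a file on flush.
pub struct SketchWriter {
    prefix: String,
    buffered_rows: usize,
    files_written: usize,
}

impl SketchWriter {
    pub fn new(prefix: &str) -> Self {
        Self { prefix: prefix.to_string(), buffered_rows: 0, files_written: 0 }
    }

    /// Accepts a batch of rows; real code would encode them as Parquet.
    pub fn write_batch(&mut self, rows: usize) {
        self.buffered_rows += rows;
    }

    /// Closes the current file and hands its metadata to the caller.
    /// Returns `None` when no rows arrived since the last flush.
    pub fn flush(&mut self) -> Option<DataFile> {
        if self.buffered_rows == 0 {
            return None;
        }
        let file = DataFile {
            path: format!("{}/data-{:05}.parquet", self.prefix, self.files_written),
            row_count: self.buffered_rows,
        };
        self.files_written += 1;
        self.buffered_rows = 0;
        Some(file)
    }
}

/// Hypothetical committer: groups staged files into snapshots.
#[derive(Default)]
pub struct SketchCommitter {
    staged: Vec<DataFile>,
    snapshots: Vec<Vec<DataFile>>,
}

impl SketchCommitter {
    /// Records a finished data file for the next commit.
    pub fn stage(&mut self, file: DataFile) {
        self.staged.push(file);
    }

    /// Publishes all staged files as one snapshot and returns its index,
    /// or `None` if nothing is staged (no empty snapshots are created).
    pub fn commit(&mut self) -> Option<usize> {
        if self.staged.is_empty() {
            return None;
        }
        self.snapshots.push(std::mem::take(&mut self.staged));
        Some(self.snapshots.len() - 1)
    }

    /// Returns the files that belong to a previously committed snapshot.
    pub fn snapshot(&self, id: usize) -> Option<&[DataFile]> {
        self.snapshots.get(id).map(Vec::as_slice)
    }
}
```

Keeping the two roles separate means many writes can accumulate before a single commit makes them visible to readers of the table.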
Usage
IcebergSink is instantiated when a user executes CREATE SINK ... WITH (connector='iceberg'). The sink supports both append-only and upsert write modes, multiple catalog types, and a configurable commit interval (commit_checkpoint_interval).
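One way to read the commit interval is as a counter over checkpoints: a commit is issued only on every N-th checkpoint. The sketch below shows that counting logic in isolation. `CommitGate` is a hypothetical helper, not a RisingWave type, and treating an interval of 0 as 1 is an assumption of this sketch rather than documented behavior.

```rust
/// Hypothetical helper, not part of RisingWave: decides whether a
/// checkpoint should trigger an Iceberg commit.
pub struct CommitGate {
    interval: u64,
    seen: u64,
}

impl CommitGate {
    pub fn new(interval: u64) -> Self {
        // Assumption of this sketch: clamp 0 to 1 so that every
        // checkpoint commits instead of none.
        Self { interval: interval.max(1), seen: 0 }
    }

    /// Call once per checkpoint; returns true when a commit is due.
    pub fn on_checkpoint(&mut self) -> bool {
        self.seen += 1;
        if self.seen >= self.interval {
            self.seen = 0;
            true
        } else {
            false
        }
    }
}
```

With `CommitGate::new(3)`, the first two checkpoints return `false` and the third returns `true`, after which the count starts over. A larger interval produces fewer, larger snapshots at the cost of higher latency before data becomes visible.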
Code Reference
Source Location
- Repository: risingwave
- File: src/connector/src/sink/iceberg/mod.rs
- Lines: L344-487 (IcebergConfig), L640-645 (IcebergSink), L1257-1270 (Writer), L2035-2109 (Committer)
Signature
pub struct IcebergConfig {
    pub connector: String,
    pub r#type: String, // "upsert" or "append-only"
    pub warehouse_path: String,
    pub s3_endpoint: Option<String>,
    pub s3_access_key: Option<String>,
    pub s3_secret_key: Option<String>,
    pub s3_region: Option<String>,
    pub catalog_type: Option<String>, // storage/hive/glue/rest/jdbc
    pub database_name: Option<String>,
    pub table_name: String,
    pub commit_checkpoint_interval: u64, // default: 60
    // ... additional fields
}

impl IcebergSink {
    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self>;
}
Import
use risingwave_connector::sink::iceberg::{IcebergSink, IcebergConfig};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| connector | String | Yes | Must be 'iceberg' |
| type | String | Yes | 'upsert' or 'append-only' |
| warehouse.path | String | Yes | S3/MinIO path for Iceberg warehouse |
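| s3.endpoint | String | No (needed for MinIO and other S3-compatible stores) | Object storage endpoint URL |
| s3.access.key | String | No (set when the object store needs static credentials, e.g. MinIO) | Access key for object storage |
| s3.secret.key | String | No (set when the object store needs static credentials, e.g. MinIO) | Secret key for object storage |
| s3.region | String | No | Object storage region |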
| catalog.type | String | No | Catalog type: storage, hive, glue, rest, jdbc (default: storage) |
| database.name | String | No (declared as Option<String> in IcebergConfig) | Target database name |
| table.name | String | Yes | Target table name |
| commit_checkpoint_interval | u64 | No | Checkpoints between commits (default: 60) |
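
To make the required/optional split and the defaults in the table concrete, here is a minimal sketch of turning WITH-clause properties into a typed config. `SketchConfig` and `parse_with_options` are hypothetical names covering only a few of the fields, and the error messages are invented for illustration. The actual `IcebergConfig` parsing in RisingWave is not shown in this page and may differ.

```rust
use std::collections::HashMap;

/// Hypothetical, trimmed-down mirror of a few `IcebergConfig` fields.
#[derive(Debug, PartialEq)]
pub struct SketchConfig {
    pub sink_type: String,
    pub warehouse_path: String,
    pub catalog_type: String,
    pub table_name: String,
    pub commit_checkpoint_interval: u64,
}

/// Builds a `SketchConfig` from WITH-clause properties, applying the
/// defaults listed in the table (catalog.type = storage, interval = 60).
pub fn parse_with_options(props: &HashMap<String, String>) -> Result<SketchConfig, String> {
    // Fetch a required property or report which key is missing.
    let required = |key: &str| -> Result<String, String> {
        props
            .get(key)
            .cloned()
            .ok_or_else(|| format!("missing required property `{key}`"))
    };

    if required("connector")? != "iceberg" {
        return Err("connector must be 'iceberg'".to_string());
    }

    let sink_type = required("type")?;
    if sink_type != "upsert" && sink_type != "append-only" {
        return Err(format!("unsupported type `{sink_type}`"));
    }

    // Optional numeric property: parse it if present, else use the default.
    let commit_checkpoint_interval = match props.get("commit_checkpoint_interval") {
        Some(raw) => raw
            .parse::<u64>()
            .map_err(|e| format!("invalid commit_checkpoint_interval: {e}"))?,
        None => 60,
    };

    Ok(SketchConfig {
        sink_type,
        warehouse_path: required("warehouse.path")?,
        catalog_type: props
            .get("catalog.type")
            .cloned()
            .unwrap_or_else(|| "storage".to_string()),
        table_name: required("table.name")?,
        commit_checkpoint_interval,
    })
}
```

Passing only `connector`, `type`, `warehouse.path`, and `table.name` yields a config with `catalog_type` set to `storage` and `commit_checkpoint_interval` set to 60, while an unknown `type` or a non-numeric interval is rejected with an error string.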
Outputs
| Name | Type | Description |
|---|---|---|
| Parquet files | Object storage files | Data written in Parquet format to warehouse path |
| Iceberg metadata | Catalog entries | Table metadata, manifests, and snapshot files |
Usage Examples
Create Iceberg Sink with MinIO
CREATE SINK iceberg_sink FROM my_materialized_view
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 's3://hummock001/iceberg-data',
    s3.endpoint = 'http://minio:9301',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin',
    s3.region = 'us-east-1',
    catalog.type = 'storage',
    database.name = 'demo_db',
    table.name = 'demo_table'
);
Create Upsert Iceberg Sink
CREATE SINK iceberg_upsert_sink FROM user_updates
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'user_id',
    warehouse.path = 's3://my-bucket/iceberg',
    s3.region = 'us-west-2',
    catalog.type = 'glue',
    database.name = 'analytics',
    table.name = 'users',
    commit_checkpoint_interval = 10
);