Implementation:Risingwavelabs Risingwave Iceberg External Query
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Query_Processing, Iceberg |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
A concrete tool, provided by RisingWave's integration test scripts, for verifying Iceberg sink output by querying the resulting tables through external engines (Presto, Spark).
Description
The sink_check.py script validates that data written by RisingWave's Iceberg sink can be read by an external query engine. It sleeps for 60 seconds to let the sink's data propagate, runs the query in iceberg-query.sql through the Presto CLI inside a Docker Compose container, writes the result to query_output.txt, and fails if that output is empty. The script therefore serves both as a validation step and as a reference implementation for querying RisingWave-produced Iceberg tables.
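The fixed 60-second sleep either wastes time or, on a slow run, may not be long enough. One alternative is to poll until the query returns rows or a timeout expires. The sketch below is not what sink_check.py does. `wait_for_rows` and its parameters are hypothetical, and `run_query` stands for any callable that executes the query and returns its text output.

```python
import time
from typing import Callable


def wait_for_rows(
    run_query: Callable[[], str],
    timeout_s: float = 60.0,
    interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Re-run run_query until it returns non-blank output or timeout_s passes."""
    deadline = clock() + timeout_s
    while True:
        output = run_query()
        if output.strip():
            return output  # rows are visible: the sink's data has propagated
        if clock() >= deadline:
            raise TimeoutError(f"query returned no rows within {timeout_s}s")
        sleep(interval_s)  # back off before querying again
```

In a script, `run_query` could wrap the same `docker compose exec presto presto-cli ...` call that sink_check.py makes, using `subprocess.run(..., check=True, capture_output=True, text=True).stdout`. `sleep` and `clock` are injectable so the loop can be tested without waiting.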
Usage
Use this as a reference for how to query Iceberg tables produced by RisingWave using Presto or Spark. The pattern applies to any external engine with an Iceberg connector.
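Only the command line changes from one engine to the next, so the pattern can be generalized with a small argv builder. This is a hypothetical sketch. `ENGINE_CLI`, `build_exec_argv`, and the `spark` service name are assumptions. The Presto entry copies the flags used by sink_check.py. The Spark entry assumes a container whose `spark-sql` is already configured with an Iceberg catalog.

```python
from typing import List

# Hypothetical engine -> CLI prefix table. Only the Presto entry mirrors
# sink_check.py; the Spark entry assumes spark-sql with an Iceberg catalog.
ENGINE_CLI = {
    "presto": ["presto-cli", "--server", "localhost:8080", "--execute"],
    "spark": ["spark-sql", "-e"],
}


def build_exec_argv(engine: str, service: str, sql: str) -> List[str]:
    """Build a `docker compose exec` command that runs sql via engine's CLI."""
    try:
        cli = ENGINE_CLI[engine]
    except KeyError:
        raise ValueError(f"unsupported engine: {engine}") from None
    return ["docker", "compose", "exec", service, *cli, sql]
```

The returned list can be passed directly to `subprocess.run`. Adding an engine then means adding one table entry rather than another copy of the script.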
Code Reference
Source Location
- Repository: risingwave
- File: integration_tests/iceberg-sink/sink_check.py
- Lines: L1-24
Signature
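```python
# No function signature: this is a standalone script.
# Reads SQL from iceberg-query.sql and executes it via the Presto CLI
# inside the Docker Compose "presto" service.
import subprocess
import sys
from time import sleep

sleep(60)  # Wait for data propagation

with open("iceberg-query.sql") as f:
    query_sql = f.read()

# Save Presto's stdout to query_output.txt
with open("query_output.txt", "w") as query_output_file:
    subprocess.run(
        ["docker", "compose", "exec", "presto", "presto-cli",
         "--server", "localhost:8080", "--execute", query_sql],
        check=True, stdout=query_output_file,
    )

# Exit non-zero if the query returned nothing
with open("query_output.txt") as f:
    if not f.read().strip():
        sys.exit(1)
```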
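The script streams Presto's stdout straight into a file at module level. A function-shaped sketch makes the invocation reusable and testable by injecting the subprocess runner. It is hypothetical and not part of sink_check.py, and the names `run_presto_query` and `runner` are assumptions.

```python
import subprocess
from pathlib import Path
from typing import Callable

# Same command prefix that sink_check.py passes to subprocess.run
PRESTO_ARGV = ["docker", "compose", "exec", "presto", "presto-cli",
               "--server", "localhost:8080", "--execute"]


def run_presto_query(
    sql: str,
    output_path: Path,
    runner: Callable[..., subprocess.CompletedProcess] = subprocess.run,
) -> str:
    """Run sql through presto-cli, save stdout to output_path, and return it."""
    # check=True raises CalledProcessError if presto-cli exits non-zero
    result = runner([*PRESTO_ARGV, sql], check=True,
                    capture_output=True, text=True)
    output_path.write_text(result.stdout)
    return result.stdout
```

Capturing the output as text, instead of passing a file handle as `stdout`, lets the caller inspect the rows directly while still producing query_output.txt.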
Import
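```shell
# Run from the iceberg-sink directory: the script opens iceberg-query.sql
# and calls docker compose relative to the working directory
cd integration_tests/iceberg-sink && python3 sink_check.py
```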
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| iceberg-query.sql | SQL file | Yes | Query to execute (e.g., SELECT user_id, count(*) FROM iceberg.demo_db.demo_table GROUP BY user_id) |
| Presto container | Docker service | Yes | Running Presto instance with Iceberg connector on port 8080 |
| Iceberg table | Data files | Yes | Populated Iceberg table on MinIO/S3 |
Outputs
| Name | Type | Description |
|---|---|---|
| query_output.txt | Text file | Query results from Presto |
| Assertion | Exit code | Script exits 0 if the output is non-empty and non-zero otherwise |
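The exit-code contract in the table can be isolated in a small function. This is a sketch under the table's stated contract. `exit_code_for` is a hypothetical name, and treating a missing output file as a failure is an assumption.

```python
from pathlib import Path


def exit_code_for(output_path: Path) -> int:
    """Return 0 if the query output file has non-whitespace text, else 1."""
    if not output_path.exists():
        return 1  # assumption: a missing output file counts as failure
    return 0 if output_path.read_text().strip() else 1
```

A script would finish with `sys.exit(exit_code_for(Path("query_output.txt")))`. Whitespace-only output counts as empty, because presto-cli may emit a bare newline.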
Usage Examples
Query Iceberg with Presto
```sql
-- iceberg-query.sql
SELECT user_id, COUNT(*)
FROM iceberg.demo_db.demo_table
GROUP BY user_id;
```
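A check can go further than non-emptiness by parsing the per-user counts this query returns. This sketch rests on three assumptions: presto-cli prints one quoted CSV row per result with no header (its `--output-format CSV` style), and `user_id` and the count are both integers. `parse_user_counts` is a hypothetical helper.

```python
import csv
import io
from typing import Dict


def parse_user_counts(presto_output: str) -> Dict[int, int]:
    """Parse `"user_id","count"` CSV rows from presto-cli output into a dict."""
    counts: Dict[int, int] = {}
    for row in csv.reader(io.StringIO(presto_output)):
        if not row:
            continue  # csv.reader yields [] for blank lines
        user_id, count = row
        counts[int(user_id)] = int(count)
    return counts
```

Summing the returned values gives the total number of rows read back, which could be compared against the number of rows the test pipeline produced.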
Query Iceberg with Spark
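```python
from pyspark.sql import SparkSession

# Assumes the iceberg-spark-runtime and hadoop-aws jars are on the classpath
# and that S3A credentials/endpoint are configured to reach the MinIO bucket.
spark = (
    SparkSession.builder
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.demo.type", "hadoop")
    .config("spark.sql.catalog.demo.warehouse", "s3a://hummock001/iceberg-data")
    # MinIO is usually addressed with path-style requests
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

df = spark.sql("SELECT * FROM demo.demo_db.demo_table")
df.show()
```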
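To mirror sink_check.py's non-empty assertion on the Spark side, a helper can fetch at most one row. `table_has_rows` is a hypothetical name, and `spark` is assumed to be a SparkSession configured as in the example. The table name is interpolated into the SQL, so pass only trusted identifiers.

```python
def table_has_rows(spark, table: str) -> bool:
    """Return True if `table` has at least one row.

    `spark` is a SparkSession, or any object with a compatible `.sql()`.
    """
    # LIMIT 1 means at most one row is returned, so Spark need not count them all
    return spark.sql(f"SELECT 1 FROM {table} LIMIT 1").count() > 0
```

With the session above, `assert table_has_rows(spark, "demo.demo_db.demo_table")` fails the run when the sink has written nothing.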