Implementation:Risingwavelabs Risingwave Spark Iceberg Compaction
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Storage_Optimization, Iceberg |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Concrete tool, provided as PySpark scripts and Airflow DAGs, for compacting Iceberg table data files and cleaning up table metadata.
Description
The Spark Iceberg Compaction scripts use PySpark to invoke Iceberg's built-in maintenance procedures:
- rewrite_data_files compacts small data files into larger ones.
- rewrite_manifests rewrites manifest files to speed up scan planning.
- expire_snapshots removes old snapshots, along with data files that only those snapshots referenced.
- remove_orphan_files deletes files in the table location that no table metadata references.

The procedures are provided both as standalone PySpark scripts and as Airflow DAGs for scheduled execution.
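Each procedure also accepts named arguments, which is useful when you want explicit retention limits instead of Iceberg's defaults. The following is a minimal sketch that assumes you build the statements in Python and pass each one to `spark.sql(...)`. `maintenance_calls` is a hypothetical helper and is not part of the RisingWave scripts. The argument names `table`, `older_than`, `retain_last`, and `dry_run` come from Iceberg's Spark procedures. With `dry_run => true`, `remove_orphan_files` only lists candidate files and does not delete them.

```python
from datetime import datetime


def maintenance_calls(catalog: str, table: str, older_than: datetime,
                      retain_last: int = 1, dry_run: bool = True) -> list[str]:
    """Build Iceberg CALL statements for the four maintenance procedures.

    Hypothetical helper: it only formats SQL; executing it is up to the caller.
    """
    ts = older_than.strftime("%Y-%m-%d %H:%M:%S")
    proc = f"{catalog}.system"
    return [
        # Compact small data files (default bin-pack strategy).
        f"CALL {proc}.rewrite_data_files(table => '{table}')",
        # Rewrite manifests to speed up scan planning.
        f"CALL {proc}.rewrite_manifests(table => '{table}')",
        # Expire snapshots older than ts, but always keep the newest retain_last.
        f"CALL {proc}.expire_snapshots(table => '{table}', "
        f"older_than => TIMESTAMP '{ts}', retain_last => {retain_last})",
        # List (dry_run => true) or delete files that no metadata references.
        f"CALL {proc}.remove_orphan_files(table => '{table}', "
        f"older_than => TIMESTAMP '{ts}', dry_run => {str(dry_run).lower()})",
    ]
```

For example, `maintenance_calls("demo", "demo.demo_db.demo_table", datetime(2024, 1, 1))` returns four statements, and the last one performs only a dry run until you pass `dry_run=False`.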
Usage
Run these scripts periodically (hourly for compaction, daily for cleanup) to maintain Iceberg table performance when writing through RisingWave's streaming Iceberg sink. The sink commits frequently, so it tends to produce many small data files and snapshots.
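The two cadences line up with the two scripts. `rewrite_small_files.py` runs `rewrite_data_files` and `rewrite_manifests`, and `remove_orphan_files.py` runs `expire_snapshots` and `remove_orphan_files`. The following is a minimal sketch that expresses this grouping as data. It assumes a single driver invoked with a cadence name. `SCHEDULE` and `statements_for` are hypothetical names.

```python
# Hypothetical mapping from Airflow-style schedule presets to Iceberg procedures,
# mirroring how the source splits work between its two scripts.
SCHEDULE = {
    "@hourly": ("rewrite_data_files", "rewrite_manifests"),  # compaction
    "@daily": ("expire_snapshots", "remove_orphan_files"),   # cleanup
}


def statements_for(cadence: str, catalog: str, table: str) -> list[str]:
    """Return the CALL statements due at the given cadence, in execution order."""
    if cadence not in SCHEDULE:
        raise ValueError(f"unknown cadence: {cadence!r}")
    return [f"CALL {catalog}.system.{proc}('{table}')" for proc in SCHEDULE[cadence]]
```

A scheduled job would then loop over `statements_for(...)` and pass each string to `spark.sql(...)`. This keeps the compaction and cleanup sets in one place.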
Code Reference
Source Location
- Repository: risingwave
- File: integration_tests/iceberg-sink/iceberg-compaction-sql/rewrite_small_files.py (L1-16), integration_tests/iceberg-sink/iceberg-compaction-sql/remove_orphan_files.py (L4-22)
- File: integration_tests/iceberg-sink/airflow_dags/rewrite_iceberg_small_files.py (L1-25), integration_tests/iceberg-sink/airflow_dags/remove_iceberg_orphan_files.py (L1-24)
Signature
```python
# rewrite_small_files.py
from remove_orphan_files import get_spark

spark = get_spark()
spark.sql("CALL demo.system.rewrite_data_files('demo.demo_db.demo_table')").show()
spark.sql("CALL demo.system.rewrite_manifests('demo.demo_db.demo_table')").show()
spark.stop()

# remove_orphan_files.py (excerpt; `spark` is the session from get_spark(), defined in this module)
spark.sql("CALL demo.system.expire_snapshots('demo.demo_db.demo_table')").show()
spark.sql("CALL demo.system.remove_orphan_files('demo.demo_db.demo_table')").show()
```
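`rewrite_small_files.py` imports `get_spark()` from `remove_orphan_files.py`, so the session setup lives in one module. Its body is not reproduced on this page. The following is a hypothetical sketch that assumes it applies the same catalog settings as the Standalone PySpark Compaction example. It also adds `spark.sql.extensions`, which Iceberg's `CALL` syntax requires. Keeping the settings in a plain dict lets you check them without starting Spark. `iceberg_conf` is a hypothetical name, and the endpoint and credentials are the demo values from this page.

```python
def iceberg_conf(catalog: str = "demo",
                 warehouse: str = "s3a://hummock001/iceberg-data",
                 endpoint: str = "http://minio-0:9301",
                 access_key: str = "hummockadmin",
                 secret_key: str = "hummockadmin") -> dict[str, str]:
    """Spark settings for an Iceberg Hadoop catalog on S3-compatible storage."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        # Enables the CALL <catalog>.system.<procedure> syntax.
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "hadoop",
        f"{prefix}.warehouse": warehouse,
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
    }


def get_spark():
    """Hypothetical get_spark(): build a SparkSession from iceberg_conf()."""
    from pyspark.sql import SparkSession  # imported lazily; needs pyspark installed

    builder = SparkSession.builder.appName("iceberg-maintenance")
    for key, value in iceberg_conf().items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```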
Import
```shell
# Run via spark-submit
spark-submit --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0 \
  rewrite_small_files.py
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| Table name | String | Yes | Iceberg table identifier (e.g., demo.demo_db.demo_table) |
| Spark catalog config | Configuration | Yes | Catalog implementation and type, warehouse path, S3 endpoint and credentials |
| num_rewrite_manifest_retries | int | No | Retries for manifest rewrite (default: 3) |
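The I/O contract lists `num_rewrite_manifest_retries` (default 3) but does not show how it is applied. The following is a minimal sketch that assumes the value is the total number of attempts for the manifest rewrite. Under this assumption, a failing call is retried, for example after a commit conflict with the streaming sink writing to the same table. `call_with_retries` and its exponential backoff are hypothetical.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(fn: Callable[[], T], num_retries: int = 3,
                      backoff_s: float = 1.0) -> T:
    """Run fn(), retrying on any exception, for at most num_retries attempts.

    The delay doubles after each failed attempt; the last error is re-raised.
    """
    if num_retries < 1:
        raise ValueError("num_retries must be >= 1")
    for attempt in range(1, num_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == num_retries:
                raise
            time.sleep(backoff_s * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

A typical call would wrap the procedure, for example `call_with_retries(lambda: spark.sql("CALL demo.system.rewrite_manifests('demo.demo_db.demo_table')").collect())`.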
Outputs
| Name | Type | Description |
|---|---|---|
| Compacted files | Parquet files | Fewer, larger data files replacing small ones |
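| Rewritten manifests | Iceberg manifest files | Consolidated manifests for faster scan planning |
| Cleaned-up table | Iceberg metadata and storage | Expired snapshots removed from metadata; orphan (unreferenced) files deleted |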
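By default, `rewrite_data_files` uses a bin-packing strategy, which is why its output is fewer, larger files. The following is a simplified illustration, not Iceberg's actual planner, which also groups files by partition and applies further size thresholds and options. In this sketch, small files are packed first-fit-decreasing into groups no larger than a target size, and each group becomes one output file. `plan_bins` is hypothetical. The 512 MiB target matches Iceberg's default `write.target-file-size-bytes`.

```python
MIB = 1024 * 1024


def plan_bins(file_sizes: list[int], target: int = 512 * MIB) -> list[list[int]]:
    """Group file sizes (bytes) first-fit-decreasing into bins of at most target bytes.

    Simplified illustration of bin-pack compaction; a file larger than target
    gets a bin of its own.
    """
    bins: list[list[int]] = []
    totals: list[int] = []
    for size in sorted(file_sizes, reverse=True):
        for i, total in enumerate(totals):
            if total + size <= target:
                bins[i].append(size)
                totals[i] += size
                break
        else:
            # No existing bin has room: start a new output file.
            bins.append([size])
            totals.append(size)
    return bins
```

Under this model, `plan_bins([8 * MIB] * 100)` collapses 100 files of 8 MiB into 2 output groups of 64 and 36 files.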
Usage Examples
Standalone PySpark Compaction
```python
from pyspark.sql import SparkSession

# Submit with the Iceberg runtime on the classpath, e.g.
# spark-submit --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0
spark = SparkSession.builder \
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.demo.type", "hadoop") \
    .config("spark.sql.catalog.demo.warehouse", "s3a://hummock001/iceberg-data") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio-0:9301") \
    .config("spark.hadoop.fs.s3a.access.key", "hummockadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "hummockadmin") \
    .getOrCreate()

# Compact small data files (.show() prints the procedure's result summary)
spark.sql("CALL demo.system.rewrite_data_files('demo.demo_db.demo_table')").show()
# Optimize manifests
spark.sql("CALL demo.system.rewrite_manifests('demo.demo_db.demo_table')").show()
# Clean up: expire old snapshots, then delete unreferenced files
spark.sql("CALL demo.system.expire_snapshots('demo.demo_db.demo_table')").show()
spark.sql("CALL demo.system.remove_orphan_files('demo.demo_db.demo_table')").show()
spark.stop()
```
Airflow DAG (Hourly Compaction)
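```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

dag = DAG(
    'rewrite_iceberg_small_files',
    schedule_interval='@hourly',  # on Airflow 2.4+, prefer schedule='@hourly'
    start_date=datetime(2023, 1, 1),  # placeholder; scheduled DAGs need a start_date
    catchup=False,  # don't backfill runs between start_date and now
)
rewrite_task = SparkSubmitOperator(
    task_id='rewrite_data_files',
    application='rewrite_small_files.py',
    packages='org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0',
    dag=dag,
)
```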