Implementation:Risingwavelabs Risingwave Spark Iceberg Compaction

From Leeroopedia


Knowledge Sources
Domains: Data_Lake, Storage_Optimization, Iceberg
Last Updated: 2026-02-09 07:00 GMT

Overview

A concrete tool, provided as PySpark scripts and Airflow DAGs, for compacting Iceberg table data files and cleaning up table metadata.

Description

The Spark Iceberg Compaction scripts use PySpark to invoke Iceberg's built-in maintenance procedures: rewrite_data_files (compacts small files), rewrite_manifests (optimizes metadata files), expire_snapshots (removes old snapshots), and remove_orphan_files (cleans up unreferenced files). These are provided as both standalone PySpark scripts and Airflow DAGs for scheduled execution.
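The four procedures share the same call shape, so the statements the scripts issue can be sketched with a small helper. This is an illustration only: `maintenance_statements` and `PROCEDURES` are hypothetical names that do not appear in the repository, and the catalog and table names are the demo values used on this page.

```python
from typing import List

# The four Iceberg maintenance procedures named in the description,
# in the order the scripts on this page call them.
PROCEDURES = (
    "rewrite_data_files",   # compact small data files
    "rewrite_manifests",    # rewrite manifest (metadata) files
    "expire_snapshots",     # drop old snapshots
    "remove_orphan_files",  # delete files no snapshot references
)


def maintenance_statements(catalog: str, table: str) -> List[str]:
    """Build one CALL statement per maintenance procedure (hypothetical helper)."""
    return [f"CALL {catalog}.system.{proc}('{table}')" for proc in PROCEDURES]


if __name__ == "__main__":
    for stmt in maintenance_statements("demo", "demo.demo_db.demo_table"):
        print(stmt)
```

Each string can then be passed to `spark.sql(...)` on a session configured with the Iceberg catalog.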

Usage

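A streaming sink commits frequently, which tends to leave many small data files and snapshots behind. Run these scripts periodically (hourly for compaction, daily for cleanup) to maintain Iceberg table performance when using RisingWave's streaming Iceberg sink.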
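For the daily cleanup pass, it can help to make the retention window explicit instead of relying on the procedures' defaults. The sketch below builds `expire_snapshots` and `remove_orphan_files` calls with the `older_than` argument that Iceberg's Spark procedures accept. The helper name `cleanup_statements` and the 7-day window are assumptions chosen for illustration; they are not taken from the repository scripts.

```python
from datetime import datetime, timedelta
from typing import List


def cleanup_statements(
    catalog: str,
    table: str,
    now: datetime,
    retention: timedelta = timedelta(days=7),  # assumed window, tune per table
) -> List[str]:
    """Build cleanup CALLs that only touch snapshots/files older than the cutoff."""
    # The TIMESTAMP literal is interpreted in the Spark session time zone.
    cutoff = (now - retention).strftime("%Y-%m-%d %H:%M:%S")
    return [
        f"CALL {catalog}.system.expire_snapshots("
        f"table => '{table}', older_than => TIMESTAMP '{cutoff}')",
        f"CALL {catalog}.system.remove_orphan_files("
        f"table => '{table}', older_than => TIMESTAMP '{cutoff}')",
    ]


if __name__ == "__main__":
    for stmt in cleanup_statements("demo", "demo.demo_db.demo_table", datetime.now()):
        print(stmt)
```

Keeping the orphan-file cutoff well in the past is the cautious choice with a streaming writer, since files from a commit still in progress are not yet referenced by any snapshot.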

Code Reference

Source Location

  • Repository: risingwave
  • File: integration_tests/iceberg-sink/iceberg-compaction-sql/rewrite_small_files.py (L1-16), integration_tests/iceberg-sink/iceberg-compaction-sql/remove_orphan_files.py (L4-22)
  • File: integration_tests/iceberg-sink/airflow_dags/rewrite_iceberg_small_files.py (L1-25), integration_tests/iceberg-sink/airflow_dags/remove_iceberg_orphan_files.py (L1-24)

Signature

# rewrite_small_files.py
from remove_orphan_files import get_spark

spark = get_spark()
spark.sql("CALL demo.system.rewrite_data_files('demo.demo_db.demo_table')").show()
spark.sql("CALL demo.system.rewrite_manifests('demo.demo_db.demo_table')").show()
spark.stop()

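# remove_orphan_files.py
# `spark` is the SparkSession returned by get_spark(), which is defined in
# this file (definition not shown here) and imported by rewrite_small_files.py.
spark.sql("CALL demo.system.expire_snapshots('demo.demo_db.demo_table')").show()
spark.sql("CALL demo.system.remove_orphan_files('demo.demo_db.demo_table')").show()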

Import

# Run via spark-submit
spark-submit --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0 \
    rewrite_small_files.py

I/O Contract

Inputs

Name | Type | Required | Description
Table name | String | Yes | Iceberg table identifier (e.g., demo.demo_db.demo_table)
Spark catalog config | Configuration | Yes | Catalog type, warehouse path, S3 endpoint
num_rewrite_manifest_retries | int | No | Retries for manifest rewrite (default: 3)
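The excerpts on this page do not show how `num_rewrite_manifest_retries` is consumed. One plausible reading is a bounded retry loop around the manifest rewrite, since a commit can fail when the sink writes a new snapshot concurrently. The following is a minimal sketch under that assumption; `run_with_retries` and `delay_seconds` are hypothetical names.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(action: Callable[[], T], retries: int = 3,
                     delay_seconds: float = 0.0) -> T:
    """Call `action`; on failure, retry up to `retries` more times, then re-raise."""
    attempt = 0
    while True:
        try:
            return action()
        except Exception:
            if attempt >= retries:
                raise  # retry budget exhausted: surface the last error
            attempt += 1
            time.sleep(delay_seconds)


# Assumed usage with an existing SparkSession named `spark`:
# run_with_retries(
#     lambda: spark.sql(
#         "CALL demo.system.rewrite_manifests('demo.demo_db.demo_table')"
#     ).collect(),
#     retries=3,
# )
```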

Outputs

Name | Type | Description
Compacted files | Parquet files | Fewer, larger data files replacing small ones
Cleaned metadata | Iceberg metadata | Removed orphan files, expired snapshots
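One way to confirm the "fewer, larger files" outcome is to query the table's `files` metadata table before and after a compaction run. The sketch below only builds the query text; `file_stats_query` is a hypothetical helper, and it assumes the table is reachable under the same identifier used elsewhere on this page.

```python
def file_stats_query(table: str) -> str:
    """Build a query over the Iceberg `files` metadata table (hypothetical helper)."""
    return (
        "SELECT count(*) AS file_count, "
        "avg(file_size_in_bytes) AS avg_file_size_bytes "
        f"FROM {table}.files"
    )


if __name__ == "__main__":
    # With a configured SparkSession, run this before and after compaction:
    #   spark.sql(file_stats_query("demo.demo_db.demo_table")).show()
    print(file_stats_query("demo.demo_db.demo_table"))
```

After a successful `rewrite_data_files` run, `file_count` would be expected to drop and `avg_file_size_bytes` to rise.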

Usage Examples

Standalone PySpark Compaction

from pyspark.sql import SparkSession

# CALL procedures require Iceberg's Spark SQL extensions to be enabled.
spark = SparkSession.builder \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.demo.type", "hadoop") \
    .config("spark.sql.catalog.demo.warehouse", "s3a://hummock001/iceberg-data") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio-0:9301") \
    .config("spark.hadoop.fs.s3a.access.key", "hummockadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "hummockadmin") \
    .getOrCreate()

# Compact small files
spark.sql("CALL demo.system.rewrite_data_files('demo.demo_db.demo_table')")

# Optimize manifests
spark.sql("CALL demo.system.rewrite_manifests('demo.demo_db.demo_table')")

# Clean up
spark.sql("CALL demo.system.expire_snapshots('demo.demo_db.demo_table')")
spark.sql("CALL demo.system.remove_orphan_files('demo.demo_db.demo_table')")

spark.stop()
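The bare `rewrite_data_files` call above uses Iceberg's default thresholds. The procedure also accepts an `options` map, for example `min-input-files` and `target-file-size-bytes`, which control when a group of files is rewritten and how large the output files should be. The sketch below builds such a call; `rewrite_data_files_statement` is a hypothetical helper and the option values are illustrative, not recommendations from the repository.

```python
from typing import Dict


def rewrite_data_files_statement(catalog: str, table: str,
                                 options: Dict[str, str]) -> str:
    """Build a rewrite_data_files CALL with an explicit options map."""
    # Spark's map(...) takes alternating keys and values; Iceberg expects strings.
    pairs = ", ".join(f"'{key}', '{value}'" for key, value in options.items())
    return (
        f"CALL {catalog}.system.rewrite_data_files("
        f"table => '{table}', options => map({pairs}))"
    )


if __name__ == "__main__":
    print(rewrite_data_files_statement(
        "demo",
        "demo.demo_db.demo_table",
        {"min-input-files": "2", "target-file-size-bytes": "134217728"},  # 128 MiB
    ))
```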

Airflow DAG (Hourly Compaction)

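from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# A scheduled DAG needs a start_date; the date below is a placeholder.
# catchup=False avoids backfilling one run per hour since start_date.
dag = DAG(
    'rewrite_iceberg_small_files',
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)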

rewrite_task = SparkSubmitOperator(
    task_id='rewrite_data_files',
    application='rewrite_small_files.py',
    packages='org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0',
    dag=dag,
)

Related Pages

Implements Principle

Requires Environment
