Principle: Apache Hudi Compaction Execution
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Performing the physical merge of delta log files into new base files for individual file groups as part of a Merge-On-Read compaction cycle.
Description
Compaction execution is the data-intensive core of the MOR compaction workflow. While earlier stages deal with scheduling, plan selection, and pipeline orchestration, this stage performs the actual I/O work: reading a base file and its associated delta log files, merging them according to record keys, and writing a new consolidated base file.
Each compaction operation targets a single file group -- a logical unit in Hudi consisting of one base file and zero or more delta log files that share the same file group ID. The execution stage receives a CompactionPlanEvent that identifies the target file group, its base file path, and the list of log files to merge. It then invokes the engine-specific compactor to produce a new base file containing the merged records.
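As a concrete illustration of what a plan event carries, the sketch below models it as a plain data class. The field names are hypothetical and chosen for readability; Hudi's actual Flink `CompactionPlanEvent` class differs in shape.

```python
from dataclasses import dataclass, field
from typing import List

# Illustrative sketch of the information a compaction plan event carries.
# Field names are assumptions, not Hudi's actual CompactionPlanEvent API.
@dataclass
class CompactionPlanEvent:
    instant_time: str    # compaction instant this operation belongs to
    file_group_id: str   # target file group (one event per file group)
    base_file_path: str  # current base file to read and merge against
    log_file_paths: List[str] = field(default_factory=list)  # delta logs, oldest first

event = CompactionPlanEvent(
    instant_time="20260208000000",
    file_group_id="fg-0001",
    base_file_path="s3://lake/tbl/fg-0001_base.parquet",
    log_file_paths=["s3://lake/tbl/.fg-0001_log.1", "s3://lake/tbl/.fg-0001_log.2"],
)
assert len(event.log_file_paths) == 2
```

Because the event identifies exactly one file group, it is a self-contained unit of work that can be routed to any task slot.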
Key aspects of compaction execution:
- Per-file-group granularity: Each compaction operation is independent and targets exactly one file group. This enables embarrassingly parallel execution across Flink task slots, where different tasks compact different file groups simultaneously.
- Schema evolution handling: Before executing the merge, the operator must account for schema changes that may have occurred between when the base file was written and when the log files were appended. The compaction process reconciles these schemas using an InternalSchemaManager.
- Meta client reload coordination: When a new compaction instant begins (different from the previous one processed by this task), the operator reloads the meta client to pick up the latest timeline state, ensuring correct schema resolution and timeline consistency.
- Synchronous vs. asynchronous execution: The compaction operation can run synchronously (blocking the operator thread) or asynchronously (via a NonThrownExecutor), with error handling that emits a failed CompactionCommitEvent on exceptions rather than crashing the pipeline.
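The asynchronous error-handling pattern described above can be sketched in a few lines: submit the compaction to an executor, and on failure emit a "failed" event downstream instead of letting the exception propagate. The helper names and event shape here are assumptions for illustration, not Hudi's API.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sketch of async compaction with failure events instead of crashes.
# compact_file_group and the event dict shape are illustrative assumptions.
def run_async(executor, file_group_id, compact_file_group, emit):
    def task():
        try:
            statuses = compact_file_group(file_group_id)
            emit({"fileGroup": file_group_id, "writeStatuses": statuses, "failed": False})
        except Exception:
            # Null write statuses signal the commit stage to roll back this instant.
            emit({"fileGroup": file_group_id, "writeStatuses": None, "failed": True})
    return executor.submit(task)

events = []
with ThreadPoolExecutor(max_workers=2) as pool:
    run_async(pool, "fg-ok", lambda fg: ["status"], events.append)
    run_async(pool, "fg-bad", lambda fg: 1 / 0, events.append)

assert any(e["failed"] for e in events) and any(not e["failed"] for e in events)
```

The key design point is that the pipeline stays alive: a failed operation becomes data (an event the commit stage can react to) rather than an operator failure.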
Usage
Apply this principle when:
- Understanding compaction performance: The execution stage is where I/O bottlenecks manifest. Tuning compaction parallelism (compactionTasks) directly affects how many file groups are compacted concurrently.
- Debugging compaction failures: Failed compaction operations emit a CompactionCommitEvent with null write statuses, which triggers a rollback in the commit stage. Understanding the execution model helps diagnose whether failures are due to schema conflicts, storage issues, or resource exhaustion.
- Evaluating write amplification: Each compaction operation reads one base file plus N log files and writes one new base file. The total I/O is proportional to the size of the file group, making this the primary contributor to write amplification in MOR tables.
Theoretical Basis
Compaction execution implements the merge step of a Log-Structured Merge (LSM) design adapted for data lake columnar formats.
The File Group Merge Model
A file group in a MOR table can be modeled as a layered structure:
```
FileGroup(id):
  Layer 0: base_file (Parquet)  -- full snapshot at time T0
  Layer 1: log_file_1           -- delta records at time T1
  Layer 2: log_file_2           -- delta records at time T2
  ...
  Layer N: log_file_N           -- delta records at time TN
```
Compaction merges all layers into a new Layer 0:
```
FUNCTION compact(fileGroup):
    base_records = READ(fileGroup.baseFile)
    FOR EACH logFile IN fileGroup.logFiles ORDER BY timestamp:
        delta_records = READ(logFile)
        FOR EACH delta IN delta_records:
            IF delta.isDelete:
                REMOVE base_records[delta.key]
            ELSE:
                UPSERT base_records[delta.key] = MERGE(base_records[delta.key], delta)
    new_base_file = WRITE(base_records)
    RETURN WriteStatus(new_base_file, stats)
```
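The merge loop above can be exercised with a minimal in-memory sketch, where plain dicts stand in for the Parquet base file and the log blocks (real Hudi reads columnar/Avro data and applies a record payload class to merge values):

```python
# In-memory sketch of the per-file-group merge. Dicts stand in for files;
# the "merge" policy here is simply last-write-wins on the record key.
def compact(base_records, log_files):
    merged = dict(base_records)            # Layer 0 snapshot
    for log in log_files:                  # replay deltas in timestamp order
        for delta in log:
            if delta.get("is_delete"):
                merged.pop(delta["key"], None)
            else:
                merged[delta["key"]] = delta["value"]  # upsert over older value
    return merged                          # contents of the new base file

base = {"a": 1, "b": 2}
logs = [
    [{"key": "a", "value": 10}],                                  # update a
    [{"key": "c", "value": 3}, {"key": "b", "is_delete": True}],  # insert c, delete b
]
assert compact(base, logs) == {"a": 10, "c": 3}
```

Replaying logs in timestamp order matters: a delete followed by a re-insert of the same key must leave the key present, which this ordering preserves.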
Write Amplification Analysis
For a file group with base file size B and total log size L:
```
Read I/O  = B + L          (read base file + all log files)
Write I/O = B'             (write new base file, where B' ≈ B + net_new_data)
Total I/O = B + L + B'

Write Amplification Factor = Total I/O / L
                           = (B + L + B') / L
                           ≈ (2B + L) / L   (assuming B' ≈ B for small deltas)
```
When L is small relative to B (few small log files), write amplification is high. This motivates the compaction trigger strategies that wait for sufficient log accumulation before triggering compaction.
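A quick numeric check makes this concrete (using the simplifying assumption B' ≈ B from the derivation above):

```python
# Numeric check of the write-amplification formula, assuming B' ≈ B.
def write_amplification(base_bytes, log_bytes):
    total_io = base_bytes + log_bytes + base_bytes  # read B + L, write B' ≈ B
    return total_io / log_bytes

# Small deltas against a large base file: high amplification.
assert write_amplification(1024, 64) == (2 * 1024 + 64) / 64   # 33.0
# Logs comparable in size to the base: amplification approaches 3.
assert write_amplification(1024, 1024) == 3.0
```

Compacting a 1 GiB base file to absorb 64 MiB of logs moves roughly 33 bytes per byte of delta, while waiting until the logs reach the base file's size brings the factor down toward 3, which is why trigger strategies defer compaction until enough log data accumulates.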
Parallelism Model
Since file groups are independent, compaction operations can run in parallel without coordination:
```
GIVEN: N compaction operations, P parallel tasks
EACH task processes ≈ N/P operations sequentially
Total wall-clock time ≈ (N/P) * avg_compaction_time_per_operation
```
The rebalance() shuffle in the Flink pipeline ensures even distribution of operations across tasks, preventing hotspots where one task receives disproportionately large file groups.
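Under the even-distribution assumption that rebalance() provides, the wall-clock model reduces to a one-line back-of-envelope estimate (a sketch, not a Hudi utility):

```python
import math

# Back-of-envelope wall-clock estimate for N independent compaction operations
# spread evenly over P parallel tasks; each task handles ceil(N/P) operations.
def wall_clock_estimate(n_ops, parallelism, avg_seconds_per_op):
    return math.ceil(n_ops / parallelism) * avg_seconds_per_op

assert wall_clock_estimate(100, 4, 30.0) == 25 * 30.0  # 750 s
assert wall_clock_estimate(101, 4, 30.0) == 26 * 30.0  # one straggler round
```

The ceiling term captures the straggler effect: with 101 operations over 4 tasks, one task runs 26 operations while the others run 25, and the slowest task sets the total time. Skewed file-group sizes amplify this, which is the hotspot the rebalance() shuffle mitigates.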