
Principle:Apache Hudi Compaction Execution

From Leeroopedia


Knowledge Sources
Domains: Data_Lake, Stream_Processing
Last Updated: 2026-02-08 00:00 GMT

Overview

Performing the physical merge of delta log files into new base files for individual file groups as part of a Merge-On-Read compaction cycle.

Description

Compaction execution is the data-intensive core of the MOR compaction workflow. While earlier stages deal with scheduling, plan selection, and pipeline orchestration, this stage performs the actual I/O work: reading a base file and its associated delta log files, merging them according to record keys, and writing a new consolidated base file.

Each compaction operation targets a single file group -- a logical unit in Hudi consisting of one base file and zero or more delta log files that share the same file group ID. The execution stage receives a CompactionPlanEvent that identifies the target file group, its base file path, and the list of log files to merge. It then invokes the engine-specific compactor to produce a new base file containing the merged records.

Key aspects of compaction execution:

  1. Per-file-group granularity: Each compaction operation is independent and targets exactly one file group. This enables embarrassingly parallel execution across Flink task slots, where different tasks compact different file groups simultaneously.
  2. Schema evolution handling: Before executing the merge, the operator must account for schema changes that may have occurred between when the base file was written and when the log files were appended. The compaction process reconciles these schemas using an InternalSchemaManager.
  3. Meta client reload coordination: When a new compaction instant begins (different from the previous one processed by this task), the operator reloads the meta client to pick up the latest timeline state, ensuring correct schema resolution and timeline consistency.
  4. Synchronous vs. asynchronous execution: The compaction operation can run synchronously (blocking the operator thread) or asynchronously (via a NonThrownExecutor), with error handling that emits a failed CompactionCommitEvent on exceptions rather than crashing the pipeline.
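The error-handling contract in point 4 can be sketched as follows. This is an illustrative Python sketch, not Hudi's actual Java API: the event dicts, the `compactor` callback, and the function names are hypothetical stand-ins. The key behavior it models is that a failed operation emits a commit event with null write statuses rather than propagating the exception.

```python
from concurrent.futures import ThreadPoolExecutor

def run_compaction(plan_event, compactor, emit, executor=None):
    """Run one compaction operation; on failure, emit a failed commit
    event (null write statuses) instead of raising, so the pipeline
    keeps running and the commit stage can trigger a rollback."""
    def task():
        try:
            statuses = compactor(plan_event)  # the physical merge
            emit({"instant": plan_event["instant"],
                  "file_id": plan_event["file_id"],
                  "write_statuses": statuses,
                  "failed": False})
        except Exception:
            # Signal failure to the commit stage; no write statuses.
            emit({"instant": plan_event["instant"],
                  "file_id": plan_event["file_id"],
                  "write_statuses": None,
                  "failed": True})

    if executor is None:
        task()                 # synchronous: blocks the operator thread
    else:
        executor.submit(task)  # asynchronous, NonThrownExecutor-style
        # e.g. executor=ThreadPoolExecutor(max_workers=4)
```

Either execution mode produces the same event stream; the asynchronous path simply frees the operator thread to accept the next plan event while the merge runs.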

Usage

Apply this principle when:

  • Understanding compaction performance: The execution stage is where I/O bottlenecks manifest. Tuning compaction parallelism (compactionTasks) directly affects how many file groups are compacted concurrently.
  • Debugging compaction failures: Failed compaction operations emit a CompactionCommitEvent with null write statuses, which triggers a rollback in the commit stage. Understanding the execution model helps diagnose whether failures are due to schema conflicts, storage issues, or resource exhaustion.
  • Evaluating write amplification: Each compaction operation reads one base file plus N log files and writes one new base file. The total I/O is proportional to the size of the file group, making this the primary contributor to write amplification in MOR tables.

Theoretical Basis

Compaction execution implements the merge step of a Log-Structured Merge (LSM) design adapted for data lake columnar formats.

The File Group Merge Model

A file group in a MOR table can be modeled as a layered structure:

FileGroup(id):
  Layer 0: base_file (Parquet)    -- full snapshot at time T0
  Layer 1: log_file_1             -- delta records at time T1
  Layer 2: log_file_2             -- delta records at time T2
  ...
  Layer N: log_file_N             -- delta records at time TN

Compaction merges all layers into a new Layer 0:

FUNCTION compact(fileGroup):
  base_records = READ(fileGroup.baseFile)
  FOR EACH logFile IN fileGroup.logFiles ORDER BY timestamp:
    delta_records = READ(logFile)
    FOR EACH delta IN delta_records:
      IF delta.isDelete:
        REMOVE base_records[delta.key]
      ELSE:
        UPSERT base_records[delta.key] = MERGE(base_records[delta.key], delta)
  new_base_file = WRITE(base_records)
  RETURN WriteStatus(new_base_file, stats)
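The pseudocode above can be exercised as a minimal in-memory sketch. Real Hudi compaction reads Parquet base files and Avro log blocks through an engine-specific compactor; here the base file is modeled as a plain dict and the `merge` callback (default: last-write-wins) is an illustrative stand-in for Hudi's record payload merging.

```python
def compact(base_records, log_files, merge=lambda old, new: new):
    """Merge delta log files into a new base, in log timestamp order.

    base_records: {key: record} snapshot from the base file
    log_files: iterable of (timestamp, [delta]) where each delta is a
               dict with 'key', 'is_delete', and (if not a delete) 'value'
    merge: combines an existing record with an incoming one
    """
    merged = dict(base_records)  # the input snapshot is left untouched
    for _, deltas in sorted(log_files, key=lambda lf: lf[0]):
        for delta in deltas:
            if delta["is_delete"]:
                merged.pop(delta["key"], None)
            else:
                old = merged.get(delta["key"])
                merged[delta["key"]] = (delta["value"] if old is None
                                        else merge(old, delta["value"]))
    return merged  # contents of the new base file
```

For example, a base of {"k1": 1, "k2": 2} with a log at T1 inserting k3=3 and a log at T2 deleting k1 compacts to {"k2": 2, "k3": 3}.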

Write Amplification Analysis

For a file group with base file size B and total log size L:

Read I/O  = B + L          (read base file + all log files)
Write I/O = B'             (write new base file, where B' ~ B + net_new_data)
Total I/O = B + L + B'

Write Amplification Factor = Total I/O / L
                           = (B + L + B') / L
                           ~ (2B + L) / L    (assuming B' ~ B for small deltas)

When L is small relative to B (few small log files), write amplification is high. This motivates trigger strategies that wait for sufficient log accumulation before scheduling a compaction.
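The factor above can be computed directly under the stated assumption B' ≈ B (a simplifying sketch; the function name is illustrative):

```python
def write_amplification(base_bytes, log_bytes, new_base_bytes=None):
    """Total I/O divided by the delta data compacted: (B + L + B') / L."""
    if new_base_bytes is None:
        new_base_bytes = base_bytes  # assume B' ~ B for small deltas
    return (base_bytes + log_bytes + new_base_bytes) / log_bytes

# A 1024 MiB base with only 64 MiB of logs: factor of 33x.
# The same base with 512 MiB of logs accumulated: factor of 5x.
```

Plugging in the two cases shows why waiting for more log data pays off: compacting a 1024 MiB base against 64 MiB of logs costs 33 bytes of I/O per byte of delta, versus 5 bytes per byte once 512 MiB of logs have accumulated.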

Parallelism Model

Since file groups are independent, compaction operations can run in parallel without coordination:

GIVEN: N compaction operations, P parallel tasks
EACH task processes ~ N/P operations sequentially
Total wall-clock time ~ (N/P) * avg_compaction_time_per_operation

The rebalance() shuffle in the Flink pipeline ensures even distribution of operations across tasks, preventing hotspots where one task receives disproportionately large file groups.
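The wall-clock estimate can be sketched as a one-line model (illustrative; it assumes the even distribution that rebalance() provides and a uniform per-operation cost):

```python
from math import ceil

def compaction_wall_clock(num_operations, parallelism, avg_seconds):
    """Estimated wall-clock time when each of P parallel tasks
    processes ~N/P operations sequentially."""
    return ceil(num_operations / parallelism) * avg_seconds

# 100 operations across 8 tasks at ~30 s each:
# ceil(100 / 8) = 13 sequential rounds, i.e. 390 s.
```

The model breaks down when file group sizes are skewed: a single oversized file group sets a floor on wall-clock time regardless of parallelism, which is why even distribution of operations matters.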

Related Pages

Implemented By
