
Principle:Apache Hudi Flink Compactor Configuration

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Stream_Processing
Last Updated 2026-02-08 00:00 GMT

Overview

Orchestrating a standalone Flink compaction job by parsing configuration, constructing the compaction pipeline DAG, and managing its lifecycle in single-run or continuous service mode.

Description

A standalone Flink compactor is an independent Flink application that runs outside the main write pipeline. It connects to a Hudi MOR table, discovers pending compaction plans on the timeline, and executes them as a Flink streaming pipeline. This separation of concerns allows the write pipeline to focus on low-latency ingestion while compaction runs with its own resource allocation and scheduling cadence.

The Flink Compactor Configuration principle covers the end-to-end orchestration of this standalone compactor:

  1. Configuration parsing: The compactor accepts CLI arguments (via JCommander) that control all aspects of its behavior -- the table path, compaction trigger strategy, parallelism, plan selection strategy, execution sequence, retry behavior, and whether to run in single-shot or continuous service mode.
  2. Pipeline construction: The compactor builds a Flink DAG with a specific topology:
    • Source: A CompactionPlanSourceFunction that emits CompactionPlanEvent objects (one per CompactionOperation in each selected compaction plan).
    • Shuffle: A rebalance() that distributes compaction operations evenly across parallel tasks.
    • Transform: A CompactOperator that performs the actual file-level merge of log files into base files.
    • Sink: A CompactionCommitSink (parallelism 1) that collects results from all compact tasks and commits the compaction to the timeline.
  3. Lifecycle management: The compactor supports two modes:
    • Single-run mode: Execute one compaction cycle and exit. Supports retry logic via RetryHelper for transient failures.
    • Service mode: Run continuously in a loop, sleeping minCompactionIntervalSeconds between cycles, managed by HoodieAsyncTableService.
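The two lifecycle modes above can be sketched in plain Java. This is an illustrative model, not Hudi's actual API: the class name CompactionService, the constructor shape, and the stop() hook are assumptions; only the single-run/service distinction and the sleep between cycles come from the article.

```java
// Minimal sketch of the compactor's two lifecycle modes.
// Single-run: execute one cycle and return. Service: loop with a sleep.
public class CompactionService {
    private final boolean serviceMode;               // continuous service mode on/off
    private final long minCompactionIntervalSeconds; // sleep between service cycles
    private final Runnable compactionCycle;          // one full compact-and-commit pass
    private volatile boolean running = true;

    public CompactionService(boolean serviceMode, long intervalSeconds, Runnable cycle) {
        this.serviceMode = serviceMode;
        this.minCompactionIntervalSeconds = intervalSeconds;
        this.compactionCycle = cycle;
    }

    public void start() {
        if (!serviceMode) {
            compactionCycle.run();                   // single-run: one cycle, then exit
            return;
        }
        while (running) {                            // service mode: always-on loop
            compactionCycle.run();
            try {
                Thread.sleep(minCompactionIntervalSeconds * 1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void stop() { running = false; }
}
```

In the real compactor the service loop is owned by HoodieAsyncTableService; the sketch only shows the control flow.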

Usage

Use the standalone Flink compactor when:

  • Resource isolation is needed: The write pipeline should not be impacted by compaction I/O. Running compaction as a separate Flink job allows independent resource allocation and scaling.
  • Batch compaction is preferred: Run the compactor on a schedule (e.g., hourly via cron) in single-run mode to compact accumulated delta logs.
  • Continuous background compaction: Run the compactor in service mode for always-on compaction with configurable interval between cycles.
  • Operational flexibility: Use the --schedule flag to have the compactor both schedule and execute compaction plans, or leave it off to only execute plans already scheduled by the writer.
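To make the flag-driven behavior concrete, here is a dependency-free sketch of the kind of CLI parsing the compactor does with JCommander. Only --schedule is named in the article; --path and --service are illustrative flag names chosen for this sketch, not necessarily Hudi's exact spellings.

```java
// Hand-rolled stand-in for JCommander-style flag parsing.
public class CompactorConfig {
    public String path;                // table base path
    public boolean schedule = false;   // also schedule plans, not just execute them
    public boolean service = false;    // run in continuous service mode

    public static CompactorConfig parse(String[] args) {
        CompactorConfig cfg = new CompactorConfig();
        for (int i = 0; i < args.length; i++) {
            switch (args[i]) {
                case "--path":     cfg.path = args[++i]; break;
                case "--schedule": cfg.schedule = true;  break;
                case "--service":  cfg.service = true;   break;
                default: throw new IllegalArgumentException("unknown flag: " + args[i]);
            }
        }
        return cfg;
    }
}
```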

Theoretical Basis

The standalone compactor follows the separation of control plane and data plane pattern common in distributed data systems.

Architecture Model

WRITE PIPELINE (Data Plane - Ingestion):
  Source -> Transform -> HoodieWriteSink
                              |
                              v
                     [Schedule compaction plan on timeline]

COMPACTOR (Data Plane - Maintenance):
  [Read pending plans from timeline]
       |
       v
  CompactionPlanSource -> rebalance -> CompactOperator(N) -> CompactionCommitSink(1)
       |                                    |                         |
       v                                    v                         v
  Emit CompactionPlanEvents        Merge logs into base files   Commit to timeline

The Hudi timeline acts as the coordination mechanism between the two pipelines. The writer schedules compaction plans (writes them as requested instants), and the compactor discovers and executes them.

Pipeline Topology and Parallelism

The compaction pipeline has a specific parallelism model:

FUNCTION buildPipeline(plans, config):
  totalOps = SUM(plan.operations.size FOR plan IN plans)
  parallelism = IF config.compactionTasks == -1:
                  totalOps                    // auto: one task per operation
                ELSE:
                  MIN(config.compactionTasks, totalOps)

  SOURCE(parallelism=1):
    FOR EACH (timestamp, plan) IN plans:
      FOR EACH operation IN plan.operations:
        EMIT CompactionPlanEvent(timestamp, operation)

  TRANSFORM(parallelism=parallelism):
    // rebalance distributes events round-robin across compact tasks
    ON EACH event:
      merged_files = COMPACT(event.operation)
      EMIT CompactionCommitEvent(event.instant, merged_files)

  SINK(parallelism=1, maxParallelism=1):
    // single sink collects all events and commits atomically
    ON EACH commitEvent:
      BUFFER(commitEvent)
      IF all operations for instant are complete:
        COMMIT(instant)

The parallelism=1 constraint on the sink is essential for atomic commit coordination: only a single task can determine when all operations for a compaction instant are complete and safely commit to the timeline.
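The sink's buffer-then-commit behavior can be sketched as a per-instant counter: events accumulate until every operation for that instant has reported in, and only then does the commit fire. Class and method names (CommitBuffer, onEvent) are illustrative, not Hudi's actual CompactionCommitSink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Collects CompactionCommitEvent-like results per instant and commits
// only once all expected operations for that instant are complete.
public class CommitBuffer {
    private final Map<String, List<String>> buffered = new HashMap<>();
    private final Map<String, Integer> expectedOps;   // instant -> operation count
    private final List<String> committed = new ArrayList<>();

    public CommitBuffer(Map<String, Integer> expectedOps) {
        this.expectedOps = expectedOps;
    }

    /** Called once per merged-file result; commits when the instant is complete. */
    public void onEvent(String instant, String mergedFile) {
        buffered.computeIfAbsent(instant, k -> new ArrayList<>()).add(mergedFile);
        if (buffered.get(instant).size() == expectedOps.get(instant)) {
            committed.add(instant);                   // commit to the timeline
            buffered.remove(instant);
        }
    }

    public List<String> committedInstants() { return committed; }
}
```

Because only one sink task exists, this counter never races with a sibling; that is precisely why the parallelism=1 constraint matters.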

Retry and Failure Handling

The compactor supports two layers of fault tolerance:

  • Job-level retry: In single-run mode, RetryHelper wraps the entire compaction cycle, retrying on RuntimeException up to a configurable number of times.
  • Stale instant recovery: When retryLastFailedJob is enabled, the compactor detects inflight compaction instants that have exceeded maxProcessingTimeMs, rolls them back, and re-executes them.
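The job-level retry can be sketched as a small wrapper that re-runs the whole compaction cycle on RuntimeException up to a configured count. This mirrors the role Hudi's RetryHelper plays; the class here is a simplified stand-in.

```java
// Retry a job on RuntimeException up to maxRetries additional attempts.
public class SimpleRetry {
    public static void run(Runnable job, int maxRetries) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                job.run();
                return;                   // success: stop retrying
            } catch (RuntimeException e) {
                last = e;                 // transient failure: try again
            }
        }
        throw last;                       // retries exhausted: surface the failure
    }
}
```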

Related Pages

Implemented By
