Principle: Apache Hudi Flink Compactor Configuration
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Orchestrating a standalone Flink compaction job by parsing configuration, constructing the compaction pipeline DAG, and managing its lifecycle in single-run or continuous service mode.
Description
A standalone Flink compactor is an independent Flink application that runs outside the main write pipeline. It connects to a Hudi MOR table, discovers pending compaction plans on the timeline, and executes them as a Flink streaming pipeline. This separation of concerns allows the write pipeline to focus on low-latency ingestion while compaction runs with its own resource allocation and scheduling cadence.
The Flink Compactor Configuration principle covers the end-to-end orchestration of this standalone compactor:
- Configuration parsing: The compactor accepts CLI arguments (via JCommander) that control all aspects of its behavior -- the table path, compaction trigger strategy, parallelism, plan selection strategy, execution sequence, retry behavior, and whether to run in single-run or continuous service mode.
- Pipeline construction: The compactor builds a Flink DAG with a specific topology:
  - Source: A `CompactionPlanSourceFunction` that emits `CompactionPlanEvent` objects (one per `CompactionOperation` in each selected compaction plan).
  - Shuffle: A `rebalance()` that distributes compaction operations evenly across parallel tasks.
  - Transform: A `CompactOperator` that performs the actual file-level merge of log files into base files.
  - Sink: A `CompactionCommitSink` (parallelism 1) that collects results from all compact tasks and commits the compaction to the timeline.
- Lifecycle management: The compactor supports two modes:
  - Single-run mode: Execute one compaction cycle and exit. Supports retry logic via `RetryHelper` for transient failures.
  - Service mode: Run continuously in a loop, sleeping `minCompactionIntervalSeconds` between cycles, managed by `HoodieAsyncTableService`.
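To make the configuration surface concrete, here is a minimal hand-rolled parser for a few of the flags described above. The real compactor uses JCommander; the class name, field names, and flag spellings below are illustrative stand-ins, not Hudi's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for a JCommander-backed compactor config (hypothetical names).
public class CompactorArgs {
    public String path;                              // table base path
    public int compactionTasks = -1;                 // -1 => one task per operation
    public boolean schedule = false;                 // also schedule plans, not just execute
    public boolean serviceMode = false;              // continuous loop vs. single run
    public long minCompactionIntervalSeconds = 600;  // sleep between service-mode cycles

    public static CompactorArgs parse(String[] args) {
        CompactorArgs cfg = new CompactorArgs();
        Map<String, String> kv = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            String a = args[i];
            if (a.equals("--schedule")) { cfg.schedule = true; }
            else if (a.equals("--service")) { cfg.serviceMode = true; }
            else { kv.put(a, args[++i]); }           // flags that take a value
        }
        cfg.path = kv.get("--path");
        if (kv.containsKey("--compaction-tasks"))
            cfg.compactionTasks = Integer.parseInt(kv.get("--compaction-tasks"));
        if (kv.containsKey("--min-compaction-interval-seconds"))
            cfg.minCompactionIntervalSeconds =
                Long.parseLong(kv.get("--min-compaction-interval-seconds"));
        return cfg;
    }
}
```

Defaults mirror the behavior described above: compaction tasks auto-size to the operation count unless overridden, and single-run mode is the default unless service mode is requested.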
Usage
Use the standalone Flink compactor when:
- Resource isolation is needed: The write pipeline should not be impacted by compaction I/O. Running compaction as a separate Flink job allows independent resource allocation and scaling.
- Batch compaction is preferred: Run the compactor on a schedule (e.g., hourly via cron) in single-run mode to compact accumulated delta logs.
- Continuous background compaction: Run the compactor in service mode for always-on compaction with configurable interval between cycles.
- Operational flexibility: Use the `--schedule` flag to have the compactor both schedule and execute compaction plans, or leave it off to only execute plans already scheduled by the writer.
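The two lifecycle modes can be sketched as a single loop: single-run is one pass through the cycle, while service mode repeats it with a sleep in between. `runOneCycle` here stands in for a full schedule/execute compaction pass; the class and method names are illustrative, not Hudi's API.

```java
import java.util.concurrent.TimeUnit;

// Sketch of the compactor lifecycle (hypothetical names, not Hudi's API).
public class CompactorLifecycle {
    // Service mode: repeat the cycle, sleeping between iterations.
    // maxCycles bounds the loop here for illustration; the real service
    // loops until the job is cancelled. Returns the number of cycles run.
    public static int runService(Runnable runOneCycle, long intervalSeconds, int maxCycles) {
        int cycles = 0;
        while (cycles < maxCycles) {
            runOneCycle.run();       // one full compaction cycle
            cycles++;
            if (cycles < maxCycles) {
                try {
                    TimeUnit.SECONDS.sleep(intervalSeconds);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // stop cleanly on cancel
                    break;
                }
            }
        }
        return cycles;
    }
}
```

Single-run mode is simply `runService(cycle, 0, 1)` in this model; the retry wrapper described later applies around that single cycle.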
Theoretical Basis
The standalone compactor follows the separation of control plane and data plane pattern common in distributed data systems.
Architecture Model
```
WRITE PIPELINE (Data Plane - Ingestion):

    Source -> Transform -> HoodieWriteSink
                                |
                                v
              [Schedule compaction plan on timeline]

COMPACTOR (Data Plane - Maintenance):

    [Read pending plans from timeline]
                |
                v
    CompactionPlanSource -> rebalance -> CompactOperator(N) -> CompactionCommitSink(1)
            |                                  |                         |
            v                                  v                         v
    Emit CompactionPlanEvents      Merge logs into base files    Commit to timeline
```
The Hudi timeline acts as the coordination mechanism between the two pipelines. The writer schedules compaction plans (writes them as requested instants), and the compactor discovers and executes them.
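This handshake can be modeled as a tiny shared state machine: the writer adds requested instants, and the compactor polls for instants that are not yet complete. The state names mirror Hudi's instant states, but the class itself is a toy model, not Hudi's timeline API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the timeline handshake between writer and compactor.
public class ToyTimeline {
    public enum State { REQUESTED, INFLIGHT, COMPLETED }

    private final Map<String, State> instants = new LinkedHashMap<>();

    // Writer side: schedule a compaction plan as a requested instant.
    public void scheduleCompaction(String instantTime) {
        instants.put(instantTime, State.REQUESTED);
    }

    // Compactor side: discover plans still waiting to be executed
    // (requested or inflight, i.e. anything not yet completed).
    public List<String> pendingCompactions() {
        List<String> pending = new ArrayList<>();
        for (Map.Entry<String, State> e : instants.entrySet()) {
            if (e.getValue() != State.COMPLETED) pending.add(e.getKey());
        }
        return pending;
    }

    // Move an instant through requested -> inflight -> completed.
    public void transition(String instantTime, State to) {
        instants.put(instantTime, to);
    }
}
```

Because both pipelines read and write the same timeline, neither needs a direct channel to the other: the timeline is the sole coordination point.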
Pipeline Topology and Parallelism
The compaction pipeline has a specific parallelism model:
```
FUNCTION buildPipeline(plans, config):
    totalOps = SUM(plan.operations.size FOR plan IN plans)
    parallelism = IF config.compactionTasks == -1:
                      totalOps    // auto: one task per operation
                  ELSE:
                      MIN(config.compactionTasks, totalOps)

    SOURCE(parallelism=1):
        FOR EACH (timestamp, plan) IN plans:
            FOR EACH operation IN plan.operations:
                EMIT CompactionPlanEvent(timestamp, operation)

    TRANSFORM(parallelism=parallelism):
        // rebalance distributes events round-robin across compact tasks
        ON EACH event:
            merged_files = COMPACT(event.operation)
            EMIT CompactionCommitEvent(event.instant, merged_files)

    SINK(parallelism=1, maxParallelism=1):
        // single sink collects all events and commits atomically
        ON EACH commitEvent:
            BUFFER(commitEvent)
            IF all operations for instant are complete:
                COMMIT(instant)
```
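The parallelism resolution step from the pseudocode above is simple enough to state exactly: `-1` means one compact task per operation, and any explicit value is capped at the operation count so tasks are never idle. A direct translation:

```java
// Resolve compactor parallelism from the configured task count and the
// total number of compaction operations across all selected plans.
public class CompactionParallelism {
    public static int resolve(int configuredTasks, int totalOperations) {
        if (configuredTasks == -1) {
            return totalOperations;                     // auto: one task per operation
        }
        return Math.min(configuredTasks, totalOperations);  // never exceed the work available
    }
}
```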
The parallelism=1 constraint on the sink is essential for atomic commit coordination: only a single task can determine when all operations for a compaction instant are complete and safely commit to the timeline.
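A minimal sketch of that coordination logic: the sink buffers one result per operation and commits an instant exactly when the buffered count reaches the expected operation count. The class and method names are illustrative; the real `CompactionCommitSink` differs in detail (rollback handling, serialization, checkpointing).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the parallelism-1 commit sink (hypothetical names).
public class ToyCommitSink {
    private final Map<String, Integer> expectedOps;               // instant -> operation count
    private final Map<String, List<String>> buffered = new HashMap<>();
    private final List<String> committed = new ArrayList<>();

    public ToyCommitSink(Map<String, Integer> expectedOps) {
        this.expectedOps = expectedOps;
    }

    // Invoked once per commit event emitted by the compact tasks.
    public void onCommitEvent(String instant, String mergedFileId) {
        buffered.computeIfAbsent(instant, k -> new ArrayList<>()).add(mergedFileId);
        if (buffered.get(instant).size() == expectedOps.get(instant)) {
            committed.add(instant);                               // atomic commit point
            buffered.remove(instant);
        }
    }

    public List<String> committedInstants() { return committed; }
}
```

Because only one sink instance sees every event, the "all operations complete" check needs no distributed agreement; this is the design reason for pinning the sink to parallelism 1.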
Retry and Failure Handling
The compactor supports two layers of fault tolerance:
- Job-level retry: In single-run mode, `RetryHelper` wraps the entire compaction cycle, retrying on `RuntimeException` up to a configurable number of times.
- Stale instant recovery: When `retryLastFailedJob` is enabled, the compactor detects inflight compaction instants that have exceeded `maxProcessingTimeMs`, rolls them back, and re-executes them.
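The job-level retry layer amounts to re-running the whole cycle on `RuntimeException` until it succeeds or the attempt budget is exhausted. A minimal generic sketch of that pattern (illustrative; not Hudi's `RetryHelper` API):

```java
import java.util.function.Supplier;

// Retry a whole compaction cycle on RuntimeException, up to maxAttempts.
public class SimpleRetry {
    public static <T> T withRetry(Supplier<T> action, int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();       // success: return the cycle's result
            } catch (RuntimeException e) {
                last = e;                  // transient failure: try the cycle again
            }
        }
        throw last;                        // budget exhausted: surface the last failure
    }
}
```

Note that this layer only helps with transient failures (e.g. temporary storage errors); a cycle that died mid-execution and left an inflight instant behind is handled by the second layer, which rolls the stale instant back before re-executing it.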