Implementation: Apache Hudi HoodieFlinkCompactor Main
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool for running a standalone Flink compaction job that discovers, executes, and commits compaction plans for Hudi MOR tables, provided by Apache Hudi.
Description
HoodieFlinkCompactor is a standalone Flink application with a main() entry point that orchestrates the full lifecycle of a Hudi MOR compaction job. It parses CLI arguments into a FlinkCompactionConfig, initializes a HoodieFlinkWriteClient, and then either runs a single compaction cycle or enters a continuous service loop.
The inner class AsyncCompactionService (extending HoodieAsyncTableService) contains the core compact() method that:
- Optionally schedules a new compaction plan via the write client (if `--schedule` is enabled).
- Fetches the pending compaction timeline and applies the configured `CompactionPlanStrategy` to select which instants to compact.
- Handles stale inflight compaction instants when retry-last-failed mode is enabled.
- Rolls back any inflight compaction instants that are being re-executed.
- Retrieves the `HoodieCompactionPlan` for each selected instant and filters out empty plans.
- Transitions selected instants from requested to inflight state on the timeline.
- Builds and executes a Flink streaming pipeline: `CompactionPlanSourceFunction` -> `rebalance()` -> `CompactOperator` -> `CompactionCommitSink`.
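The instant-selection step can be illustrated with a simplified, self-contained sketch. This is not the actual CompactionPlanStrategies code; it only mimics the documented behavior that FIFO takes the oldest pending instants first, LIFO the newest, capped by the configured maximum number of plans:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PlanSelectionSketch {
    // Simplified stand-in for a compaction plan strategy: select up to
    // maxNumPlans pending instants in FIFO (oldest-first) or LIFO
    // (newest-first) order. Hudi instant timestamps sort lexicographically.
    static List<String> select(List<String> pendingInstants, String seq, int maxNumPlans) {
        List<String> ordered = new ArrayList<>(pendingInstants);
        Collections.sort(ordered);                 // oldest first
        if ("LIFO".equals(seq)) {
            Collections.reverse(ordered);          // newest first
        }
        return ordered.subList(0, Math.min(maxNumPlans, ordered.size()));
    }

    public static void main(String[] args) {
        List<String> pending = List.of("20240101103000", "20240101101000", "20240101102000");
        System.out.println(select(pending, "FIFO", 2)); // oldest two instants
        System.out.println(select(pending, "LIFO", 1)); // newest instant only
    }
}
```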
Usage
Run as a standalone Flink application from the command line. Typical invocations:
- Single-run compaction: `flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor hudi-flink-bundle.jar --path s3://bucket/table`
- Service mode: add the `--service` flag for continuous compaction.
- With scheduling: add `--schedule` to have the compactor also schedule new compaction plans.
Code Reference
Source Location
- Repository: Apache Hudi
- File: `hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java`
- Lines: 75-117 (main method), 264-364 (compact pipeline)
Signature
// HoodieFlinkCompactor.java:75-117
public static void main(String[] args) throws Exception {
FlinkCompactionConfig cfg = getFlinkCompactionConfig(args);
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
// Validate configuration
if (cfg.retryLastFailedJob && cfg.maxProcessingTimeMs <= 0) {
LOG.warn("--retry-last-failed-job is enabled but --job-max-processing-time-ms is not set or <= 0. "
+ "The retry-last-failed feature will have no effect.");
}
if (cfg.serviceMode) {
AsyncCompactionService service = new AsyncCompactionService(cfg, conf);
new HoodieFlinkCompactor(service).start(true);
} else {
new RetryHelper<Void, RuntimeException>(0, cfg.retry, 0, "java.lang.RuntimeException", "Flink compaction")
.start(() -> {
AsyncCompactionService service;
try {
service = new AsyncCompactionService(cfg, conf);
} catch (Exception e) {
throw new RuntimeException("Failed to create AsyncCompactionService", e);
}
try {
new HoodieFlinkCompactor(service).start(false);
} catch (ApplicationExecutionException aee) {
if (aee.getMessage() != null && aee.getMessage().contains(NO_EXECUTE_KEYWORD)) {
LOG.info("Compaction is not performed - no work to do");
} else {
throw new RuntimeException(aee);
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
service.shutDown();
}
return null;
});
}
}
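The single-run branch above wraps execution in RetryHelper. A minimal, dependency-free sketch of that retry pattern (not Hudi's actual RetryHelper, whose constructor also takes backoff intervals and an exception-class filter, as shown in the signature above):

```java
import java.util.concurrent.Callable;

public class RetrySketch {
    // Run a task up to (1 + maxRetries) times, rethrowing the last
    // failure once retries are exhausted. Mirrors the shape of the
    // retry wrapper around single-run compaction; illustrative only.
    static <T> T runWithRetry(Callable<T> task, int maxRetries) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice, then succeeds on the third attempt.
        String result = runWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("transient failure");
            }
            return "compacted";
        }, 3);
        System.out.println(result + " after " + calls[0] + " attempts"); // compacted after 3 attempts
    }
}
```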
// AsyncCompactionService.compact() - pipeline construction (lines 264-364)
private void compact() throws Exception {
table.getMetaClient().reloadActiveTimeline();
if (cfg.schedule) {
boolean scheduled = writeClient.scheduleCompaction(Option.empty()).isPresent();
if (!scheduled) {
LOG.info("No compaction plan for this job ");
return;
}
table.getMetaClient().reloadActiveTimeline();
}
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
List<HoodieInstant> requested = CompactionPlanStrategies.getStrategy(cfg).select(pendingCompactionTimeline);
// ... rollback inflight instants, fetch compaction plans, build pipeline ...
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new CompactionPlanSourceFunction(compactionPlans, conf))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new CompactOperator(conf))
.setParallelism(compactionParallelism)
.addSink(new CompactionCommitSink(conf))
.name("compaction_commit")
.uid("uid_compaction_commit")
.setParallelism(1)
.getTransformation()
.setMaxParallelism(1);
env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes));
}
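Note that the commit sink is pinned to parallelism 1, so a single task observes every CompactionCommitEvent and can commit an instant exactly once, after all parallel compact tasks for that instant have reported. A simplified sketch of that buffer-then-commit idea (assumed behavior for illustration, not the actual CompactionCommitSink code, which also handles failure events and commits through the write client):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CommitFanInSketch {
    // Buffer per-file-group commit events keyed by compaction instant;
    // once every expected event for an instant has arrived, "commit" it.
    final Map<String, List<String>> buffer = new HashMap<>();
    final Map<String, Integer> expected = new HashMap<>();
    final List<String> committed = new ArrayList<>();

    void onEvent(String instant, String fileGroupId, int expectedEvents) {
        expected.putIfAbsent(instant, expectedEvents);
        buffer.computeIfAbsent(instant, k -> new ArrayList<>()).add(fileGroupId);
        if (buffer.get(instant).size() == expected.get(instant)) {
            committed.add(instant);   // all compact tasks reported: commit once
            buffer.remove(instant);
        }
    }

    public static void main(String[] args) {
        CommitFanInSketch sink = new CommitFanInSketch();
        sink.onEvent("20240101100000", "fg-1", 2);
        sink.onEvent("20240101100000", "fg-2", 2);
        System.out.println(sink.committed); // [20240101100000]
    }
}
```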
Import
import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.sink.compact.CompactOperator;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.common.util.RetryHelper;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| args | String[] | Yes | CLI arguments parsed by JCommander into FlinkCompactionConfig. |
| path | String (CLI --path) | Yes | Base path of the target Hudi MOR table. |
| compactionTriggerStrategy | String (CLI --compaction-trigger-strategy) | No | Strategy to trigger compaction: num_commits, time_elapsed, num_and_time, num_or_time. Default: num_commits. |
| compactionDeltaCommits | Integer (CLI --compaction-delta-commits) | No | Number of delta commits to trigger compaction. Default: 1. |
| compactionTasks | Integer (CLI --compaction-tasks) | No | Parallelism for compact operators. -1 means auto (one per operation). Default: -1. |
| schedule | Boolean (CLI --schedule) | No | Whether to schedule compaction plans in addition to executing them. Default: false. |
| serviceMode | Boolean (CLI --service) | No | Whether to run continuously. Default: false. |
| compactionSeq | String (CLI --seq) | No | Execution order: FIFO or LIFO. Default: FIFO. |
| compactionPlanSelectStrategy | String (CLI --plan-select-strategy) | No | Plan selection strategy: num_instants, all, instants. Default: num_instants. |
| maxNumCompactionPlans | Integer (CLI --max-num-plans) | No | Maximum number of plans to select per cycle. Default: 1. |
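The compactionTasks default of -1 (auto parallelism, one task per compaction operation) can be read as the following sketch. This is an illustration of the documented default, not the exact resolution code in Hudi:

```java
public class ParallelismSketch {
    // Resolve the compact-operator parallelism: an explicit positive
    // --compaction-tasks value wins; -1 (the default) means one task
    // per compaction operation in the selected plans.
    static int resolveParallelism(int compactionTasks, int numOperations) {
        return compactionTasks > 0 ? compactionTasks : numOperations;
    }

    public static void main(String[] args) {
        System.out.println(resolveParallelism(-1, 5)); // auto: 5
        System.out.println(resolveParallelism(4, 5));  // explicit: 4
    }
}
```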
Outputs
| Name | Type | Description |
|---|---|---|
| Flink pipeline execution | Side effect | A Flink streaming pipeline is built and executed: CompactionPlanSourceFunction -> rebalance() -> CompactOperator -> CompactionCommitSink. |
| Committed compaction instants | Side effect (timeline) | Compaction instants are transitioned from requested -> inflight -> committed on the Hudi timeline. |
| New base files | Side effect (storage) | Delta log files are merged into new base files on the underlying storage. |
Usage Examples
// Programmatic invocation of the standalone compactor
String[] args = new String[] {
"--path", "s3://my-bucket/my-hudi-table",
"--compaction-tasks", "4",
"--plan-select-strategy", "num_instants",
"--max-num-plans", "3",
"--seq", "FIFO",
"--schedule"
};
// Single-run mode (default)
HoodieFlinkCompactor.main(args);
// Service mode - add --service flag
String[] serviceArgs = new String[] {
"--path", "s3://my-bucket/my-hudi-table",
"--service",
"--min-compaction-interval-seconds", "300",
"--compaction-tasks", "8"
};
HoodieFlinkCompactor.main(serviceArgs);