Implementation: Apache Hudi CompactionCommitSink Invoke
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
A concrete tool, provided by Apache Hudi, that collects compaction results from parallel tasks and atomically commits the compaction instant to the Hudi timeline.
Description
This implementation consists of two cooperating classes:
CompactionCommitSink -- A Flink `SinkFunction` (extending `CleanFunction`) that runs at parallelism 1. Its `invoke()` method receives `CompactionCommitEvent` objects from all parallel `CompactOperator` tasks. It logs warnings for failed or error-containing events, then delegates to the appropriate `CompactCommitHandler` (for data table or metadata table compactions).
CompactCommitHandler -- The core commit logic. It maintains a `commitBuffer` (a `Map<String, Map<String, CompactionCommitEvent>>` keyed by instant time and file ID) and a `compactionPlanCache`. When `commitIfNecessary()` is called:
- It adds the event to the commit buffer.
- It retrieves the compaction plan for the instant (using the cache or reading from the timeline via `CompactionUtils.getCompactionPlan()`).
- It checks if the buffer size equals the plan's operation count (completeness check).
- If any event is marked as failed, it rolls back the compaction via `CompactionUtil.rollbackCompaction()` and records a rollback metric.
- If all events succeeded, it aggregates `WriteStatus` objects, checks error record counts, creates `HoodieCommitMetadata` via `CompactHelpers.getInstance().createCompactionMetadata()`, and calls `writeClient.completeCompaction()`.
- If async cleaning is disabled, it triggers `writeClient.clean()` for synchronous cleanup.
- Finally, it resets the commit buffer and plan cache for the completed instant.
The CompactionCommitSink also inherits cleaning capability from `CleanFunction`; this is needed because the Flink SQL API does not allow multiple sinks in one table sink provider.
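The buffering and completeness check described above can be modeled in isolation. The sketch below is a simplified stand-in (plain strings instead of Hudi's `CompactionCommitEvent` and `HoodieCompactionPlan` types, and a plain operation count instead of a real plan) that shows how one event per file group must arrive before an instant becomes committable:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model (not the Hudi classes) of the handler's commit buffer:
// an instant is ready to commit only when one event per file group in the
// compaction plan has been received.
public class CommitBufferSketch {
    // instant time -> (file ID -> event payload)
    private final Map<String, Map<String, String>> commitBuffer = new HashMap<>();

    /** Buffers the event; returns true once all expected file groups reported. */
    public boolean offer(String instant, String fileId, String event, int planOperationCount) {
        commitBuffer.computeIfAbsent(instant, k -> new HashMap<>()).put(fileId, event);
        return commitBuffer.get(instant).size() == planOperationCount;
    }

    public static void main(String[] args) {
        CommitBufferSketch sketch = new CommitBufferSketch();
        // A plan with 3 operations: only the third event completes the instant.
        System.out.println(sketch.offer("20240101120000", "fg-1", "ok", 3)); // false
        System.out.println(sketch.offer("20240101120000", "fg-2", "ok", 3)); // false
        System.out.println(sketch.offer("20240101120000", "fg-3", "ok", 3)); // true
    }
}
```

Because events are keyed by file ID, a retried task that re-emits an event for the same file group overwrites the previous entry rather than inflating the count, which keeps the completeness check stable.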
Usage
This sink is placed at the end of the compaction Flink pipeline at parallelism=1 (with maxParallelism=1 to prevent rescaling). It is the final stage that makes compaction results visible to readers.
Code Reference
Source Location
- Repository: Apache Hudi
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java (lines 79-93)
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactCommitHandler.java (lines 105-141)
Signature
// CompactionCommitSink.java:79-93
@Override
public void invoke(CompactionCommitEvent event, Context context) throws Exception {
  final String instant = event.getInstant();
  if (event.isFailed()
      || (event.getWriteStatuses() != null
      && event.getWriteStatuses().stream().anyMatch(writeStatus -> writeStatus.getTotalErrorRecords() > 0))) {
    log.warn("Received abnormal CompactionCommitEvent of instant {}, task ID is {},"
        + " is failed: {}, error record count: {}",
        instant, event.getTaskID(), event.isFailed(), getNumErrorRecords(event));
  }
  if (event.isMetadataTable()) {
    mdtCompactCommitHandler.get().commitIfNecessary(event, compactionMetrics);
  } else {
    compactCommitHandler.get().commitIfNecessary(event, compactionMetrics);
  }
}
// CompactCommitHandler.java:105-141
public void commitIfNecessary(CompactionCommitEvent event, FlinkCompactionMetrics compactionMetrics) {
  String instant = event.getInstant();
  commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
      .put(event.getFileId(), event);
  boolean isLogCompaction = event.isLogCompaction();
  HoodieCompactionPlan compactionPlan = getCompactionPlan(instant, isLogCompaction);
  Collection<CompactionCommitEvent> events = commitBuffer.get(instant).values();
  boolean isReady = compactionPlan.getOperations().size() == events.size();
  if (!isReady) {
    return;
  }
  if (events.stream().anyMatch(CompactionCommitEvent::isFailed)) {
    try {
      rollbackCompaction(instant, isLogCompaction);
    } finally {
      reset(instant);
      compactionMetrics.markCompactionRolledBack();
    }
    return;
  }
  try {
    doCommit(instant, isLogCompaction, events, compactionMetrics);
  } catch (Throwable throwable) {
    log.error("Error while committing compaction instant: {}", instant, throwable);
    compactionMetrics.markCompactionRolledBack();
  } finally {
    reset(instant);
  }
}
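The `getCompactionPlan(instant, isLogCompaction)` call above consults the plan cache before reading the timeline. A minimal, self-contained model of that cached lookup, using a loader function in place of `CompactionUtils.getCompactionPlan()` and a plain string in place of the plan object:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified model of the handler's compactionPlanCache: the plan for an
// instant is read from the timeline at most once, reused for every
// subsequent event of that instant, and evicted when the instant completes.
public class PlanCacheSketch {
    private final Map<String, String> planCache = new HashMap<>();
    int timelineReads = 0; // counts simulated timeline accesses

    // `loader` stands in for CompactionUtils.getCompactionPlan().
    String getPlan(String instant, Function<String, String> loader) {
        return planCache.computeIfAbsent(instant, i -> {
            timelineReads++;
            return loader.apply(i);
        });
    }

    // Mirrors reset(instant): drop cached state for a finished instant.
    void reset(String instant) {
        planCache.remove(instant);
    }

    public static void main(String[] args) {
        PlanCacheSketch cache = new PlanCacheSketch();
        Function<String, String> loader = i -> "plan-for-" + i;
        cache.getPlan("20240101120000", loader);
        cache.getPlan("20240101120000", loader); // served from cache
        System.out.println(cache.timelineReads); // 1
    }
}
```

Caching matters here because every one of the plan's N events triggers a completeness check against the plan, so an uncached lookup would read the same timeline file N times per instant.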
Import
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.handler.CompactCommitHandler;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.sink.CleanFunction;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| event | CompactionCommitEvent | Yes | A commit event from a compaction task containing: instant (String, the compaction instant time), fileId (String, the file group ID), writeStatuses (List<WriteStatus>, null if failed), taskID (int), isMetadataTable (boolean), isLogCompaction (boolean). |
| context | SinkFunction.Context | Yes | Flink sink function context providing timestamp and watermark information. |
| compactionPlan | HoodieCompactionPlan | Yes (loaded internally) | The compaction plan for the instant, loaded from the Hudi timeline via CompactionUtils.getCompactionPlan(). Contains the list of CompactionOperation entries that must all be completed before commit. |
Outputs
| Name | Type | Description |
|---|---|---|
| Committed compaction instant | Side effect (timeline) | On success: the compaction instant transitions from inflight to committed on the Hudi timeline, with HoodieCommitMetadata recording all file-level changes. |
| Rolled-back compaction instant | Side effect (timeline) | On failure: the compaction instant is rolled back, removing it from the active timeline. Orphaned base files are cleaned up by subsequent clean operations. |
| Clean operation | Side effect (optional) | If FlinkOptions.CLEAN_ASYNC_ENABLED is false, a synchronous writeClient.clean() is triggered after successful commit to remove old file versions. |
| Compaction metrics | Side effect (metrics) | Metrics are updated via FlinkCompactionMetrics: markCompactionCompleted() on success, markCompactionRolledBack() on failure, and updateCommitMetrics() with commit metadata. |
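The timeline side effects above follow from a single decision once the buffer is complete: any failed event triggers rollback; otherwise the instant is committed, optionally followed by a synchronous clean. A toy decision function, with booleans standing in for the event flags and the `FlinkOptions.CLEAN_ASYNC_ENABLED` setting:

```java
import java.util.List;

// Toy model of the handler's terminal decision for a complete instant:
// rollback if any task reported failure, otherwise commit; a synchronous
// clean follows the commit only when async cleaning is disabled.
public class CommitDecisionSketch {
    enum Outcome { COMMIT, COMMIT_THEN_CLEAN, ROLLBACK }

    static Outcome decide(List<Boolean> taskFailed, boolean asyncCleanEnabled) {
        if (taskFailed.stream().anyMatch(f -> f)) {
            return Outcome.ROLLBACK;
        }
        return asyncCleanEnabled ? Outcome.COMMIT : Outcome.COMMIT_THEN_CLEAN;
    }

    public static void main(String[] args) {
        System.out.println(decide(List.of(false, false, false), true));  // COMMIT
        System.out.println(decide(List.of(false, true, false), true));   // ROLLBACK
        System.out.println(decide(List.of(false, false, false), false)); // COMMIT_THEN_CLEAN
    }
}
```

Note that rollback is all-or-nothing: a single failed file group discards the work of every other task for that instant, which is why the metrics table records exactly one of markCompactionCompleted() or markCompactionRolledBack() per instant.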
Usage Examples
// CompactionCommitSink is placed at the end of the compaction pipeline:
env.addSource(new CompactionPlanSourceFunction(compactionPlans, conf))
    .name("compaction_source")
    .uid("uid_compaction_source")
    .rebalance()
    .transform("compact_task",
        TypeInformation.of(CompactionCommitEvent.class),
        new CompactOperator(conf))
    .setParallelism(compactionParallelism)
    .addSink(new CompactionCommitSink(conf)) // <-- this sink
    .name("compaction_commit")
    .uid("uid_compaction_commit")
    .setParallelism(1)
    .getTransformation()
    .setMaxParallelism(1); // Must be 1 for correct commit coordination
// The sink receives CompactionCommitEvents from all CompactOperator tasks.
// Example flow for a compaction plan with 3 operations:
// Task 0 emits: CompactionCommitEvent(instant="20240101120000", fileId="fg-1", statuses=[...], taskID=0)
// Task 1 emits: CompactionCommitEvent(instant="20240101120000", fileId="fg-2", statuses=[...], taskID=1)
// Task 2 emits: CompactionCommitEvent(instant="20240101120000", fileId="fg-3", statuses=[...], taskID=2)
// After receiving all 3 events, commitIfNecessary() detects completeness and commits.