Principle: Apache Hudi Clustering Commit
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Data_Layout_Optimization |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Collecting all clustering execution results, validating completeness against the original plan, and atomically committing the file replacements to the Hudi timeline.
Description
The clustering commit phase is the final step in the clustering workflow. It acts as a barrier and validator: it must receive successful results from every clustering group in the plan before it can commit the overall operation. This ensures that partial clustering (where some groups succeed and others fail) does not corrupt the table state.
The commit process follows these steps:
- Event collection: Each `ClusteringCommitEvent` from a parallel clustering operator task is buffered in a map keyed by instant time and file IDs. The map structure deduplicates events in case of task retries.
- Completeness check: The sink compares the number of received events against the number of input groups in the original `HoodieClusteringPlan`. The commit only proceeds when all groups have reported.
- Failure handling: If any event indicates failure (null write statuses or error records above threshold), the entire clustering instant is rolled back via `ClusteringUtil.rollbackClustering`.
- Validation: The `validateWriteResult` method verifies that at least one new file was produced. This guards against silent data loss.
- Metadata construction: A `HoodieCommitMetadata` is built with `WriteOperationType.CLUSTER`, containing write statistics, the partition-to-replaced-file-IDs mapping, and the table schema.
- Timeline commit: The metadata is committed as a `REPLACE_COMMIT` on the Hudi timeline, atomically replacing old file groups with new ones.
- Optional cleaning: If inline cleaning is enabled (async cleaning disabled), the commit sink triggers a clean operation to remove obsolete data files.
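The event-collection and deduplication step can be sketched in plain Java. The `CommitBuffer` class and its `CommitEvent` record below are simplified stand-ins for illustration, not the real Hudi/Flink types:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the event-buffering step described above. CommitBuffer and
// CommitEvent are hypothetical simplified types, not the actual Hudi classes.
public class CommitBuffer {
    // One result per clustering file group (stand-in for ClusteringCommitEvent).
    record CommitEvent(String instant, String fileId, boolean failed) {}

    // instant -> (fileId -> event); keying by file ID deduplicates task retries.
    private final Map<String, Map<String, CommitEvent>> buffer = new HashMap<>();

    public void collect(CommitEvent event) {
        buffer.computeIfAbsent(event.instant(), k -> new HashMap<>())
              .put(event.fileId(), event); // a retried task simply overwrites its earlier event
    }

    // The commit may proceed only once every input group has reported.
    public boolean isComplete(String instant, int inputGroupCount) {
        return buffer.getOrDefault(instant, Map.of()).size() == inputGroupCount;
    }

    public static void main(String[] args) {
        CommitBuffer sink = new CommitBuffer();
        sink.collect(new CommitEvent("20260208000000", "fg-1", false));
        sink.collect(new CommitEvent("20260208000000", "fg-1", false)); // retry, deduped
        sink.collect(new CommitEvent("20260208000000", "fg-2", false));
        System.out.println(sink.isComplete("20260208000000", 2)); // true
        System.out.println(sink.isComplete("20260208000000", 3)); // false
    }
}
```

The map-of-maps keying is what makes retries idempotent: a second event for the same instant and file ID replaces the first rather than inflating the completeness count.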
Usage
This principle applies whenever clustering execution completes, in both streaming and batch modes. The commit sink must run with parallelism 1 to ensure a single coordinator collects all events and makes the atomic commit decision.
The commit sink also inherits cleaning capabilities from CleanFunction, allowing it to perform inline cleaning after a successful clustering commit. This is particularly important for SQL API users, since attaching multiple sinks to one table is not supported there.
Theoretical Basis
The clustering commit implements a two-phase commit protocol adapted for Flink's dataflow model:
FUNCTION commitClustering(instant, events, clusteringPlan):
    // Phase 1: Validate completeness
    IF events.size() != clusteringPlan.inputGroups.size():
        RETURN  // not all groups have reported yet
    IF any event in events is failed:
        ROLLBACK(instant)
        RETURN
    // Phase 2: Validate results
    allStatuses = events.flatMap(e -> e.writeStatuses)
    errorRecords = sum(allStatuses.map(s -> s.totalErrorRecords))
    IF errorRecords > 0 AND NOT ignoreFailed:
        ROLLBACK(instant)
        RETURN
    IF allStatuses.isEmpty():
        THROW ClusteringException("0 WriteStatus produced")
    // Phase 3: Build commit metadata
    newFileGroups = allStatuses.map(s -> (s.partition, s.fileId))
    oldFileGroups = clusteringPlan.getFileGroups()
    replacedFileIds = oldFileGroups.filter(fg -> fg NOT IN newFileGroups)
    metadata = buildCommitMetadata(
        writeStats = allStatuses.map(s -> s.stat),
        partitionToReplaceFileIds = replacedFileIds.groupBy(partition),
        operationType = CLUSTER
    )
    // Phase 4: Atomic commit
    timeline.commitReplaceCommit(instant, metadata)
    // Phase 5: Optional cleanup
    IF NOT asyncCleanEnabled:
        writeClient.clean()
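Phases 1 and 2 of the pseudocode above, the barrier and validation logic, can be sketched as a small self-contained Java class. `Event`, `WriteStatus`, and `Decision` are simplified stand-ins introduced for this sketch, not the real Hudi types:

```java
import java.util.Collection;
import java.util.List;

// Sketch of the completeness and failure checks (Phases 1-2). The types here
// are hypothetical simplifications of Hudi's ClusteringCommitEvent/WriteStatus.
public class ClusteringCommitter {
    record WriteStatus(long totalErrorRecords) {}
    record Event(String fileId, List<WriteStatus> statuses) {
        boolean failed() { return statuses == null; } // null statuses signal a failed group
    }

    enum Decision { WAIT, ROLLBACK, COMMIT }

    static Decision decide(Collection<Event> events, int inputGroups, boolean ignoreFailed) {
        // Phase 1: barrier - wait until every input group has reported.
        if (events.size() != inputGroups) {
            return Decision.WAIT;
        }
        if (events.stream().anyMatch(Event::failed)) {
            return Decision.ROLLBACK;
        }
        // Phase 2: roll back if error records exceed what the config tolerates.
        long errorRecords = events.stream()
            .flatMap(e -> e.statuses().stream())
            .mapToLong(WriteStatus::totalErrorRecords)
            .sum();
        if (errorRecords > 0 && !ignoreFailed) {
            return Decision.ROLLBACK;
        }
        // Guard against silent data loss: at least one file must have been written.
        if (events.stream().allMatch(e -> e.statuses().isEmpty())) {
            throw new IllegalStateException("Clustering produced 0 WriteStatus");
        }
        return Decision.COMMIT;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("fg-1", List.of(new WriteStatus(0))),
            new Event("fg-2", List.of(new WriteStatus(0))));
        System.out.println(decide(events, 3, false)); // WAIT: barrier not yet reached
        System.out.println(decide(events, 2, false)); // COMMIT
    }
}
```

Returning `WAIT` rather than throwing is what gives the sink its barrier semantics: the caller simply retries the decision as further events arrive.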
The partitionToReplaceFileIds mapping is central to the atomicity guarantee. It tells the timeline which old file groups are being replaced by the new files. After the commit, any reader opening the table will see the new files and skip the old ones, without any intermediate state being visible.
The barrier semantics (waiting for all groups) ensure that either all file groups in the plan are replaced or none are. This prevents scenarios where a reader might see some old files and some new files for the same logical data.
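Building the partition-to-replaced-file-IDs mapping, as in Phase 3 of the pseudocode, amounts to a filter-and-group-by over the plan's file groups. `FileGroupId` below is a simplified stand-in for Hudi's file group identifier, used only for this sketch:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Sketch of deriving partitionToReplaceFileIds from the clustering plan.
// FileGroupId is a hypothetical simplification of Hudi's file group identifier.
public class ReplaceMapping {
    record FileGroupId(String partition, String fileId) {}

    static Map<String, List<String>> partitionToReplacedFileIds(
            List<FileGroupId> oldGroups, Set<FileGroupId> newGroups) {
        return oldGroups.stream()
            // Keep only file groups actually superseded by the new files.
            .filter(fg -> !newGroups.contains(fg))
            // Group the replaced file IDs by their partition path.
            .collect(Collectors.groupingBy(
                FileGroupId::partition, TreeMap::new,
                Collectors.mapping(FileGroupId::fileId, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<FileGroupId> oldGroups = List.of(
            new FileGroupId("2026/02/08", "fg-1"),
            new FileGroupId("2026/02/08", "fg-2"),
            new FileGroupId("2026/02/09", "fg-3"));
        System.out.println(partitionToReplacedFileIds(oldGroups, Set.of()));
        // {2026/02/08=[fg-1, fg-2], 2026/02/09=[fg-3]}
    }
}
```

Grouping by partition matters because readers resolve file groups per partition: a `REPLACE_COMMIT` carrying this mapping lets each partition's view atomically drop the old file IDs and pick up the new ones.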