Implementation: Apache Hudi ClusteringCommitSink doCommit
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Data_Layout_Optimization |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool for collecting clustering execution results, validating completeness, and committing the REPLACE_COMMIT to the Hudi timeline, provided by Apache Hudi.
Description
ClusteringCommitSink is a Flink SinkFunction that extends CleanFunction. It serves as the commit coordinator for the clustering workflow, collecting ClusteringCommitEvent objects from all parallel ClusteringOperator tasks and committing the results when all groups in the plan have reported.
The invoke method is called for each incoming event. It buffers events in a map keyed by (instantTime, fileIds) to handle deduplication from task retries. After buffering, it calls commitIfNecessary to check if all groups have reported.
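The buffering-and-trigger behavior can be sketched as follows. This is a simplified illustration, not Hudi's actual class: the `CommitBuffer` type, its fields, and the string payload are stand-ins invented for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the event buffering: events are keyed by
// (instantTime, fileIds) so a retried task overwrites its earlier event
// instead of being counted twice. The commit fires once every group in
// the clustering plan has reported.
class CommitBuffer {
    private final Map<String, String> buffer = new HashMap<>(); // key -> event payload
    private final int expectedGroups; // number of groups in the clustering plan

    CommitBuffer(int expectedGroups) {
        this.expectedGroups = expectedGroups;
    }

    /** Buffers one event; returns true when all groups have reported. */
    boolean invoke(String instantTime, String fileIds, String payload) {
        buffer.put(instantTime + "@" + fileIds, payload); // dedup on task retry
        return buffer.size() >= expectedGroups;           // commitIfNecessary check
    }
}
```

Because the map key includes the file IDs, a retried task replaces its earlier event rather than inflating the count, so the commit cannot fire early on duplicates.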
The doCommit method performs the actual commit:
- Aggregate write statuses: collects all WriteStatus objects from the buffered events.
- Error check: if the total error record count exceeds zero and ignore.failed is false, rolls back the clustering instant.
- Build write metadata: creates HoodieWriteMetadata with write statuses, write stats, and the partition-to-replaced-file-IDs mapping.
- Validate: calls validateWriteResult to ensure at least one new file was produced; throws HoodieClusteringException if zero write statuses exist.
- Build commit metadata: constructs HoodieCommitMetadata with operation type CLUSTER and action REPLACE_COMMIT.
- Commit: calls writeClient.completeTableService(TableServiceType.CLUSTER, ...) to finalize the commit on the timeline.
- Cleanup: if async cleaning is disabled, triggers an inline writeClient.clean().
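The two decision points in the commit path, the error check and the write-result validation, can be sketched like this. The helper class and its signatures are hypothetical, written only to illustrate the logic described above.

```java
import java.util.List;

// Hypothetical sketch of the doCommit decision points: the summed error
// count plus the ignore.failed flag decide rollback, and an empty
// write-status list means clustering produced no new files at all.
class CommitChecks {
    /** True if the instant should be rolled back instead of committed. */
    static boolean shouldRollback(List<Long> errorCounts, boolean ignoreFailed) {
        long totalErrors = errorCounts.stream().mapToLong(Long::longValue).sum();
        return totalErrors > 0 && !ignoreFailed;
    }

    /** Mirrors the validate step: zero write statuses is a fatal clustering error. */
    static void validateWriteResult(int writeStatusCount, String instantTime) {
        if (writeStatusCount == 0) {
            throw new IllegalStateException(
                "Clustering produced 0 write statuses for instant " + instantTime);
        }
    }
}
```

Note the asymmetry: error records can be tolerated via ignore.failed, but an empty write result is always treated as a failure, since a REPLACE_COMMIT with no replacement files would be meaningless.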
The getPartitionToReplacedFileIds method computes which old file groups are being replaced by filtering out file group IDs that appear in the new write stats (since those were newly created).
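The filtering described above can be illustrated with simplified types; the real method operates on the clustering plan's input groups and HoodieWriteStat objects, whereas this sketch uses plain strings and collections.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified sketch: the replaced file groups are the plan's input file IDs
// minus any file ID that also appears in the new write stats, because an ID
// present in the new stats belongs to a freshly created file, not a replaced one.
class ReplacedFiles {
    static Map<String, List<String>> partitionToReplacedFileIds(
            Map<String, List<String>> planInputFiles, // partition -> input file IDs
            Set<String> newFileIds) {                 // file IDs seen in new write stats
        return planInputFiles.entrySet().stream()
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> e.getValue().stream()
                      .filter(id -> !newFileIds.contains(id))
                      .collect(Collectors.toList())));
    }
}
```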
Usage
Use ClusteringCommitSink as the terminal operator in any Flink clustering pipeline. It must run with parallelism 1 (and max parallelism 1) to ensure a single coordinator collects all events. The sink inherits cleaning capability from CleanFunction, making it suitable for SQL API use cases where multiple sinks are not allowed.
Code Reference
Source Location
- Repository: Apache Hudi
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
- Lines: 67-263 (class), 111-123 (invoke), 185-227 (doCommit), 239-246 (validateWriteResult)
Signature
public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
public ClusteringCommitSink(Configuration conf)
@Override
public void invoke(ClusteringCommitEvent event, Context context) throws Exception
private void doCommit(
String instant,
HoodieClusteringPlan clusteringPlan,
Collection<ClusteringCommitEvent> events)
private static void validateWriteResult(
HoodieClusteringPlan clusteringPlan,
String instantTime,
HoodieWriteMetadata<List<WriteStatus>> writeMetadata)
}
Import
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.flink.configuration.Configuration;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| event | ClusteringCommitEvent | Yes | Contains the clustering instant time, file IDs (comma-separated string), a List<WriteStatus> with new file metadata (or null if the task failed), and the task ID |
| context | SinkFunction.Context | Yes | Flink sink function invocation context (provides processing time, watermark, etc.) |
| conf | Configuration | Yes (constructor) | Flink configuration containing the ignore.failed flag, the clean.async.enabled flag, and write client settings |
Outputs
| Name | Type | Description |
|---|---|---|
| REPLACE_COMMIT | Timeline commit | A committed REPLACE_COMMIT instant on the Hudi timeline containing HoodieCommitMetadata with write statistics and the partitionToReplaceFileIds mapping |
| Rollback (on failure) | Timeline rollback | If any clustering task fails or error records exceed the threshold, the inflight clustering instant is rolled back |
| Clean (optional) | Clean action | If clean.async.enabled is false, an inline clean operation removes obsolete data files after the commit |
Usage Examples
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
// Add the commit sink to the clustering pipeline
Configuration conf = getHudiConfiguration();
DataStream<ClusteringCommitEvent> clusteringResultStream = /* output of ClusteringOperator */;
clusteringResultStream
.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.uid("uid_clustering_commit")
.setParallelism(1) // must be single-parallelism
.getTransformation()
.setMaxParallelism(1); // prevent rescaling
// The sink will:
// 1. Buffer events until all clustering groups report
// 2. Validate no failures occurred
// 3. Build HoodieCommitMetadata with CLUSTER operation type
// 4. Commit as REPLACE_COMMIT on the timeline
// 5. Optionally run inline clean