
Implementation:Apache Hudi ClusteringCommitSink DoCommit

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Data_Layout_Optimization
Last Updated 2026-02-08 00:00 GMT

Overview

Concrete tool, provided by Apache Hudi, for collecting clustering execution results, validating their completeness, and committing the REPLACE_COMMIT to the Hudi timeline.

Description

ClusteringCommitSink is a Flink SinkFunction that extends CleanFunction. It serves as the commit coordinator for the clustering workflow, collecting ClusteringCommitEvent objects from all parallel ClusteringOperator tasks and committing the results when all groups in the plan have reported.

The invoke method is called for each incoming event. It buffers events in a map keyed by (instantTime, fileIds), so a duplicate event from a retried task overwrites the earlier one instead of being counted twice. After buffering, it calls commitIfNecessary to check whether all groups in the plan have reported.
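The buffering and trigger logic can be sketched with plain Java collections. This is a simplified stand-in, not the actual Hudi class: the event payload is reduced to a String, and the group count is passed explicitly rather than read from the clustering plan.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the event-buffering logic in invoke(): events are keyed by
// (instantTime, fileIds), so a retried task's duplicate event overwrites
// the earlier one instead of double-counting.
public class CommitBufferSketch {
    // instantTime -> (fileIds -> event payload); payload simplified to String
    private final Map<String, Map<String, String>> commitBuffer = new HashMap<>();
    private final int numGroups; // in Hudi, derived from the clustering plan

    public CommitBufferSketch(int numGroups) {
        this.numGroups = numGroups;
    }

    public void invoke(String instantTime, String fileIds, String event) {
        commitBuffer
            .computeIfAbsent(instantTime, k -> new HashMap<>())
            .put(fileIds, event);       // deduplicates task retries
        commitIfNecessary(instantTime);
    }

    private void commitIfNecessary(String instantTime) {
        Map<String, String> events = commitBuffer.get(instantTime);
        if (events != null && events.size() == numGroups) {
            // all groups reported: doCommit() would run here
            commitBuffer.remove(instantTime);
        }
    }

    public int buffered(String instantTime) {
        Map<String, String> events = commitBuffer.get(instantTime);
        return events == null ? 0 : events.size();
    }

    public static void main(String[] args) {
        CommitBufferSketch sink = new CommitBufferSketch(3);
        sink.invoke("20260208000000", "fg-1", "ev1");
        sink.invoke("20260208000000", "fg-1", "ev1-retry"); // deduped
        System.out.println(sink.buffered("20260208000000")); // 1
        sink.invoke("20260208000000", "fg-2", "ev2");
        sink.invoke("20260208000000", "fg-3", "ev3"); // triggers commit
        System.out.println(sink.buffered("20260208000000")); // 0
    }
}
```

Keying by (instantTime, fileIds) rather than by task ID is what makes retries idempotent: a replayed task reports the same file IDs and simply replaces its previous event.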

The doCommit method performs the actual commit:

  1. Aggregate write statuses: Collects all WriteStatus objects from the buffered events.
  2. Error check: If the total error record count exceeds zero and ignore.failed is false, rolls back the clustering instant.
  3. Build write metadata: Creates HoodieWriteMetadata with write statuses, write stats, and the partition-to-replaced-file-IDs mapping.
  4. Validate: Calls validateWriteResult to ensure at least one new file was produced. Throws HoodieClusteringException if zero write statuses exist.
  5. Build commit metadata: Constructs HoodieCommitMetadata with operation type CLUSTER and action REPLACE_COMMIT.
  6. Commit: Calls writeClient.completeTableService(TableServiceType.CLUSTER, ...) to finalize the commit on the timeline.
  7. Cleanup: If async cleaning is disabled, triggers inline writeClient.clean().

The getPartitionToReplacedFileIds method computes which old file groups are being replaced by filtering out file group IDs that appear in the new write stats (since those were newly created).
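The filtering can be sketched as a per-partition set difference. The method and parameter names below are simplified stand-ins for illustration, not the actual Hudi signature:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch of getPartitionToReplacedFileIds(): from the clustering plan's
// input file groups, drop any file-group ID that also appears in the new
// write stats, since those IDs were newly created rather than replaced.
public class ReplacedFileIdsSketch {

    static Map<String, List<String>> replacedFileIds(
            Map<String, List<String>> planInputFilesByPartition,
            Set<String> newFileIdsFromWriteStats) {
        return planInputFilesByPartition.entrySet().stream()
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> e.getValue().stream()
                      .filter(fileId -> !newFileIdsFromWriteStats.contains(fileId))
                      .collect(Collectors.toList())));
    }

    public static void main(String[] args) {
        Map<String, List<String>> plan = new HashMap<>();
        plan.put("2026/02/08", Arrays.asList("fg-1", "fg-2", "fg-new"));
        Set<String> newIds = new HashSet<>(Collections.singletonList("fg-new"));
        System.out.println(replacedFileIds(plan, newIds));
        // {2026/02/08=[fg-1, fg-2]}
    }
}
```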

Usage

Use ClusteringCommitSink as the terminal operator in any Flink clustering pipeline. It must run with parallelism 1 (and max parallelism 1) to ensure a single coordinator collects all events. The sink inherits cleaning capability from CleanFunction, making it suitable for SQL API use cases where multiple sinks are not allowed.

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
  • Lines: 67-263 (class), 111-123 (invoke), 185-227 (doCommit), 239-246 (validateWriteResult)

Signature

public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {

    public ClusteringCommitSink(Configuration conf)

    @Override
    public void invoke(ClusteringCommitEvent event, Context context) throws Exception

    private void doCommit(
        String instant,
        HoodieClusteringPlan clusteringPlan,
        Collection<ClusteringCommitEvent> events)

    private static void validateWriteResult(
        HoodieClusteringPlan clusteringPlan,
        String instantTime,
        HoodieWriteMetadata<List<WriteStatus>> writeMetadata)
}

Import

import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.flink.configuration.Configuration;

I/O Contract

Inputs

Name Type Required Description
event ClusteringCommitEvent Yes Contains the clustering instant time, file IDs (comma-separated string), a List<WriteStatus> with new file metadata (or null if the task failed), and the task ID
context SinkFunction.Context Yes Flink sink function invocation context (provides processing time, watermark, etc.)
conf Configuration Yes (constructor) Flink configuration containing ignore.failed flag, clean.async.enabled flag, and write client settings

Outputs

Name Type Description
REPLACE_COMMIT Timeline commit A committed REPLACE_COMMIT instant on the Hudi timeline containing HoodieCommitMetadata with write statistics and partitionToReplaceFileIds mapping
Rollback (on failure) Timeline rollback If any clustering task fails or error records exceed threshold, the inflight clustering instant is rolled back
Clean (optional) Clean action If clean.async.enabled is false, an inline clean operation removes obsolete data files after the commit

Usage Examples

import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;

// Add the commit sink to the clustering pipeline
Configuration conf = getHudiConfiguration();
DataStream<ClusteringCommitEvent> clusteringResultStream = /* output of ClusteringOperator */;

clusteringResultStream
    .addSink(new ClusteringCommitSink(conf))
    .name("clustering_commit")
    .uid("uid_clustering_commit")
    .setParallelism(1)   // must be single-parallelism
    .getTransformation()
    .setMaxParallelism(1);  // prevent rescaling

// The sink will:
// 1. Buffer events until all clustering groups report
// 2. Validate no failures occurred
// 3. Build HoodieCommitMetadata with CLUSTER operation type
// 4. Commit as REPLACE_COMMIT on the timeline
// 5. Optionally run inline clean

Related Pages

Implements Principle
