Implementation: Apache Hudi ClusteringOperator doClustering
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Data_Layout_Optimization |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool for executing the clustering data rewrite by reading file slices, optionally sorting records, and writing new optimally-sized Parquet files, provided by Apache Hudi.
Description
ClusteringOperator is a Flink TableStreamOperator that implements the actual data processing for clustering. It receives ClusteringPlanEvent objects and produces ClusteringCommitEvent objects containing the write results.
The core logic resides in the doClustering method, which is invoked from processElement. In async mode, the clustering is executed on a dedicated NonThrownExecutor thread to avoid blocking Flink checkpoint barriers. In sync mode (batch), it executes directly.
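The async hand-off can be illustrated with plain Java executor machinery. This is a sketch, not Hudi's NonThrownExecutor; the class and method names here are illustrative. The point is the pattern: the element-processing method enqueues the heavy rewrite on a single worker thread and returns immediately, so the operator's main thread stays free to handle checkpoint barriers.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch of async clustering dispatch (names are hypothetical).
public class AsyncDispatchSketch {
    // Single dedicated worker, analogous to the operator's clustering executor.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    public final List<String> results = new CopyOnWriteArrayList<>();

    // Analogue of processElement in async mode: enqueue the rewrite and
    // return at once instead of blocking until clustering finishes.
    public CompletableFuture<Void> processElement(String planId) {
        return CompletableFuture.runAsync(() -> results.add("clustered:" + planId), worker);
    }

    public void shutdown() {
        worker.shutdown();
    }
}
```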
The doClustering method follows a three-phase approach:
- Read: For each `ClusteringOperation` in the group, it determines the read path:
  - If any operation has delta (log) files, it uses `HoodieFileGroupReader` to perform a merge read that applies log file updates on top of base file records.
  - If only base files exist, it uses `HoodieRowDataParquetReader` for direct Parquet reads, which is more efficient.
  - Records from multiple operations are concatenated using `CloseableConcatenatingIterator`.
- Sort (optional): If `sortClusteringEnabled` is true, records are serialized to `BinaryRowData` and fed into a `BinaryExternalSorter` that uses Flink's managed memory and spills to disk for large datasets. The sorter is initialized with a compiled `NormalizedKeyComputer` and `RecordComparator` generated by `SortOperatorGen`.
- Write: Records are written via `BulkInsertWriterHelper`, which creates new Parquet files with the configured maximum file size. The writer produces `WriteStatus` objects describing each new file.
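The three phases above can be modeled with plain Java collections. This is a deliberately simplified sketch, not Hudi code: lists of strings stand in for file-slice readers, a natural-order comparator stands in for the generated sort code, and a fixed record count per output batch stands in for the byte-based maximum file size. All names are illustrative.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Simplified model of doClustering's read -> sort -> write flow (no Hudi types).
public class ClusteringFlowSketch {
    public static List<List<String>> cluster(List<List<String>> operations,
                                             boolean sortEnabled,
                                             int maxRecordsPerFile) {
        // Phase 1 (Read): concatenate records from all operations,
        // analogous to CloseableConcatenatingIterator over file-slice readers.
        List<String> records = operations.stream()
            .flatMap(List::stream)
            .collect(Collectors.toCollection(ArrayList::new));

        // Phase 2 (Sort, optional): order records by the sort key,
        // analogous to BinaryExternalSorter with a generated RecordComparator.
        if (sortEnabled) {
            records.sort(Comparator.naturalOrder());
        }

        // Phase 3 (Write): pack records into new size-bounded "files",
        // analogous to BulkInsertWriterHelper rolling over at the max file size.
        List<List<String>> files = new ArrayList<>();
        for (int i = 0; i < records.size(); i += maxRecordsPerFile) {
            files.add(new ArrayList<>(
                records.subList(i, Math.min(i + maxRecordsPerFile, records.size()))));
        }
        return files;
    }
}
```

Sorting before the size-bounded write is what gives clustering its layout benefit: related records land in the same output file, improving data skipping on subsequent queries.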
The HoodieFlinkClusteringJob provides the batch entry point, constructing a complete Flink pipeline with a ClusteringPlanSourceFunction as the source, the ClusteringOperator as the transform, and ClusteringCommitSink as the sink.
Usage
Use ClusteringOperator in both streaming and batch Flink pipelines. In streaming mode, enable async clustering so the rewrite runs on a dedicated thread and does not block checkpoint barriers. In batch mode, set the operator parallelism to control how many clustering groups are processed concurrently. To enable sort clustering, set clustering.sort.columns and allocate sufficient managed memory via write.sort.memory.
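A configuration sketch for the options discussed above. The key strings follow this document's wording and the values are hypothetical; verify both against the FlinkOptions class of your Hudi version before use.

```java
import org.apache.flink.configuration.Configuration;

Configuration conf = new Configuration();
// Streaming mode: run clustering asynchronously so checkpoints are not blocked.
conf.setString("clustering.async.enabled", "true");
// Enable sort clustering on the chosen layout columns (hypothetical columns).
conf.setString("clustering.sort.columns", "city,age");
// Managed memory budget for the external sorter (hypothetical value, in MB).
conf.setString("write.sort.memory", "128");
```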
Code Reference
Source Location
- Repository: Apache Hudi
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
- Lines: 100-373 (class), 189-204 (processElement), 228-267 (doClustering)
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
- Lines: 66-430 (class), 81-123 (main)
Signature
// ClusteringOperator
public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEvent>
implements OneInputStreamOperator<ClusteringPlanEvent, ClusteringCommitEvent>, BoundedOneInput {
public ClusteringOperator(Configuration conf, RowType rowType)
@Override
public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception
private void doClustering(String instantTime, List<ClusteringOperation> clusteringOperations) throws Exception
}
// HoodieFlinkClusteringJob
public class HoodieFlinkClusteringJob {
public static void main(String[] args) throws Exception
public void start(boolean serviceMode) throws Exception
}
Import
import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.logical.RowType;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| element | StreamRecord<ClusteringPlanEvent> | Yes | A stream record wrapping a ClusteringPlanEvent containing the instant time, ClusteringGroupInfo with a list of ClusteringOperation objects, and strategy parameters |
| conf | Configuration | Yes | Flink configuration with clustering sort columns, target file max bytes, small file limit, write sort memory, and the async clustering flag |
| rowType | RowType | Yes | The logical row type of the table schema, including metadata fields; used for serialization and sort code generation |
Outputs
| Name | Type | Description |
|---|---|---|
| ClusteringCommitEvent | org.apache.hudi.sink.clustering.ClusteringCommitEvent | Contains the clustering instant time, comma-separated file IDs of the input operations, a List<WriteStatus> describing newly written files (or null on failure), and the task ID |
Usage Examples
import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.types.logical.RowType;
// Build a batch clustering pipeline (as done in HoodieFlinkClusteringJob)
// instantTime and clusteringPlan are assumed to have been read from the table's timeline
Configuration conf = getHudiConfiguration(); // placeholder helper: Hudi write/clustering options
RowType rowType = getTableRowType();         // placeholder helper: table schema as a Flink RowType
int clusteringParallelism = 4;               // how many clustering groups run concurrently
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ClusteringCommitEvent> dataStream = env
.addSource(new ClusteringPlanSourceFunction(instantTime, clusteringPlan, conf))
.name("clustering_source")
.uid("uid_clustering_source")
.rebalance()
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))
.setParallelism(clusteringParallelism);
dataStream
.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.uid("uid_clustering_commit")
.setParallelism(1);
env.execute("flink_hudi_clustering_" + instantTime);