
Implementation:Apache Hudi ClusteringOperator DoClustering

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Data_Layout_Optimization
Last Updated 2026-02-08 00:00 GMT

Overview

Apache Hudi's concrete tool for executing the clustering data rewrite: it reads file slices, optionally sorts records, and writes new optimally sized Parquet files.

Description

ClusteringOperator is a Flink TableStreamOperator that implements the actual data processing for clustering. It receives ClusteringPlanEvent objects and produces ClusteringCommitEvent objects containing the write results.

The core logic resides in the doClustering method, which is invoked from processElement. In async mode, the clustering is executed on a dedicated NonThrownExecutor thread to avoid blocking Flink checkpoint barriers. In sync mode (batch), it executes directly.
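The async hand-off can be illustrated with a minimal plain-Java sketch. A standard single-thread executor stands in for Hudi's NonThrownExecutor, and the class and variable names are illustrative, not the actual operator code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AsyncDispatchSketch {
    public static void main(String[] args) throws Exception {
        // Single worker thread stands in for NonThrownExecutor.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean clusteringDone = new AtomicBoolean(false);
        boolean asyncClustering = true;

        // Placeholder for the actual clustering rewrite work.
        Runnable doClustering = () -> clusteringDone.set(true);

        if (asyncClustering) {
            // Async mode: hand the work off so the operator thread can
            // keep acknowledging checkpoint barriers.
            executor.submit(doClustering);
        } else {
            // Sync (batch) mode: execute inline.
            doClustering.run();
        }

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(clusteringDone.get());
    }
}
```

The key design point is that the operator's processElement returns immediately in async mode; failures on the worker thread are surfaced back to the operator rather than thrown on the Flink task thread.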

The doClustering method follows a three-phase approach:

  1. Read: For each ClusteringOperation in the group, it determines the read path:
    • If any operation has delta (log) files, it uses HoodieFileGroupReader to perform a merge read that applies log file updates on top of base file records.
    • If only base files exist, it uses HoodieRowDataParquetReader for direct Parquet reads, which is more efficient.
    • Records from multiple operations are concatenated using CloseableConcatenatingIterator.
  2. Sort (optional): If sortClusteringEnabled is true, records are serialized to BinaryRowData and fed into a BinaryExternalSorter that uses Flink's managed memory and spills to disk for large datasets. The sorter is initialized with a compiled NormalizedKeyComputer and RecordComparator generated by SortOperatorGen.
  3. Write: Records are written via BulkInsertWriterHelper, which creates new Parquet files with the configured maximum file size. The writer produces WriteStatus objects describing each new file.
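The three phases above can be condensed into a plain-Java sketch. All types here are simplified stand-ins for the Hudi readers, sorter, and writer, not the actual Hudi classes:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ClusteringPhasesSketch {
    public static void main(String[] args) {
        // Phase 1 (Read): records from two file-group "operations",
        // concatenated like CloseableConcatenatingIterator would do.
        List<String> op1 = List.of("c", "a");
        List<String> op2 = List.of("b", "d");
        List<String> records = new ArrayList<>();
        records.addAll(op1);
        records.addAll(op2);

        // Phase 2 (Sort, optional): in Hudi this is a spillable
        // BinaryExternalSorter; here a simple in-memory sort.
        boolean sortEnabled = true;
        if (sortEnabled) {
            records.sort(Comparator.naturalOrder());
        }

        // Phase 3 (Write): roll records into size-bounded "files",
        // mimicking the max-file-size limit of the bulk-insert writer.
        int maxRecordsPerFile = 3;
        List<List<String>> newFiles = new ArrayList<>();
        for (int i = 0; i < records.size(); i += maxRecordsPerFile) {
            newFiles.add(records.subList(i,
                Math.min(i + maxRecordsPerFile, records.size())));
        }
        System.out.println(newFiles);
    }
}
```

Note how sorting happens across all concatenated operations before the write, which is what lets clustering co-locate related records that previously lived in different file groups.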

The HoodieFlinkClusteringJob provides the batch entry point, constructing a complete Flink pipeline with a ClusteringPlanSourceFunction as the source, the ClusteringOperator as the transform, and ClusteringCommitSink as the sink.

Usage

Use ClusteringOperator in both streaming and batch Flink pipelines. In streaming mode, enable async clustering so checkpoint barriers are not blocked. In batch mode, configure parallelism to control how many clustering groups are processed concurrently. Enable sort clustering by setting clustering.sort.columns and allocating sufficient managed memory via write.sort.memory.
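A minimal configuration sketch for sort clustering follows. The option keys mirror the names used above, but exact keys can vary across Hudi versions, so treat them as illustrative:

```java
import org.apache.flink.configuration.Configuration;

Configuration conf = new Configuration();
// Sort the rewritten records by these columns (enables sort clustering).
conf.setString("clustering.sort.columns", "uuid,partition_path");
// Managed memory (MB) reserved for the external sorter.
conf.setString("write.sort.memory", "128");
// Execute clustering off the main operator thread in streaming jobs.
conf.setString("clustering.async.enabled", "true");
```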

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
  • Lines: 100-373 (class), 189-204 (processElement), 228-267 (doClustering)
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
  • Lines: 66-430 (class), 81-123 (main)

Signature

// ClusteringOperator
public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEvent>
    implements OneInputStreamOperator<ClusteringPlanEvent, ClusteringCommitEvent>, BoundedOneInput {

    public ClusteringOperator(Configuration conf, RowType rowType)

    @Override
    public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception

    private void doClustering(String instantTime, List<ClusteringOperation> clusteringOperations) throws Exception
}

// HoodieFlinkClusteringJob
public class HoodieFlinkClusteringJob {

    public static void main(String[] args) throws Exception

    public void start(boolean serviceMode) throws Exception
}

Import

import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.logical.RowType;

I/O Contract

Inputs

Name | Type | Required | Description
element | StreamRecord<ClusteringPlanEvent> | Yes | A stream record wrapping a ClusteringPlanEvent containing the instant time, ClusteringGroupInfo with a list of ClusteringOperation objects, and strategy parameters
conf | Configuration | Yes | Flink configuration with clustering sort columns, target file max bytes, small file limit, write sort memory, and async clustering flag
rowType | RowType | Yes | The logical row type of the table schema including metadata fields, used for serialization and sort code generation

Outputs

Name | Type | Description
ClusteringCommitEvent | org.apache.hudi.sink.clustering.ClusteringCommitEvent | Contains the clustering instant time, comma-separated file IDs of the input operations, a List<WriteStatus> describing newly written files (or null on failure), and the task ID

Usage Examples

import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.types.logical.RowType;

// Build a batch clustering pipeline (as done in HoodieFlinkClusteringJob).
// instantTime and clusteringPlan come from the scheduled clustering instant
// on the timeline; conf and rowType are table-specific.
Configuration conf = getHudiConfiguration();
RowType rowType = getTableRowType();
int clusteringParallelism = 4;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<ClusteringCommitEvent> dataStream = env
    .addSource(new ClusteringPlanSourceFunction(instantTime, clusteringPlan, conf))
    .name("clustering_source")
    .uid("uid_clustering_source")
    .rebalance()
    .transform("clustering_task",
        TypeInformation.of(ClusteringCommitEvent.class),
        new ClusteringOperator(conf, rowType))
    .setParallelism(clusteringParallelism);

dataStream
    .addSink(new ClusteringCommitSink(conf))
    .name("clustering_commit")
    .uid("uid_clustering_commit")
    .setParallelism(1);

env.execute("flink_hudi_clustering_" + instantTime);

Related Pages

Implements Principle
