# Implementation: Apache Hudi FlinkSizeBasedClusteringPlanStrategy BuildGroups
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Data_Layout_Optimization |
| Last Updated | 2026-02-08 00:00 GMT |
## Overview
Concrete tool for building clustering groups from file slices using a size-based strategy with optional sort column configuration, provided by Apache Hudi.
## Description
`FlinkSizeBasedClusteringPlanStrategy` is a Flink-specific clustering plan strategy that extends `PartitionAwareClusteringPlanStrategy`. It determines which file slices within a partition are eligible for clustering and how they are grouped.
The strategy operates in three phases:
- Eligibility filtering: the `getFileSlicesEligibleForClustering` method filters file slices to include only those whose base file size is smaller than the configured `clusteringSmallFileLimit`. This focuses clustering effort on the small files that would benefit most from consolidation.
- Group building: the `buildClusteringGroupsForPartition` method delegates to the parent class `PartitionAwareClusteringPlanStrategy`, which partitions eligible file slices into groups based on the configured target file max bytes and maximum number of groups.
- Strategy parameters: the `getStrategyParams` method returns a map containing the sort column names (if configured) as a strategy parameter keyed by `PLAN_STRATEGY_SORT_COLUMNS`. These parameters are attached to the clustering plan and used by the downstream `ClusteringOperator` to determine whether and how to sort records.
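The filter-then-pack control flow described above can be sketched in plain Java. This is a minimal illustration, not Hudi's implementation: the `SliceInfo` record is a hypothetical stand-in for Hudi's `FileSlice`, and the greedy packing is a simplified version of the size-based grouping done by `PartitionAwareClusteringPlanStrategy`.

```java
import java.util.ArrayList;
import java.util.List;

public class SizeBasedGroupingSketch {
  // Hypothetical stand-in for a Hudi FileSlice: a file id plus its base file size.
  record SliceInfo(String fileId, long baseFileBytes) {}

  // Phase 1 (eligibility): keep only slices whose base file is under the small-file limit.
  static List<SliceInfo> eligibleSlices(List<SliceInfo> slices, long smallFileLimit) {
    return slices.stream().filter(s -> s.baseFileBytes() < smallFileLimit).toList();
  }

  // Phase 2 (group building): greedily pack eligible slices into groups whose
  // combined size stays at or under the target output size.
  static List<List<SliceInfo>> buildGroups(List<SliceInfo> slices, long targetGroupBytes) {
    List<List<SliceInfo>> groups = new ArrayList<>();
    List<SliceInfo> current = new ArrayList<>();
    long currentBytes = 0;
    for (SliceInfo s : slices) {
      if (!current.isEmpty() && currentBytes + s.baseFileBytes() > targetGroupBytes) {
        groups.add(current);          // close the current group and start a new one
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(s);
      currentBytes += s.baseFileBytes();
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }

  public static void main(String[] args) {
    List<SliceInfo> slices = List.of(
        new SliceInfo("f1", 10), new SliceInfo("f2", 40),
        new SliceInfo("f3", 200), new SliceInfo("f4", 30));
    // f3 (200 bytes) exceeds the 100-byte small-file limit and is filtered out.
    List<SliceInfo> eligible = eligibleSlices(slices, 100);
    List<List<SliceInfo>> groups = buildGroups(eligible, 60);
    System.out.println(eligible.size() + " eligible, " + groups.size() + " groups");
    // → 3 eligible, 2 groups
  }
}
```

The byte values here are toy numbers; in a real table the limits come from `clusteringSmallFileLimit` and the target file max bytes in the write config.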
The companion `FlinkConsistentBucketClusteringPlanStrategy` handles consistent hashing bucket index tables. It extends `BaseConsistentHashingBucketClusteringPlanStrategy` and disables both bucket merge and bucket sort plans because Flink's parallel execution model does not support multiple subtasks writing to the same file group.
The `SortOperatorGen` class generates Flink sort operators from a `RowType` and sort field names. It creates a `SortCodeGenerator` that produces compiled `NormalizedKeyComputer` and `RecordComparator` instances for efficient sorting.
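Before any code generation can happen, the sort field names must be resolved against the row schema. The sketch below shows that first conceptual step in plain Java; the `fieldIndices` helper and the list-based schema are illustrative assumptions, not the actual `SortOperatorGen` API, which works on Flink's `RowType`.

```java
import java.util.Arrays;
import java.util.List;

public class SortFieldResolution {
  // Illustrative helper: map sort column names onto positional indices
  // of the row schema, failing fast on unknown names.
  static int[] fieldIndices(List<String> schemaFields, String[] sortFields) {
    return Arrays.stream(sortFields)
        .mapToInt(f -> {
          int idx = schemaFields.indexOf(f);
          if (idx < 0) {
            throw new IllegalArgumentException("Unknown sort field: " + f);
          }
          return idx;
        })
        .toArray();
  }

  public static void main(String[] args) {
    List<String> schema = List.of("uuid", "ts", "city", "fare");
    int[] keys = fieldIndices(schema, new String[] {"ts", "city"});
    System.out.println(Arrays.toString(keys)); // → [1, 2]
  }
}
```

In Hudi, the resolved key positions feed the generated `NormalizedKeyComputer` and `RecordComparator`, so a misspelled sort column is caught at plan time rather than during the sort itself.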
## Usage
Use `FlinkSizeBasedClusteringPlanStrategy` as the default strategy for non-bucket-indexed Hudi tables when you want to consolidate small files. Set `clustering.plan.strategy.sort.columns` to enable sorted clustering. Use `FlinkConsistentBucketClusteringPlanStrategy` when clustering tables that use the consistent hashing bucket index.
## Code Reference
### Source Location
- Repository: Apache Hudi
- File: `hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java`, Lines: 49-78
- File: `hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkConsistentBucketClusteringPlanStrategy.java`, Lines: 36-56
- File: `hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java`, Lines: 34-59
### Signature

```java
// FlinkSizeBasedClusteringPlanStrategy
public class FlinkSizeBasedClusteringPlanStrategy<T>
    extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {

  public FlinkSizeBasedClusteringPlanStrategy(
      HoodieTable table,
      HoodieEngineContext engineContext,
      HoodieWriteConfig writeConfig);

  @Override
  protected Pair<Stream<HoodieClusteringGroup>, Boolean> buildClusteringGroupsForPartition(
      String partitionPath, List<FileSlice> fileSlices);

  @Override
  protected Map<String, String> getStrategyParams();

  @Override
  protected Stream<FileSlice> getFileSlicesEligibleForClustering(String partition);
}

// FlinkConsistentBucketClusteringPlanStrategy
public class FlinkConsistentBucketClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
    extends BaseConsistentHashingBucketClusteringPlanStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {

  public FlinkConsistentBucketClusteringPlanStrategy(
      HoodieTable table,
      HoodieEngineContext engineContext,
      HoodieWriteConfig writeConfig);
}

// SortOperatorGen
public class SortOperatorGen {
  public SortOperatorGen(RowType rowType, String[] sortFields);
  public OneInputStreamOperator<RowData, RowData> createSortOperator(Configuration conf);
  public SortCodeGenerator createSortCodeGenerator();
}
```
### Import

```java
import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
import org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.common.util.collection.Pair;
```
## I/O Contract
### Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| partitionPath | String | Yes | The partition path for which to build clustering groups |
| fileSlices | List&lt;FileSlice&gt; | Yes | All file slices in the partition from the table file system view |
| table | HoodieTable | Yes | The Hudi table providing the file system view and write configuration |
| engineContext | HoodieEngineContext | Yes | The engine context for parallelism and execution |
| writeConfig | HoodieWriteConfig | Yes | Configuration including clusteringSmallFileLimit, clusteringSortColumns, target file size, and max number of groups |
### Outputs
| Name | Type | Description |
|---|---|---|
| clusteringGroups | Pair&lt;Stream&lt;HoodieClusteringGroup&gt;, Boolean&gt; | A stream of clustering groups (each containing the file slices to rewrite together) paired with a boolean indicating whether more groups may exist |
| strategyParams | Map&lt;String, String&gt; | Strategy parameters map containing the sort column names if configured (key: hoodie.clustering.plan.strategy.sort.columns) |
## Usage Examples

```java
import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;

// Configure the clustering strategy
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/path/to/hudi/table")
    .withClusteringConfig(HoodieClusteringConfig.newBuilder()
        .withClusteringPlanStrategyClass(
            FlinkSizeBasedClusteringPlanStrategy.class.getName())
        .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.NONE)
        .withClusteringSortColumns("ts,city")
        .withClusteringTargetFileMaxBytes(128 * 1024 * 1024L) // 128 MB
        .withClusteringSmallFileLimit(64 * 1024 * 1024L)      // 64 MB (value is in bytes)
        .withClusteringMaxNumGroups(30)
        .build())
    .build();

// The strategy is instantiated internally by the write client
// during clustering plan scheduling:
FlinkSizeBasedClusteringPlanStrategy<?> strategy =
    new FlinkSizeBasedClusteringPlanStrategy<>(table, engineContext, writeConfig);

// Build groups for a specific partition
Pair<Stream<HoodieClusteringGroup>, Boolean> result =
    strategy.buildClusteringGroupsForPartition("2024/01/15", fileSlices);

// Retrieve strategy params (sort columns)
Map<String, String> params = strategy.getStrategyParams();
// params = {"hoodie.clustering.plan.strategy.sort.columns": "ts,city"}
```
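On the consumer side, the sort columns travel as a plain string entry in the strategy params map. The sketch below shows one way a downstream component might read and split that entry; the key literal comes from this document, but the `sortColumns` helper is illustrative, not the actual `ClusteringOperator` code.

```java
import java.util.Map;
import java.util.Optional;

public class SortColumnsParam {
  // Key under which the plan strategy publishes sort columns (per the I/O contract above).
  static final String SORT_COLUMNS_KEY = "hoodie.clustering.plan.strategy.sort.columns";

  // Illustrative: extract the comma-separated sort columns, if present.
  static Optional<String[]> sortColumns(Map<String, String> strategyParams) {
    String raw = strategyParams.get(SORT_COLUMNS_KEY);
    if (raw == null || raw.trim().isEmpty()) {
      return Optional.empty();          // no sort requested: plain rewrite
    }
    return Optional.of(raw.split(","));
  }

  public static void main(String[] args) {
    Map<String, String> params = Map.of(SORT_COLUMNS_KEY, "ts,city");
    sortColumns(params).ifPresent(cols ->
        System.out.println(cols.length + " sort columns")); // → 2 sort columns
  }
}
```

An empty result signals that the clustering rewrite can skip the sort stage entirely, which is exactly the decision the strategy params exist to communicate.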