
Implementation:Apache Hudi FlinkSizeBasedClusteringPlanStrategy BuildGroups

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Data_Layout_Optimization
Last Updated 2026-02-08 00:00 GMT

Overview

A concrete Apache Hudi tool for building clustering groups from file slices using a size-based strategy, with optional sort-column configuration.

Description

FlinkSizeBasedClusteringPlanStrategy is a Flink-specific clustering plan strategy that extends PartitionAwareClusteringPlanStrategy. It determines which file slices within a partition are eligible for clustering and how they are grouped.

The strategy operates in three phases:

  1. Eligibility filtering: The getFileSlicesEligibleForClustering method filters file slices to include only those whose base file size is smaller than the configured clusteringSmallFileLimit. This focuses clustering effort on small files that would benefit most from consolidation.
  2. Group building: The buildClusteringGroupsForPartition method delegates to the parent class PartitionAwareClusteringPlanStrategy to partition eligible file slices into groups based on the configured target file max bytes and max number of groups.
  3. Strategy parameters: The getStrategyParams method returns a map containing the sort column names (if configured) as a strategy parameter keyed by PLAN_STRATEGY_SORT_COLUMNS. These parameters are attached to the clustering plan and used by the downstream ClusteringOperator to determine whether and how to sort records.
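
The filtering and grouping phases above can be sketched in plain Java. This is a simplified stand-in, not Hudi's implementation: the real strategy operates on FileSlice and HoodieClusteringGroup objects, while here base-file sizes in bytes stand in for slices and the method and parameter names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

public class SizeBasedGroupingSketch {

    /** Greedily packs small slices (by base-file size) into clustering groups. */
    static List<List<Long>> buildGroups(List<Long> sliceSizes,
                                        long smallFileLimit,
                                        long targetFileMaxBytes,
                                        int maxNumGroups) {
        List<List<Long>> groups = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : sliceSizes) {
            // Phase 1: only slices whose base file is below the small-file limit
            // are eligible for clustering.
            if (size >= smallFileLimit) {
                continue;
            }
            // Phase 2: close the current group once adding another slice would
            // exceed the target output file size.
            if (currentBytes + size > targetFileMaxBytes && !current.isEmpty()) {
                groups.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
                if (groups.size() == maxNumGroups) {
                    return groups; // cap reached; remaining slices wait for a later plan
                }
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        long mb = 1024 * 1024;
        // Slices of 10, 20, 50, and 200 MB; limit 64 MB, target 64 MB, max 30 groups.
        // The 200 MB slice is filtered out; 10 + 20 MB share a group, 50 MB gets its own.
        List<List<Long>> groups = buildGroups(
            List.of(10 * mb, 20 * mb, 50 * mb, 200 * mb), 64 * mb, 64 * mb, 30);
        System.out.println(groups.size());        // 2
        System.out.println(groups.get(0).size()); // 2
    }
}
```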

The companion FlinkConsistentBucketClusteringPlanStrategy handles consistent hashing bucket index tables. It extends BaseConsistentHashingBucketClusteringPlanStrategy and disables both bucket merge and bucket sort plans because Flink's parallel execution model does not support multiple subtasks writing to the same file group.

The SortOperatorGen class generates Flink sort operators from a RowType and sort field names. It creates a SortCodeGenerator that produces compiled NormalizedKeyComputer and RecordComparator instances for efficient sorting.
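
To illustrate what the generated comparator accomplishes, the sketch below builds a lexicographic comparator from a schema and sort field names. This is a plain-Java simplification: the real SortCodeGenerator emits compiled NormalizedKeyComputer and RecordComparator classes over Flink RowData for performance, whereas here Object[] rows and a field-name lookup stand in for those types.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortComparatorSketch {

    /** Builds a lexicographic comparator over the named sort fields,
     *  mimicking what the generated RecordComparator does for RowData. */
    static Comparator<Object[]> comparatorFor(List<String> schema, String[] sortFields) {
        // Resolve sort field names to positions in the row schema.
        int[] indices = Arrays.stream(sortFields).mapToInt(schema::indexOf).toArray();
        Comparator<Object[]> cmp = null;
        for (int idx : indices) {
            @SuppressWarnings("unchecked")
            Comparator<Object[]> next =
                Comparator.comparing(row -> (Comparable<Object>) row[idx]);
            // Chain comparators: earlier sort columns take precedence.
            cmp = (cmp == null) ? next : cmp.thenComparing(next);
        }
        return cmp;
    }

    public static void main(String[] args) {
        List<String> schema = List.of("uuid", "ts", "city");
        Comparator<Object[]> byTsCity = comparatorFor(schema, new String[] {"ts", "city"});
        Object[] a = {"k1", 2L, "sf"};
        Object[] b = {"k2", 1L, "nyc"};
        System.out.println(byTsCity.compare(a, b) > 0); // true: ts 2 sorts after ts 1
    }
}
```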

Usage

Use FlinkSizeBasedClusteringPlanStrategy as the default strategy for non-bucket-indexed Hudi tables when you want to consolidate small files. Set clustering.plan.strategy.sort.columns to enable sorted clustering. Use FlinkConsistentBucketClusteringPlanStrategy when clustering tables that use the consistent hashing bucket index.
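
For a Flink SQL or Table API job, the configuration above is typically expressed as table connector options. The sketch below assembles such options as a plain map; only clustering.plan.strategy.sort.columns is named on this page, and the other keys are assumptions drawn from common Hudi Flink clustering configuration, so verify them against your Hudi version.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HudiFlinkClusteringOptions {

    /** Connector options one might set on a Flink Hudi table to enable
     *  sorted, size-based clustering. Keys other than the sort-columns
     *  option are assumed, not taken from this page. */
    static Map<String, String> clusteringOptions() {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("clustering.schedule.enabled", "true");  // assumed option key
        opts.put("clustering.async.enabled", "true");     // assumed option key
        opts.put("clustering.plan.strategy.sort.columns", "ts,city");
        return opts;
    }

    public static void main(String[] args) {
        System.out.println(
            clusteringOptions().get("clustering.plan.strategy.sort.columns")); // ts,city
    }
}
```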

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
  • Lines: 49-78
  • File: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkConsistentBucketClusteringPlanStrategy.java
  • Lines: 36-56
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
  • Lines: 34-59

Signature

// FlinkSizeBasedClusteringPlanStrategy
public class FlinkSizeBasedClusteringPlanStrategy<T>
    extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {

    public FlinkSizeBasedClusteringPlanStrategy(
        HoodieTable table,
        HoodieEngineContext engineContext,
        HoodieWriteConfig writeConfig)

    @Override
    protected Pair<Stream<HoodieClusteringGroup>, Boolean> buildClusteringGroupsForPartition(
        String partitionPath, List<FileSlice> fileSlices)

    @Override
    protected Map<String, String> getStrategyParams()

    @Override
    protected Stream<FileSlice> getFileSlicesEligibleForClustering(String partition)
}

// FlinkConsistentBucketClusteringPlanStrategy
public class FlinkConsistentBucketClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
    extends BaseConsistentHashingBucketClusteringPlanStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {

    public FlinkConsistentBucketClusteringPlanStrategy(
        HoodieTable table,
        HoodieEngineContext engineContext,
        HoodieWriteConfig writeConfig)
}

// SortOperatorGen
public class SortOperatorGen {

    public SortOperatorGen(RowType rowType, String[] sortFields)

    public OneInputStreamOperator<RowData, RowData> createSortOperator(Configuration conf)

    public SortCodeGenerator createSortCodeGenerator()
}

Import

import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
import org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.common.util.collection.Pair;

I/O Contract

Inputs

Name Type Required Description
partitionPath String Yes The partition path for which to build clustering groups
fileSlices List<FileSlice> Yes All file slices in the partition from the table file system view
table HoodieTable Yes The Hudi table providing the file system view and write configuration
engineContext HoodieEngineContext Yes The engine context for parallelism and execution
writeConfig HoodieWriteConfig Yes Configuration including clusteringSmallFileLimit, clusteringSortColumns, target file size, and max number of groups

Outputs

Name Type Description
clusteringGroups Pair<Stream<HoodieClusteringGroup>, Boolean> A stream of clustering groups (each containing file slices to rewrite together) paired with a boolean indicating if more groups may exist
strategyParams Map<String, String> Strategy parameters map containing sort column names if configured (key: hoodie.clustering.plan.strategy.sort.columns)

Usage Examples

import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;

// Configure the clustering strategy
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/path/to/hudi/table")
    .withClusteringConfig(HoodieClusteringConfig.newBuilder()
        .withClusteringPlanStrategyClass(
            FlinkSizeBasedClusteringPlanStrategy.class.getName())
        .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.NONE)
        .withClusteringSortColumns("ts,city")
        .withClusteringTargetFileMaxBytes(128 * 1024 * 1024L)  // 128 MB
        .withClusteringPlanSmallFileLimit(64 * 1024 * 1024L)   // 64 MB, in bytes
        .withClusteringMaxNumGroups(30)
        .build())
    .build();

// The strategy is instantiated internally by the write client
// during clustering plan scheduling:
FlinkSizeBasedClusteringPlanStrategy<HoodieData> strategy =
    new FlinkSizeBasedClusteringPlanStrategy<>(table, engineContext, writeConfig);

// Build groups for a specific partition. Note that buildClusteringGroupsForPartition
// and getStrategyParams are protected and normally invoked by the clustering plan
// scheduler; they are shown here only to illustrate the call flow.
Pair<Stream<HoodieClusteringGroup>, Boolean> result =
    strategy.buildClusteringGroupsForPartition("2024/01/15", fileSlices);

// Retrieve strategy params (sort columns)
Map<String, String> params = strategy.getStrategyParams();
// params = {"hoodie.clustering.plan.strategy.sort.columns": "ts,city"}

Related Pages

Implements Principle
