Principle: Apache Hudi Clustering Strategy Configuration
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Data_Layout_Optimization |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Configuring the pluggable strategy that determines how file slices within a partition are grouped into clustering units and optionally sorted during rewriting.
Description
A clustering strategy defines which files to include in clustering and how to group them. Apache Hudi provides a pluggable strategy system where different strategies can be selected via configuration. The strategy operates during the plan creation phase (before the plan is written to the timeline) and determines the shape of the clustering work.
The key strategy dimensions are:
- File eligibility: Which files in a partition qualify for clustering. The default size-based strategy excludes files larger than a configurable `small.file.limit`, focusing clustering effort on small files that would benefit from being merged into larger ones.
- Group formation: How eligible files are partitioned into clustering groups. Each group defines a set of file slices that will be rewritten together into one or more new files. The maximum group size is bounded by `target.file.max.bytes` and `max.num.groups`.
- Strategy parameters: Metadata attached to the plan that downstream operators use during execution. For example, the sort column names are passed as strategy parameters so the clustering operator knows which columns to sort by.
- Sort operator generation: When sort columns are specified, a `SortOperatorGen` produces a Flink `SortCodeGenerator` that creates compiled comparators and normalized key computers for efficient in-memory and external sorting.
Two primary strategies exist for Flink:
- FlinkSizeBasedClusteringPlanStrategy: Groups small files by size, optionally specifying sort columns. This is the default for non-bucket tables.
- FlinkConsistentBucketClusteringPlanStrategy: Handles consistent hashing bucket index tables where bucket splits and merges require clustering. This strategy disables merge and sort plans to avoid multiple subtasks writing to the same file group.
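Because the strategy is selected by class name from configuration, it can be modeled as a simple registry lookup. The following Python sketch only simulates that idea (Hudi actually instantiates the configured Java class via reflection; the registry and `load_strategy` helper here are hypothetical):

```python
# Hypothetical simulation of pluggable strategy selection.
# Hudi resolves clustering.plan.strategy.class reflectively in Java;
# here a plain registry maps simple class names to constructors.

class FlinkSizeBasedClusteringPlanStrategy:
    """Default for non-bucket tables: groups small files by size."""

class FlinkConsistentBucketClusteringPlanStrategy:
    """For consistent-hashing bucket tables: handles bucket split/merge."""

STRATEGIES = {
    "FlinkSizeBasedClusteringPlanStrategy": FlinkSizeBasedClusteringPlanStrategy,
    "FlinkConsistentBucketClusteringPlanStrategy": FlinkConsistentBucketClusteringPlanStrategy,
}

def load_strategy(class_name: str):
    # Accept a fully qualified name and key the registry on the simple name
    simple_name = class_name.rsplit(".", 1)[-1]
    return STRATEGIES[simple_name]()
```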
Usage
Configure the strategy via the following Flink configuration options:
- `clustering.plan.strategy.class`: Fully qualified class name of the strategy.
- `clustering.plan.strategy.sort.columns`: Comma-separated list of columns to sort by during clustering.
- `clustering.plan.strategy.target.file.max.bytes`: Maximum size of output files after clustering.
- `clustering.plan.strategy.small.file.limit`: Files below this size are eligible for clustering.
- `clustering.plan.strategy.max.num.groups`: Maximum number of clustering groups per partition.
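For concreteness, the options might be assembled as a plain key-value map. The keys below come from the list above; the values are purely illustrative choices for this example, not Hudi defaults, and the class-name value omits the package path:

```python
# Illustrative clustering configuration (keys from the option list above;
# values are examples for this sketch, not Hudi defaults).
clustering_options = {
    # Fully qualified strategy class name (package path omitted in this sketch)
    "clustering.plan.strategy.class": "FlinkSizeBasedClusteringPlanStrategy",
    # Sort rewritten data by these columns
    "clustering.plan.strategy.sort.columns": "region,date",
    # Cap output files at roughly 1 GiB (illustrative)
    "clustering.plan.strategy.target.file.max.bytes": str(1024 * 1024 * 1024),
    # Files under roughly 300 MiB are clustering candidates (illustrative)
    "clustering.plan.strategy.small.file.limit": str(300 * 1024 * 1024),
    # At most 30 clustering groups per plan (illustrative)
    "clustering.plan.strategy.max.num.groups": "30",
}
```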
Theoretical Basis
The clustering strategy addresses a classic bin packing problem: given a set of files with varying sizes, partition them into groups such that each group's total size approximates the target file size.
FUNCTION buildClusteringGroups(partition, fileSlices, config):
    eligible = fileSlices.filter(f -> f.baseFileSize < config.smallFileLimit)
    IF eligible.isEmpty():
        RETURN empty
    groups = []
    currentGroup = []
    currentSize = 0
    FOR EACH slice IN eligible (ordered by size):
        // Only close a non-empty group, so a single large slice
        // never produces an empty group
        IF currentGroup NOT EMPTY AND currentSize + slice.size > config.targetFileMaxBytes:
            groups.add(currentGroup)
            currentGroup = [slice]
            currentSize = slice.size
        ELSE:
            currentGroup.add(slice)
            currentSize += slice.size
    IF currentGroup NOT EMPTY:
        groups.add(currentGroup)
    // Limit to max groups
    RETURN groups.take(config.maxNumGroups)
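The pseudocode above translates directly into a runnable form. This Python version is a simulation of the grouping logic, with file slices reduced to plain sizes; it is not Hudi's actual implementation:

```python
def build_clustering_groups(file_sizes, small_file_limit,
                            target_file_max_bytes, max_num_groups):
    """Greedy bin packing: pack small files into groups near the target size."""
    # File eligibility: only files below small.file.limit qualify
    eligible = sorted(s for s in file_sizes if s < small_file_limit)
    groups, current, current_size = [], [], 0
    for size in eligible:
        # Close the current (non-empty) group once this file would overflow it
        if current and current_size + size > target_file_max_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    # Bound the plan by max.num.groups
    return groups[:max_num_groups]
```

For example, with files of 100, 200, 250, 400, and 900 bytes, a small-file limit of 500, and a target of 500, the 900-byte file is skipped and the rest pack into three groups.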
When sort columns are specified, the strategy passes them as parameters so the execution phase can apply sorting. Sorting by frequently-filtered columns (e.g., date, region) improves data skipping during query execution, as min/max statistics in Parquet file footers allow the query engine to skip entire row groups.
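The data-skipping benefit can be seen in a small simulation: with per-row-group min/max statistics, a sorted layout lets a range predicate eliminate almost every group, while the same filter over unsorted data overlaps them all. This Python model only mimics the idea of Parquet footer statistics, not the actual format:

```python
def row_group_stats(values, group_size):
    """Compute (min, max) per row group, mimicking Parquet footer statistics."""
    return [(min(values[i:i + group_size]), max(values[i:i + group_size]))
            for i in range(0, len(values), group_size)]

def groups_to_scan(stats, lo, hi):
    """Keep only row groups whose [min, max] range overlaps the filter [lo, hi]."""
    return [i for i, (mn, mx) in enumerate(stats) if mx >= lo and mn <= hi]

dates = list(range(100))  # e.g. dates encoded as integers

# Sorted layout: a narrow filter touches a single row group
sorted_scan = groups_to_scan(row_group_stats(sorted(dates), 10), 42, 45)

# Interleaved (unsorted) layout: every group's min/max straddles the filter
unsorted = [v for pair in zip(range(50), range(50, 100)) for v in pair]
unsorted_scan = groups_to_scan(row_group_stats(unsorted, 10), 42, 45)
```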
The SortOperatorGen leverages Flink's code generation framework to produce specialized comparators at runtime:
FUNCTION createSortOperator(rowType, sortFields):
    sortIndices = sortFields.map(field -> rowType.getFieldIndex(field))
    sortSpec = buildSortSpec(sortIndices, ascending=true, nullsFirst=true)
    codeGen = new SortCodeGenerator(rowType, sortSpec)
    RETURN new SortOperator(codeGen.generateKeyComputer(), codeGen.generateComparator())
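The effect of the generated key computer and comparator can be approximated in Python: resolve the sort fields to indices, then build a key function that sorts ascending with nulls first. Flink actually compiles specialized Java comparators at runtime; this sketch only simulates the resulting ordering, and the helper name is hypothetical:

```python
from operator import itemgetter

def create_sort_key(row_type, sort_fields):
    """Key function over the requested fields: ascending, nulls first."""
    # Resolve field names to positional indices, as the pseudocode does
    indices = [row_type.index(f) for f in sort_fields]
    getter = itemgetter(*indices)

    def key(row):
        vals = getter(row)
        vals = vals if isinstance(vals, tuple) else (vals,)
        # (False, None) sorts before (True, value), giving nulls-first order
        return tuple((v is not None, v) for v in vals)

    return key

row_type = ["id", "region", "date", "amount"]
rows = [(1, "eu", None, 10.0),
        (2, "us", "2024-01-01", 5.0),
        (3, "eu", "2024-01-02", 7.5)]
rows.sort(key=create_sort_key(row_type, ["region", "date"]))
```

After sorting, the "eu" row with a null date precedes the other "eu" row, and both precede the "us" row, matching the ascending, nulls-first spec in the pseudocode.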