Principle: Apache Hudi Clustering Execution
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Data_Layout_Optimization |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Clustering execution reads existing data files from a clustering group, optionally sorts the records, and rewrites them into new, optimally sized data files.
Description
The clustering execution phase is the core data processing step of the clustering workflow. It receives a `ClusteringPlanEvent` describing a group of file slices to rewrite, reads all records from those files, optionally sorts them, and writes them out as new files using the bulk insert path.
The execution follows a read-transform-write pattern:
- Read phase: For each clustering operation in the group, read the records. Two paths exist:
  - Base files only: When the file slices contain no log files (common in COW tables), the operator reads directly from Parquet base files using `HoodieRowDataParquetReader`. This is the fast path.
  - Base files with log files: When delta log files exist (MOR tables), the operator uses `HoodieFileGroupReader` to merge base file records with log file updates, producing the latest version of each record.
- Sort phase (optional): If sort columns are configured, records are fed into a `BinaryExternalSorter` that performs an external merge sort using Flink's managed memory. The sorter spills to disk when memory is exhausted, making it suitable for large file groups.
- Write phase: Records (sorted or unsorted) are written using `BulkInsertWriterHelper`, which creates new Parquet files respecting the configured maximum file size. The writer produces `WriteStatus` objects describing the newly created files.
The output is a `ClusteringCommitEvent` containing the list of write statuses for the downstream commit operator.
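The size-based rollover in the write phase can be sketched as a simple simulation: a new file is started whenever the next record would push the current file past the target size. This is an illustrative stand-in, not Hudi's `BulkInsertWriterHelper` API; the class and method names here are invented for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative simulation of size-based file rollover during the write phase. */
public class RolloverWriterSketch {

    /**
     * Assigns a stream of record sizes (bytes) to "files": a new file is started
     * whenever adding the next record would exceed targetFileSize.
     * Returns the resulting per-file byte counts.
     */
    public static List<Long> assignFiles(long[] recordSizes, long targetFileSize) {
        List<Long> files = new ArrayList<>();
        long current = 0;
        for (long size : recordSizes) {
            if (current > 0 && current + size > targetFileSize) {
                files.add(current);   // roll over: close the current file
                current = 0;
            }
            current += size;
        }
        if (current > 0) {
            files.add(current);       // close the final, possibly smaller, file
        }
        return files;
    }

    public static void main(String[] args) {
        long[] sizes = {40, 40, 40, 40, 40};          // five 40-byte records
        System.out.println(assignFiles(sizes, 100));   // → [80, 80, 40]
    }
}
```

Note that the last file may be smaller than the target; in practice Hudi's small-file handling and clustering planning work together to keep such remainders bounded.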
Usage
This principle applies in two execution contexts:
- Inline (streaming) clustering: The `ClusteringOperator` runs as a Flink operator in the streaming pipeline. It can execute clustering either synchronously (blocking checkpoint barriers) or asynchronously (using a dedicated executor thread).
- Offline (batch) clustering: The `HoodieFlinkClusteringJob` constructs a batch Flink pipeline with the `ClusteringOperator`. It supports parallelism configuration to control how many clustering groups are processed concurrently.
The execution is designed to be scalable: input events are shuffled across parallel operator instances, so each instance handles a subset of the clustering groups independently.
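For the inline (streaming) case, clustering is typically enabled through table options. The sketch below shows the general shape in Flink SQL; the `clustering.*` option names follow Hudi's Flink configuration (`FlinkOptions`), but availability and defaults vary by Hudi version, so verify them against your release before use.

```sql
CREATE TABLE hudi_sink (
  id STRING PRIMARY KEY NOT ENFORCED,
  city STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_sink',
  'table.type' = 'COPY_ON_WRITE',
  -- schedule clustering plans from the streaming job
  'clustering.schedule.enabled' = 'true',
  -- execute clustering asynchronously in the pipeline
  'clustering.async.enabled' = 'true',
  -- number of parallel ClusteringOperator instances
  'clustering.tasks' = '4',
  -- sort columns for the optional sort phase
  'clustering.plan.strategy.sort.columns' = 'city,ts'
);
```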
Theoretical Basis
The clustering execution implements a file group rewrite operation. The goal is to take N input files and produce M output files where M is typically smaller than N (many small files consolidated into fewer larger files) and the data is optionally sorted.
```
FUNCTION executeClustering(event):
    operations = event.clusteringGroupInfo.operations
    instantTime = event.clusteringInstantTime

    // Phase 1: Read
    IF any operation has delta log files:
        iterator = mergeReadWithLogs(operations, instantTime)
    ELSE:
        iterator = directReadBaseFiles(operations)

    // Phase 2: Optional sort
    IF sortClusteringEnabled:
        sorter = new BinaryExternalSorter(managedMemory)
        FOR EACH record IN iterator:
            sorter.write(toBinaryRow(record))
        iterator = sorter.getIterator()

    // Phase 3: Write
    writer = new BulkInsertWriterHelper(instantTime, targetFileSize)
    FOR EACH record IN iterator:
        writer.write(record)
    writeStatuses = writer.getWriteStatuses()

    EMIT ClusteringCommitEvent(instantTime, fileIds, writeStatuses)
```
The external sort uses a tournament tree merge sort algorithm:
- Records are written to an in-memory buffer backed by Flink's managed memory pool.
- When the buffer is full, it is sorted and spilled to disk as a sorted run.
- After all records are processed, sorted runs are merged using a priority queue (tournament tree).
- The result is a fully sorted stream of records consumed by the writer.
This approach provides bounded memory usage while supporting arbitrarily large file groups.
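The merge step above can be sketched as a k-way merge over sorted runs. In this sketch a `PriorityQueue` plays the role of the tournament tree: both repeatedly select the smallest head element among the runs, though Flink's `BinaryExternalSorter` uses a dedicated tournament-tree implementation over spilled binary rows rather than this simplified in-memory version.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative k-way merge of sorted spill runs, standing in for the tournament-tree merge. */
public class RunMergeSketch {

    /** Merges already-sorted runs into one fully sorted list. */
    public static List<Integer> merge(List<List<Integer>> sortedRuns) {
        // Each heap entry is {value, runIndex, offsetInRun}; ordered by value.
        PriorityQueue<int[]> heap = new PriorityQueue<>(Comparator.comparingInt(e -> e[0]));
        for (int r = 0; r < sortedRuns.size(); r++) {
            if (!sortedRuns.get(r).isEmpty()) {
                heap.add(new int[]{sortedRuns.get(r).get(0), r, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();              // smallest head among all runs
            out.add(head[0]);
            List<Integer> run = sortedRuns.get(head[1]);
            int next = head[2] + 1;
            if (next < run.size()) {               // advance the winning run
                heap.add(new int[]{run.get(next), head[1], next});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Three sorted runs, as produced by sort-and-spill.
        List<List<Integer>> runs = List.of(
            List.of(1, 4, 9),
            List.of(2, 3, 8),
            List.of(5, 6, 7));
        System.out.println(merge(runs));   // → [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Because only one head element per run is held in the merge structure, memory during the merge is proportional to the number of runs, not the total record count, which is what makes the approach viable for arbitrarily large file groups.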