
Principle:Apache Hudi Clustering Execution

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Data_Layout_Optimization
Last Updated 2026-02-08 00:00 GMT

Overview

Clustering execution reads existing data files from a clustering group, optionally sorts the records, and rewrites them into new, optimally sized data files.

Description

The clustering execution phase is the core data processing step of the clustering workflow. It receives a ClusteringPlanEvent describing a group of file slices to rewrite, reads all records from those files, optionally sorts them, and writes them out as new files using the bulk insert path.

The execution follows a read-transform-write pattern:

  1. Read phase: For each clustering operation in the group, read the records. Two paths exist:
    • Base files only: When the file slices contain no log files (common in copy-on-write (COW) tables), the operator reads directly from Parquet base files using HoodieRowDataParquetReader. This is the fast path.
    • Base files with log files: When delta log files exist (merge-on-read (MOR) tables), the operator uses HoodieFileGroupReader to merge base file records with log file updates, producing the latest version of each record.
  2. Sort phase (optional): If sort columns are configured, records are fed into a BinaryExternalSorter that performs an external merge sort using Flink's managed memory. The sorter spills to disk when memory is exhausted, making it suitable for large file groups.
  3. Write phase: Records (sorted or unsorted) are written using BulkInsertWriterHelper, which creates new Parquet files respecting the configured maximum file size. The writer produces WriteStatus objects describing the newly created files.

The output is a ClusteringCommitEvent containing the list of write statuses for the downstream commit operator.
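The read-transform-write pattern above can be sketched in miniature as a plain Java method. This is a hypothetical, in-memory illustration only (the names Rec and rewrite are invented here); the real operator streams records through HoodieRowDataParquetReader/HoodieFileGroupReader and BulkInsertWriterHelper rather than materializing lists.

```java
import java.util.*;
import java.util.stream.*;

/**
 * Minimal in-memory sketch of the clustering execution's
 * read -> (optional sort) -> size-bounded write pattern.
 * Names are illustrative, not Hudi's API.
 */
public class ClusteringSketch {
    record Rec(String key, int sizeBytes) {}

    /** Rewrites input "files" (lists of records) into new files of at most targetBytes each. */
    static List<List<Rec>> rewrite(List<List<Rec>> inputFiles, boolean sort, int targetBytes) {
        // Phase 1: read all records from the clustering group
        List<Rec> records = inputFiles.stream().flatMap(List::stream)
                .collect(Collectors.toCollection(ArrayList::new));
        // Phase 2: optional sort on the configured column (here: the key)
        if (sort) {
            records.sort(Comparator.comparing(Rec::key));
        }
        // Phase 3: write, rolling to a new file when the size budget would be exceeded
        List<List<Rec>> outputFiles = new ArrayList<>();
        List<Rec> current = new ArrayList<>();
        int bytes = 0;
        for (Rec r : records) {
            if (!current.isEmpty() && bytes + r.sizeBytes() > targetBytes) {
                outputFiles.add(current);
                current = new ArrayList<>();
                bytes = 0;
            }
            current.add(r);
            bytes += r.sizeBytes();
        }
        if (!current.isEmpty()) {
            outputFiles.add(current);
        }
        return outputFiles;
    }

    public static void main(String[] args) {
        // Five small single-record "files" consolidated into fewer, larger, sorted files
        List<List<Rec>> small = List.of(
            List.of(new Rec("d", 40)), List.of(new Rec("a", 40)),
            List.of(new Rec("e", 40)), List.of(new Rec("b", 40)),
            List.of(new Rec("c", 40)));
        List<List<Rec>> out = rewrite(small, true, 100);
        System.out.println(out.size());              // 3 output files from 5 inputs
        System.out.println(out.get(0).get(0).key()); // sorted: first key is "a"
    }
}
```

Note how the size check mirrors the role of the configured maximum file size: the writer rolls to a new file rather than letting one grow unboundedly.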

Usage

This principle applies in two execution contexts:

  1. Inline (streaming) clustering: The ClusteringOperator runs as a Flink operator in the streaming pipeline. It can execute clustering either synchronously (blocking checkpoint barriers) or asynchronously (using a dedicated executor thread).
  2. Offline (batch) clustering: The HoodieFlinkClusteringJob constructs a batch Flink pipeline with the ClusteringOperator. It supports parallelism configuration to control how many clustering groups are processed concurrently.

The execution is designed to be scalable: input events are shuffled across parallel operator instances, so each instance handles a subset of the clustering groups independently.
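The synchronous/asynchronous distinction in the inline context can be illustrated with a plain ExecutorService sketch. The class and method names here are invented for illustration; the actual ClusteringOperator uses its own executor abstraction inside Flink's operator lifecycle.

```java
import java.util.concurrent.*;

/**
 * Illustrative sketch of the two execution modes. In synchronous mode the
 * caller blocks until the rewrite finishes (in Flink, this holds up the
 * checkpoint barrier); in asynchronous mode the work is handed to a
 * dedicated single-thread executor and the caller continues immediately.
 */
public class ExecutionModeSketch {
    final ExecutorService clusteringExecutor = Executors.newSingleThreadExecutor();

    /** Stand-in for the read -> sort -> write work on one clustering group. */
    String rewriteGroup(String groupId) {
        return "committed:" + groupId;
    }

    /** Synchronous: blocks the operator thread until the group is rewritten. */
    String executeSync(String groupId) {
        return rewriteGroup(groupId);
    }

    /** Asynchronous: returns at once; the result is observed via the Future. */
    Future<String> executeAsync(String groupId) {
        return clusteringExecutor.submit(() -> rewriteGroup(groupId));
    }

    public static void main(String[] args) throws Exception {
        ExecutionModeSketch op = new ExecutionModeSketch();
        System.out.println(op.executeSync("fg-1"));
        System.out.println(op.executeAsync("fg-2").get());
        op.clusteringExecutor.shutdown();
    }
}
```

A single-thread executor preserves ordering of submitted groups while keeping the submitting thread free, which is the essential trade-off behind the asynchronous mode.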

Theoretical Basis

The clustering execution implements a file group rewrite operation. The goal is to take N input files and produce M output files where M is typically smaller than N (many small files consolidated into fewer larger files) and the data is optionally sorted.

FUNCTION executeClustering(event):
    operations = event.clusteringGroupInfo.operations
    fileIds = [op.fileId FOR EACH op IN operations]
    instantTime = event.clusteringInstantTime

    // Phase 1: Read
    IF any operation has delta log files:
        iterator = mergeReadWithLogs(operations, instantTime)
    ELSE:
        iterator = directReadBaseFiles(operations)

    // Phase 2: Optional Sort
    IF sortClusteringEnabled:
        sorter = new BinaryExternalSorter(managedMemory)
        FOR EACH record IN iterator:
            sorter.write(toBinaryRow(record))
        iterator = sorter.getIterator()

    // Phase 3: Write
    writer = new BulkInsertWriterHelper(instantTime, targetFileSize)
    FOR EACH record IN iterator:
        writer.write(record)

    writeStatuses = writer.getWriteStatuses()
    EMIT ClusteringCommitEvent(instantTime, fileIds, writeStatuses)

The external sort uses a tournament tree merge sort algorithm:

  1. Records are written to an in-memory buffer backed by Flink's managed memory pool.
  2. When the buffer is full, it is sorted and spilled to disk as a sorted run.
  3. After all records are processed, sorted runs are merged using a priority queue (tournament tree).
  4. The result is a fully sorted stream of records consumed by the writer.

This approach provides bounded memory usage while supporting arbitrarily large file groups.
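The four steps above can be sketched as a self-contained external merge sort. This is illustrative only: it sorts strings through temp files and a java.util.PriorityQueue, whereas BinaryExternalSorter operates on binary rows in Flink's managed memory segments.

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

/** External merge sort sketch: buffer -> sort & spill runs -> k-way merge. */
public class ExternalSortSketch {

    /** Sorts the buffer and spills it to disk as one sorted run. */
    static Path spill(List<String> buffer) throws IOException {
        Collections.sort(buffer);
        Path run = Files.createTempFile("sorted-run", ".txt");
        Files.write(run, buffer);
        run.toFile().deleteOnExit();
        return run;
    }

    static List<String> externalSort(Iterator<String> input, int bufferSize) throws IOException {
        // Steps 1-2: fill the in-memory buffer; when full, sort it and spill a run
        List<Path> runs = new ArrayList<>();
        List<String> buffer = new ArrayList<>(bufferSize);
        while (input.hasNext()) {
            buffer.add(input.next());
            if (buffer.size() == bufferSize) { runs.add(spill(buffer)); buffer.clear(); }
        }
        if (!buffer.isEmpty()) { runs.add(spill(buffer)); }

        // Step 3: merge sorted runs via a priority queue (tournament tree analogue);
        // heap entries are {current line, index of the run it came from}
        List<BufferedReader> readers = new ArrayList<>();
        PriorityQueue<Object[]> heap =
            new PriorityQueue<>(Comparator.comparing((Object[] e) -> (String) e[0]));
        for (Path run : runs) {
            BufferedReader r = Files.newBufferedReader(run);
            readers.add(r);
            String line = r.readLine();
            if (line != null) heap.add(new Object[]{line, readers.size() - 1});
        }
        // Step 4: repeatedly pop the smallest head and refill from the same run
        List<String> sorted = new ArrayList<>();
        while (!heap.isEmpty()) {
            Object[] head = heap.poll();
            sorted.add((String) head[0]);
            String next = readers.get((int) head[1]).readLine();
            if (next != null) heap.add(new Object[]{next, head[1]});
        }
        for (BufferedReader r : readers) r.close();
        return sorted;
    }

    public static void main(String[] args) throws IOException {
        List<String> data = Arrays.asList("e", "b", "a", "d", "c", "f");
        System.out.println(externalSort(data.iterator(), 2)); // [a, b, c, d, e, f]
    }
}
```

Memory use is bounded by the buffer size plus one head record per run, which is why the same structure scales to arbitrarily large file groups.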

Related Pages

Implemented By
