Principle:Apache Hudi Clustering Commit

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Data_Layout_Optimization
Last Updated 2026-02-08 00:00 GMT

Overview

The clustering commit phase collects all clustering execution results, validates their completeness against the original plan, and atomically commits the file replacements to the Hudi timeline.

Description

The clustering commit phase is the final step in the clustering workflow. It acts as a barrier and validator: it must receive successful results from every clustering group in the plan before it can commit the overall operation. This ensures that partial clustering (where some groups succeed and others fail) does not corrupt the table state.

The commit process follows these steps:

  1. Event collection: Each ClusteringCommitEvent from a parallel clustering operator task is buffered in a map keyed by instant time and file IDs. The map structure deduplicates events in case of task retries.
  2. Completeness check: The sink compares the number of received events against the number of input groups in the original HoodieClusteringPlan. The commit only proceeds when all groups have reported.
  3. Failure handling: If any event indicates failure (null write statuses or error records above threshold), the entire clustering instant is rolled back via ClusteringUtil.rollbackClustering.
  4. Validation: The validateWriteResult method verifies that at least one new file was produced. This guards against silent data loss.
  5. Metadata construction: A HoodieCommitMetadata is built with WriteOperationType.CLUSTER, containing write statistics, the partition-to-replaced-file-IDs mapping, and the table schema.
  6. Timeline commit: The metadata is committed as a REPLACE_COMMIT on the Hudi timeline, atomically replacing old file groups with new ones.
  7. Optional cleaning: If inline cleaning is enabled (async cleaning disabled), the commit sink triggers a clean operation to remove obsolete data files.
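
Steps 1–3 can be sketched as a small buffering sink. This is an illustrative model of the described barrier, not Hudi's actual classes; names such as ClusteringCommitEvent, CommitSink, and the (instant, file_ids) key are assumptions mirroring the prose above.

```python
# Sketch of the event-collection barrier: buffer events keyed by
# (instant, file IDs) so task retries deduplicate, commit only once
# every input group in the plan has reported, roll back on any failure.
from dataclasses import dataclass, field

@dataclass(frozen=True)
class ClusteringCommitEvent:
    instant: str        # clustering instant time on the timeline
    file_ids: tuple     # file IDs handled by one clustering group
    succeeded: bool = True

@dataclass
class CommitSink:
    expected_groups: int                          # size of the clustering plan
    buffer: dict = field(default_factory=dict)    # keyed by (instant, file_ids)
    committed: list = field(default_factory=list)

    def invoke(self, event: ClusteringCommitEvent) -> None:
        # Keying by (instant, file_ids) deduplicates events from retried tasks.
        self.buffer[(event.instant, event.file_ids)] = event
        received = [e for (i, _), e in self.buffer.items() if i == event.instant]
        if len(received) < self.expected_groups:
            return  # barrier: not all groups have reported yet
        if any(not e.succeeded for e in received):
            self.rollback(event.instant)
        else:
            self.committed.append(event.instant)

    def rollback(self, instant: str) -> None:
        # Stand-in for ClusteringUtil.rollbackClustering: discard buffered
        # events for the failed instant.
        self.buffer = {k: v for k, v in self.buffer.items() if k[0] != instant}
```

A duplicate event from a retried task overwrites its buffer slot rather than counting twice, which is why the completeness check in step 2 stays correct under retries.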

Usage

This principle applies whenever clustering execution completes, in both streaming and batch modes. The commit sink must run with parallelism 1 to ensure a single coordinator collects all events and makes the atomic commit decision.

The commit sink also inherits cleaning capabilities from CleanFunction, allowing it to perform inline cleaning after a successful clustering commit. This is particularly important for SQL API users, since the SQL API does not support attaching multiple sinks to a single table.

Theoretical Basis

The clustering commit implements a two-phase commit protocol adapted for Flink's dataflow model:

FUNCTION commitClustering(instant, events, clusteringPlan):
    // Phase 1: Validate completeness
    IF events.size() != clusteringPlan.inputGroups.size():
        RETURN  // not all groups have reported yet

    IF any event in events is failed:
        ROLLBACK(instant)
        RETURN

    // Phase 2: Validate results
    allStatuses = events.flatMap(e -> e.writeStatuses)
    errorRecords = sum(allStatuses.map(s -> s.totalErrorRecords))

    IF errorRecords > 0 AND NOT ignoreFailed:
        ROLLBACK(instant)
        RETURN

    IF allStatuses.isEmpty():
        THROW ClusteringException("0 WriteStatus produced")

    // Phase 3: Build commit metadata
    newFileGroups = allStatuses.map(s -> (s.partition, s.fileId))
    oldFileGroups = clusteringPlan.getFileGroups()
    replacedFileIds = oldFileGroups.filter(fg -> fg NOT IN newFileGroups)

    metadata = buildCommitMetadata(
        writeStats = allStatuses.map(s -> s.stat),
        partitionToReplaceFileIds = replacedFileIds.groupBy(partition),
        operationType = CLUSTER
    )

    // Phase 4: Atomic commit
    timeline.commitReplaceCommit(instant, metadata)

    // Phase 5: Optional cleanup
    IF NOT asyncCleanEnabled:
        writeClient.clean()

The partitionToReplaceFileIds mapping is central to the atomicity guarantee. It tells the timeline which old file groups are being replaced by the new files. After the commit, any reader opening the table will see the new files and skip the old ones, without any intermediate state being visible.
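
The mapping itself is a straightforward group-by over the plan's old file groups. The sketch below is illustrative; the tuple shape and function name are assumptions, not Hudi's exact API.

```python
# Derive the partition-to-replaced-file-IDs mapping from the clustering
# plan's old file groups, grouping file IDs by their partition path.
from collections import defaultdict

def partition_to_replaced_file_ids(old_file_groups):
    """old_file_groups: iterable of (partition_path, file_id) tuples,
    conceptually taken from the clustering plan's input groups."""
    mapping = defaultdict(list)
    for partition, file_id in old_file_groups:
        mapping[partition].append(file_id)
    return dict(mapping)

old = [("2024/01/01", "fg-1"), ("2024/01/01", "fg-2"), ("2024/01/02", "fg-3")]
replaced = partition_to_replaced_file_ids(old)
```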

The barrier semantics (waiting for all groups) ensure that either all file groups in the plan are replaced or none are. This prevents scenarios where a reader might see some old files and some new files for the same logical data.
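
The decision phases of the pseudocode above can be condensed into a single function. This is a hedged sketch under the pseudocode's own assumptions; the dict shapes and the ignore_failed flag mirror the pseudocode, not a real Hudi signature.

```python
# Commit decision covering phases 1-2 of the pseudocode: wait until all
# groups report, roll back on failures or error records (unless ignored),
# and reject an empty result set as potential silent data loss.
def decide(events, expected_groups, ignore_failed=False):
    if len(events) != expected_groups:
        return "WAIT"      # barrier: not all groups have reported
    if any(e["failed"] for e in events):
        return "ROLLBACK"  # a clustering group failed outright
    statuses = [s for e in events for s in e["write_statuses"]]
    errors = sum(s["total_error_records"] for s in statuses)
    if errors > 0 and not ignore_failed:
        return "ROLLBACK"  # error records above the zero threshold
    if not statuses:
        raise RuntimeError("Clustering produced 0 WriteStatus")
    return "COMMIT"
```

Note that the empty-status check runs only after the failure checks, so a genuinely empty but "successful" result surfaces as an exception rather than a silent no-op commit.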

Related Pages

Implemented By
