
Implementation:Apache Hudi CompactionCommitSink Invoke

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Stream_Processing
Last Updated 2026-02-08 00:00 GMT

Overview

A concrete tool, provided by Apache Hudi, that collects compaction results from parallel tasks and atomically commits the compaction instant to the Hudi timeline.

Description

This implementation consists of two cooperating classes:

  1. CompactionCommitSink -- A Flink SinkFunction (extending CleanFunction) that runs at parallelism=1. Its invoke() method receives CompactionCommitEvent objects from all parallel CompactOperator tasks. It logs warnings for failed or error-containing events, then delegates to the appropriate CompactCommitHandler (for data table or metadata table compactions).
  2. CompactCommitHandler -- The core commit logic. It maintains a commitBuffer (a Map<String, Map<String, CompactionCommitEvent>> keyed by instant time and file ID) and a compactionPlanCache. When commitIfNecessary() is called:
    • It adds the event to the commit buffer.
    • It retrieves the compaction plan for the instant (using the cache or reading from the timeline via CompactionUtils.getCompactionPlan()).
    • It checks if the buffer size equals the plan's operation count (completeness check).
    • If any event is marked as failed, it rolls back the compaction via CompactionUtil.rollbackCompaction() and records a rollback metric.
    • If all events succeeded, it aggregates WriteStatus objects, checks error record counts, creates HoodieCommitMetadata via CompactHelpers.getInstance().createCompactionMetadata(), and calls writeClient.completeCompaction().
    • If async cleaning is disabled, it triggers writeClient.clean() for synchronous cleanup.
    • Finally, it resets the commit buffer and plan cache for the completed instant.

The CompactionCommitSink also inherits cleaning capability from CleanFunction; this is necessary because the Flink SQL API does not allow multiple sinks in one table sink provider.

Usage

This sink is placed at the end of the compaction Flink pipeline at parallelism=1 (with maxParallelism=1 to prevent rescaling). It is the final stage that makes compaction results visible to readers.

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
  • Lines: 79-93
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactCommitHandler.java
  • Lines: 105-141

Signature

// CompactionCommitSink.java:79-93
@Override
public void invoke(CompactionCommitEvent event, Context context) throws Exception {
    final String instant = event.getInstant();
    if (event.isFailed()
        || (event.getWriteStatuses() != null
        && event.getWriteStatuses().stream().anyMatch(writeStatus -> writeStatus.getTotalErrorRecords() > 0))) {
      log.warn("Received abnormal CompactionCommitEvent of instant {}, task ID is {},"
              + " is failed: {}, error record count: {}",
          instant, event.getTaskID(), event.isFailed(), getNumErrorRecords(event));
    }
    if (event.isMetadataTable()) {
      mdtCompactCommitHandler.get().commitIfNecessary(event, compactionMetrics);
    } else {
      compactCommitHandler.get().commitIfNecessary(event, compactionMetrics);
    }
}

// CompactCommitHandler.java:105-141
public void commitIfNecessary(CompactionCommitEvent event, FlinkCompactionMetrics compactionMetrics) {
    String instant = event.getInstant();
    commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
        .put(event.getFileId(), event);

    boolean isLogCompaction = event.isLogCompaction();
    HoodieCompactionPlan compactionPlan = getCompactionPlan(instant, isLogCompaction);
    Collection<CompactionCommitEvent> events = commitBuffer.get(instant).values();

    boolean isReady = compactionPlan.getOperations().size() == events.size();
    if (!isReady) {
      return;
    }

    if (events.stream().anyMatch(CompactionCommitEvent::isFailed)) {
      try {
        rollbackCompaction(instant, isLogCompaction);
      } finally {
        reset(instant);
        compactionMetrics.markCompactionRolledBack();
      }
      return;
    }

    try {
      doCommit(instant, isLogCompaction, events, compactionMetrics);
    } catch (Throwable throwable) {
      log.error("Error while committing compaction instant: {}", instant, throwable);
      compactionMetrics.markCompactionRolledBack();
    } finally {
      reset(instant);
    }
}

Import

import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.handler.CompactCommitHandler;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.sink.CleanFunction;

I/O Contract

Inputs

Name Type Required Description
event CompactionCommitEvent Yes A commit event from a compaction task containing: instant (String, the compaction instant time), fileId (String, the file group ID), writeStatuses (List<WriteStatus>, null if failed), taskID (int), isMetadataTable (boolean), isLogCompaction (boolean).
context SinkFunction.Context Yes Flink sink function context providing timestamp and watermark information.
compactionPlan HoodieCompactionPlan Yes (loaded internally) The compaction plan for the instant, loaded from the Hudi timeline via CompactionUtils.getCompactionPlan(). Contains the list of CompactionOperation entries that must all be completed before commit.
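The event fields in the table above can be modeled as a plain Java record. This is an illustrative sketch only, not the real CompactionCommitEvent class; the List<String> stand-in for List<WriteStatus> and the isFailed() convention (null statuses means failure) are assumptions for the example.

```java
import java.util.List;

// Illustrative model of the fields a CompactionCommitEvent carries (not the real class).
public record CompactionCommitEventModel(
        String instant,             // compaction instant time on the timeline
        String fileId,              // file group this task compacted
        List<String> writeStatuses, // stand-in for List<WriteStatus>; null when the task failed
        int taskId,                 // subtask index of the emitting CompactOperator
        boolean isMetadataTable,    // routes to the metadata-table commit handler
        boolean isLogCompaction) {  // selects log compaction vs. regular compaction paths

    /** Per the table above, a failed event carries no write statuses. */
    public boolean isFailed() {
        return writeStatuses == null;
    }
}
```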

Outputs

Name Type Description
Committed compaction instant Side effect (timeline) On success: the compaction instant transitions from inflight to committed on the Hudi timeline, with HoodieCommitMetadata recording all file-level changes.
Rolled-back compaction instant Side effect (timeline) On failure: the compaction instant is rolled back, removing it from the active timeline. Orphaned base files are cleaned up by subsequent clean operations.
Clean operation Side effect (optional) If FlinkOptions.CLEAN_ASYNC_ENABLED is false, a synchronous writeClient.clean() is triggered after successful commit to remove old file versions.
Compaction metrics Side effect (metrics) Metrics are updated via FlinkCompactionMetrics: markCompactionCompleted() on success, markCompactionRolledBack() on failure, and updateCommitMetrics() with commit metadata.

Usage Examples

// CompactionCommitSink is placed at the end of the compaction pipeline:
env.addSource(new CompactionPlanSourceFunction(compactionPlans, conf))
    .name("compaction_source")
    .uid("uid_compaction_source")
    .rebalance()
    .transform("compact_task",
        TypeInformation.of(CompactionCommitEvent.class),
        new CompactOperator(conf))
    .setParallelism(compactionParallelism)
    .addSink(new CompactionCommitSink(conf))    // <-- this sink
    .name("compaction_commit")
    .uid("uid_compaction_commit")
    .setParallelism(1)
    .getTransformation()
    .setMaxParallelism(1);   // Must be 1 for correct commit coordination

// The sink receives CompactionCommitEvents from all CompactOperator tasks.
// Example flow for a compaction plan with 3 operations:
//   Task 0 emits: CompactionCommitEvent(instant="20240101120000", fileId="fg-1", statuses=[...], taskID=0)
//   Task 1 emits: CompactionCommitEvent(instant="20240101120000", fileId="fg-2", statuses=[...], taskID=1)
//   Task 2 emits: CompactionCommitEvent(instant="20240101120000", fileId="fg-3", statuses=[...], taskID=2)
// After receiving all 3 events, commitIfNecessary() detects completeness and commits.
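Once the buffer is complete, commitIfNecessary() takes one of two branches: any failed event rolls the whole instant back, otherwise the instant is committed. A hypothetical sketch of that decision, with events reduced to failure flags (names here are illustrative, not Hudi's):

```java
import java.util.List;

// Sketch of the all-or-nothing branch taken after the completeness check.
public class CommitDecisionSketch {
    enum Outcome { COMMIT, ROLLBACK }

    // Each boolean models one buffered event's isFailed() flag.
    static Outcome decide(List<Boolean> failedFlags) {
        // Mirrors: events.stream().anyMatch(CompactionCommitEvent::isFailed)
        boolean anyFailed = failedFlags.stream().anyMatch(f -> f);
        return anyFailed ? Outcome.ROLLBACK : Outcome.COMMIT;
    }

    public static void main(String[] args) {
        System.out.println(decide(List.of(false, false, false))); // COMMIT
        System.out.println(decide(List.of(false, true, false)));  // ROLLBACK
    }
}
```

The all-or-nothing choice is what makes the commit atomic from a reader's perspective: either every file group in the plan becomes visible at once, or none does.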

