
Implementation:Apache Hudi CompactionUtil ScheduleCompaction

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Stream_Processing
Last Updated 2026-02-08 00:00 GMT

Overview

A concrete tool, provided by Apache Hudi, for assessing whether asynchronous compaction is needed and for scheduling compaction plans on the Hudi timeline.

Description

This implementation encompasses two cooperating methods that form the entry point of the Flink MOR compaction workflow:

  1. OptionsResolver.needsAsyncCompaction(Configuration) -- A guard predicate evaluated during Flink pipeline construction. It checks whether the table type is MERGE_ON_READ (via OptionsResolver.isMorTable()) and whether the COMPACTION_ASYNC_ENABLED flag is set to true. Only when both conditions hold does the pipeline wire in the compaction sub-pipeline operators.
  2. CompactionUtil.scheduleCompaction(HoodieFlinkWriteClient, boolean, boolean) -- Once the pipeline is running, this utility method is called to schedule a new compaction instant on the Hudi timeline. It delegates to writeClient.scheduleCompaction(Option.empty()) when either a new commit has been successfully written (committed == true) or the compaction trigger strategy is time-based (deltaTimeCompaction == true), ensuring that time-elapsed triggers can fire even when there are no new commits.
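The two decision predicates above can be sketched as plain boolean logic. This is a self-contained illustration, not Hudi code: the Hudi classes are replaced with bare booleans, and the names `CompactionTriggerSketch` and `shouldSchedule` are ours.

```java
// Standalone sketch of the two decision predicates described above.
// Hudi's Configuration and write client are replaced by plain booleans
// so the logic runs without a Hudi dependency.
public class CompactionTriggerSketch {

    // Mirrors OptionsResolver.needsAsyncCompaction: both conditions must hold.
    static boolean needsAsyncCompaction(boolean isMorTable, boolean asyncEnabled) {
        return isMorTable && asyncEnabled;
    }

    // Mirrors CompactionUtil.scheduleCompaction's branching: schedule after a
    // successful commit, or regardless of commits when the trigger is time-based.
    static boolean shouldSchedule(boolean committed, boolean deltaTimeCompaction) {
        return committed || deltaTimeCompaction;
    }

    public static void main(String[] args) {
        System.out.println(needsAsyncCompaction(true, true));   // true: MOR table + async enabled
        System.out.println(needsAsyncCompaction(false, true));  // false: COW table never compacts asynchronously
        System.out.println(shouldSchedule(false, true));        // true: time-based trigger fires without a commit
        System.out.println(shouldSchedule(false, false));       // false: no commit, commit-based trigger
    }
}
```

Note that `shouldSchedule` collapses the `if / else if` in the real method into a single disjunction; both branches of the original call the same `writeClient.scheduleCompaction(Option.empty())`.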

Usage

Use needsAsyncCompaction at pipeline setup time to conditionally add the compaction sub-pipeline to the Flink DAG. Use scheduleCompaction within the write pipeline's checkpoint notification to create compaction plans after each successful commit cycle, or on every cycle when the trigger strategy is time-based.
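The checkpoint-notification pattern above can be sketched without Flink or Hudi dependencies. The `CommitCoordinator` and `Scheduler` names below are illustrative stubs; in a real pipeline the scheduling callback would delegate to CompactionUtil.scheduleCompaction(writeClient, deltaTimeCompaction, committed).

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of scheduling a compaction plan from a checkpoint-complete callback.
// Flink's listener interface and the Hudi write client are stubbed out.
public class CheckpointScheduleSketch {

    interface Scheduler { void scheduleCompaction(long checkpointId); }

    static class CommitCoordinator {
        private final boolean deltaTimeCompaction;
        private final Scheduler scheduler;

        CommitCoordinator(boolean deltaTimeCompaction, Scheduler scheduler) {
            this.deltaTimeCompaction = deltaTimeCompaction;
            this.scheduler = scheduler;
        }

        // Mirrors the scheduleCompaction branching: schedule after a successful
        // commit, or on every cycle when the trigger strategy is time-based.
        void notifyCheckpointComplete(long checkpointId, boolean committed) {
            if (committed || deltaTimeCompaction) {
                scheduler.scheduleCompaction(checkpointId);
            }
        }
    }

    public static void main(String[] args) {
        List<Long> scheduled = new ArrayList<>();

        // Time-based trigger: fires even when nothing was committed.
        CommitCoordinator timeBased = new CommitCoordinator(true, scheduled::add);
        timeBased.notifyCheckpointComplete(1L, false); // fires anyway
        timeBased.notifyCheckpointComplete(2L, true);  // fires

        // Commit-based trigger: skipped when the cycle produced no commit.
        CommitCoordinator commitBased = new CommitCoordinator(false, scheduled::add);
        commitBased.notifyCheckpointComplete(3L, false); // skipped

        System.out.println(scheduled.size()); // 2
    }
}
```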

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
  • Lines: 266-268
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
  • Lines: 55-66

Signature

// OptionsResolver.java:266-268
public static boolean needsAsyncCompaction(Configuration conf) {
    return OptionsResolver.isMorTable(conf) && conf.get(FlinkOptions.COMPACTION_ASYNC_ENABLED);
}

// CompactionUtil.java:55-66
public static void scheduleCompaction(
    HoodieFlinkWriteClient<?> writeClient,
    boolean deltaTimeCompaction,
    boolean committed) {
  if (committed) {
    writeClient.scheduleCompaction(Option.empty());
  } else if (deltaTimeCompaction) {
    // if there are no new commits and the compaction trigger strategy is based on elapsed delta time,
    // schedules the compaction anyway.
    writeClient.scheduleCompaction(Option.empty());
  }
}

Import

import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.flink.configuration.Configuration;

I/O Contract

Inputs

Name Type Required Description
conf org.apache.flink.configuration.Configuration Yes Flink configuration containing FlinkOptions.TABLE_TYPE (must be MERGE_ON_READ) and FlinkOptions.COMPACTION_ASYNC_ENABLED (boolean flag).
writeClient HoodieFlinkWriteClient<?> Yes The Hudi write client used to schedule the compaction plan on the active timeline.
deltaTimeCompaction boolean Yes Whether the compaction trigger strategy is based on elapsed delta time (e.g., time_elapsed, num_or_time).
committed boolean Yes Whether the last write instant was committed successfully.

Outputs

Name Type Description
needsAsyncCompaction result boolean true if the pipeline should wire in the compaction sub-pipeline; false otherwise.
Compaction plan on timeline void (side effect) A new compaction plan instant is scheduled on the Hudi timeline when conditions are met. The plan is stored as a requested compaction instant in the table metadata.

Usage Examples

// During Flink pipeline construction: decide whether to wire compaction operators
Configuration conf = new Configuration();
conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);

if (OptionsResolver.needsAsyncCompaction(conf)) {
    // Wire in CompactionPlanOperator -> CompactOperator -> CompactionCommitSink
    // as a sub-pipeline downstream of the write operator
}

// During checkpoint notification: schedule a compaction plan
boolean deltaTimeCompaction = "time_elapsed".equals(triggerStrategy)
    || "num_or_time".equals(triggerStrategy); // both strategies include a time-based trigger
boolean committed = true; // last instant committed successfully

CompactionUtil.scheduleCompaction(writeClient, deltaTimeCompaction, committed);
// A new compaction plan instant is now on the timeline
