Implementation:Apache Hudi CompactionUtil ScheduleCompaction
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool for assessing whether asynchronous compaction is needed and scheduling compaction plans on the Hudi timeline, provided by Apache Hudi.
Description
This implementation encompasses two cooperating methods that form the entry point of the Flink MOR compaction workflow:
- `OptionsResolver.needsAsyncCompaction(Configuration)`: A guard predicate evaluated during Flink pipeline construction. It checks whether the table type is `MERGE_ON_READ` (via `OptionsResolver.isMorTable()`) and whether the `COMPACTION_ASYNC_ENABLED` flag is set to true. Only when both conditions hold does the pipeline wire in the compaction sub-pipeline operators.
- `CompactionUtil.scheduleCompaction(HoodieFlinkWriteClient, boolean, boolean)`: Once the pipeline is running, this utility method is called to schedule a new compaction instant on the Hudi timeline. It delegates to `writeClient.scheduleCompaction(Option.empty())` when either a new commit has been successfully written (`committed == true`) or the compaction trigger strategy is time-based (`deltaTimeCompaction == true`), so that time-elapsed triggers can still fire when no new commits have been written.
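Both methods reduce to small boolean decisions. The following is a minimal sketch that models them in plain Java so the truth tables can be checked without Flink or Hudi on the classpath. The class `CompactionGuardsSketch` and its `String`/`boolean` parameters are hypothetical stand-ins for `Configuration`, `FlinkOptions.TABLE_TYPE`, `FlinkOptions.COMPACTION_ASYNC_ENABLED` and `HoodieFlinkWriteClient`; the case-insensitive table-type comparison is an assumption of this sketch.

```java
import java.util.Locale;

// Hypothetical plain-Java model of the two guards; it does not depend on Flink or Hudi.
public class CompactionGuardsSketch {

    // Models OptionsResolver.needsAsyncCompaction: MOR table AND async compaction enabled.
    // Case-insensitive matching of the table type is an assumption of this sketch.
    static boolean needsAsyncCompaction(String tableType, boolean asyncCompactionEnabled) {
        boolean isMor = tableType != null
                && "MERGE_ON_READ".equals(tableType.toUpperCase(Locale.ROOT));
        return isMor && asyncCompactionEnabled;
    }

    // Models the branch in CompactionUtil.scheduleCompaction: both branches make the same
    // scheduling call, so the combined condition is a plain OR.
    static boolean shouldScheduleCompaction(boolean deltaTimeCompaction, boolean committed) {
        return committed || deltaTimeCompaction;
    }

    public static void main(String[] args) {
        for (String type : new String[] {"MERGE_ON_READ", "COPY_ON_WRITE"}) {
            for (boolean enabled : new boolean[] {true, false}) {
                System.out.printf("needsAsyncCompaction(%s, %b) = %b%n",
                        type, enabled, needsAsyncCompaction(type, enabled));
            }
        }
        for (boolean deltaTime : new boolean[] {true, false}) {
            for (boolean committed : new boolean[] {true, false}) {
                System.out.printf("schedule(deltaTime=%b, committed=%b) = %b%n",
                        deltaTime, committed, shouldScheduleCompaction(deltaTime, committed));
            }
        }
    }
}
```

Only one combination skips scheduling: no new commit under a count-based strategy.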
Usage
Use needsAsyncCompaction at pipeline setup time to conditionally add the compaction sub-pipeline to the Flink DAG. Use scheduleCompaction in the write pipeline's checkpoint notification to create a compaction plan after each successful commit, or after every checkpoint when the compaction trigger strategy is time-based.
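The sketch below illustrates where the scheduling call sits in a checkpoint-driven loop. It assumes a hypothetical `CompactionScheduler` interface in place of `HoodieFlinkWriteClient` (whose real `scheduleCompaction` takes an `Option` of extra metadata, not a checkpoint id) and a hypothetical `onCheckpointComplete(long, boolean)` callback standing in for the write pipeline's checkpoint notification. It is a simplified model, not Hudi's coordinator code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of checkpoint-driven compaction scheduling; no Flink or Hudi types are used.
public class CheckpointSchedulingSketch {

    // Stand-in for HoodieFlinkWriteClient; the checkpoint id parameter exists only for recording.
    interface CompactionScheduler {
        void scheduleCompaction(long checkpointId);
    }

    // Records which checkpoints triggered a scheduling attempt instead of writing to a timeline.
    static class RecordingScheduler implements CompactionScheduler {
        final List<Long> attempts = new ArrayList<>();

        @Override
        public void scheduleCompaction(long checkpointId) {
            attempts.add(checkpointId);
        }
    }

    private final CompactionScheduler scheduler;
    private final boolean deltaTimeCompaction;

    CheckpointSchedulingSketch(CompactionScheduler scheduler, boolean deltaTimeCompaction) {
        this.scheduler = scheduler;
        this.deltaTimeCompaction = deltaTimeCompaction;
    }

    // Hypothetical checkpoint-complete callback; applies the same condition as CompactionUtil.scheduleCompaction.
    void onCheckpointComplete(long checkpointId, boolean committed) {
        if (committed || deltaTimeCompaction) {
            scheduler.scheduleCompaction(checkpointId);
        }
    }

    public static void main(String[] args) {
        boolean[] committedPerCheckpoint = {true, false, true}; // checkpoint 2 wrote no new commit
        for (boolean deltaTime : new boolean[] {false, true}) {
            RecordingScheduler recorder = new RecordingScheduler();
            CheckpointSchedulingSketch hook = new CheckpointSchedulingSketch(recorder, deltaTime);
            for (int i = 0; i < committedPerCheckpoint.length; i++) {
                hook.onCheckpointComplete(i + 1, committedPerCheckpoint[i]);
            }
            System.out.println("deltaTimeCompaction=" + deltaTime + " -> scheduled at " + recorder.attempts);
        }
    }
}
```

With a count-based strategy, only checkpoints 1 and 3 produce a scheduling attempt; with a time-based strategy, checkpoint 2 produces one as well, leaving the elapsed-time check to the planner.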
Code Reference
Source Location
- Repository: Apache Hudi
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java (lines 266-268)
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java (lines 55-66)
Signature
```java
// OptionsResolver.java:266-268
public static boolean needsAsyncCompaction(Configuration conf) {
  return OptionsResolver.isMorTable(conf) && conf.get(FlinkOptions.COMPACTION_ASYNC_ENABLED);
}

// CompactionUtil.java:55-66
public static void scheduleCompaction(
    HoodieFlinkWriteClient<?> writeClient,
    boolean deltaTimeCompaction,
    boolean committed) {
  if (committed) {
    writeClient.scheduleCompaction(Option.empty());
  } else if (deltaTimeCompaction) {
    // if there are no new commits and the compaction trigger strategy is based on elapsed delta time,
    // schedules the compaction anyway.
    writeClient.scheduleCompaction(Option.empty());
  }
}
```
Import
```java
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.flink.configuration.Configuration;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| conf | org.apache.flink.configuration.Configuration | Yes | Input to needsAsyncCompaction. Flink configuration containing FlinkOptions.TABLE_TYPE (must be MERGE_ON_READ for the predicate to return true) and FlinkOptions.COMPACTION_ASYNC_ENABLED (boolean flag). |
| writeClient | HoodieFlinkWriteClient<?> | Yes | Input to scheduleCompaction. The Hudi write client used to schedule the compaction plan on the active timeline. |
| deltaTimeCompaction | boolean | Yes | Input to scheduleCompaction. Whether the compaction trigger strategy is based on elapsed delta time (e.g., time_elapsed, num_or_time). |
| committed | boolean | Yes | Input to scheduleCompaction. Whether the last write instant was committed successfully. |
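The deltaTimeCompaction flag is derived from the configured trigger strategy (FlinkOptions.COMPACTION_TRIGGER_STRATEGY, key `compaction.trigger.strategy`). The sketch below assumes the strategy names `num_commits`, `num_commits_after_last_request`, `time_elapsed`, `num_and_time` and `num_or_time`, and treats only `time_elapsed` and `num_or_time` as time-based, matching the examples in the table; `num_and_time` is excluded on the assumption that it still needs new delta commits to fire. The helper `isDeltaTimeCompaction` here is a local, hypothetical stand-in; verify the strategy set against the Hudi version in use.

```java
import java.util.Locale;
import java.util.Set;

// Hypothetical helper: classifies a compaction trigger strategy as time-based or not.
public class TriggerStrategySketch {

    // Strategies assumed to fire on elapsed time alone, without requiring new delta commits.
    private static final Set<String> TIME_BASED = Set.of("time_elapsed", "num_or_time");

    static boolean isDeltaTimeCompaction(String triggerStrategy) {
        return TIME_BASED.contains(triggerStrategy.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        String[] strategies = {
            "num_commits", "num_commits_after_last_request", "time_elapsed", "num_and_time", "num_or_time"
        };
        for (String strategy : strategies) {
            System.out.println(strategy + " -> deltaTimeCompaction=" + isDeltaTimeCompaction(strategy));
        }
    }
}
```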
Outputs
| Name | Type | Description |
|---|---|---|
| needsAsyncCompaction result | boolean | true if the pipeline should wire in the compaction sub-pipeline; false otherwise. |
| Compaction plan on timeline | void (side effect) | When committed or deltaTimeCompaction is true and the configured trigger strategy is satisfied, a new compaction plan is scheduled on the Hudi timeline. The plan is stored as a requested compaction instant in the table metadata. |
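Calling the write client does not by itself guarantee a new plan: the planner first evaluates the trigger strategy. The following simplified model illustrates why a time-based strategy can fire without new commits. The enum `Strategy`, the method `shouldCompact`, and the thresholds (assumed to correspond to `compaction.delta_commits` and `compaction.delta_seconds`) are hypothetical; the logic is a simplification, not Hudi's planner code.

```java
// Simplified, hypothetical model of how a compaction trigger strategy might be evaluated.
public class TriggerEvaluationSketch {

    enum Strategy { NUM_COMMITS, TIME_ELAPSED, NUM_OR_TIME, NUM_AND_TIME }

    static boolean shouldCompact(Strategy strategy,
                                 int deltaCommitsSinceLastCompaction,
                                 long secondsSinceLastCompaction,
                                 int maxDeltaCommits,
                                 long maxDeltaSeconds) {
        boolean enoughCommits = deltaCommitsSinceLastCompaction >= maxDeltaCommits;
        boolean enoughTime = secondsSinceLastCompaction >= maxDeltaSeconds;
        switch (strategy) {
            case NUM_COMMITS:
                return enoughCommits;
            case TIME_ELAPSED:
                return enoughTime;
            case NUM_OR_TIME:
                return enoughCommits || enoughTime;
            case NUM_AND_TIME:
                return enoughCommits && enoughTime;
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        // Illustrative thresholds: 5 delta commits or 3600 seconds.
        // Two delta commits so far, but more than an hour has passed since the last compaction.
        int deltaCommits = 2;
        long elapsedSeconds = 4000;
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + shouldCompact(s, deltaCommits, elapsedSeconds, 5, 3600));
        }
    }
}
```

With only two new delta commits but 4000 seconds elapsed, only TIME_ELAPSED and NUM_OR_TIME fire. This is the situation the deltaTimeCompaction branch exists for: the planner is still consulted even when no new commit was written.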
Usage Examples
```java
// During Flink pipeline construction: decide whether to wire compaction operators
Configuration conf = new Configuration();
conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
if (OptionsResolver.needsAsyncCompaction(conf)) {
  // Wire in CompactionPlanOperator -> CompactOperator -> CompactionCommitSink
  // as a sub-pipeline downstream of the write operator
}

// During checkpoint notification: schedule a compaction plan.
// `writeClient` is assumed to be an already-initialized HoodieFlinkWriteClient.
String triggerStrategy = conf.get(FlinkOptions.COMPACTION_TRIGGER_STRATEGY);
// Both time-based strategies must be covered, not only time_elapsed
boolean deltaTimeCompaction = "time_elapsed".equalsIgnoreCase(triggerStrategy)
    || "num_or_time".equalsIgnoreCase(triggerStrategy);
boolean committed = true; // last instant committed successfully
CompactionUtil.scheduleCompaction(writeClient, deltaTimeCompaction, committed);
// If the trigger strategy's conditions are met, a requested compaction instant is now on the timeline
```
Related Pages
Implements Principle