Implementation:Apache Hudi CompactionPlanStrategy Select
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool for selecting which pending compaction plans to execute from the Hudi timeline using pluggable strategies, provided by Apache Hudi.
Description
This implementation provides a strategy interface (CompactionPlanStrategy) and a factory class (CompactionPlanStrategies) that together determine which pending compaction instants are picked up for execution during a compaction cycle.
The CompactionPlanStrategy interface defines a single method select(HoodieTimeline) that receives the pending compaction timeline and returns a filtered list of HoodieInstant objects. Three strategy constants are defined on the interface: ALL, INSTANTS, and NUM_INSTANTS.
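Because CompactionPlanStrategy declares a single abstract method, any lambda or method reference with a matching shape can act as a strategy. The standalone sketch below illustrates that idea. PendingTimeline and PlanSelector are hypothetical stand-ins for HoodieTimeline and CompactionPlanStrategy that work on plain requested-time strings. They are not Hudi's API.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for HoodieTimeline: an immutable list of requested times.
final class PendingTimeline {
  private final List<String> instants;

  PendingTimeline(List<String> instants) {
    this.instants = List.copyOf(instants);
  }

  List<String> getInstants() {
    return instants;
  }
}

// Hypothetical stand-in for CompactionPlanStrategy: constants plus one abstract
// method, so lambdas and method references can implement it.
@FunctionalInterface
interface PlanSelector {
  String ALL = "all";
  String INSTANTS = "instants";
  String NUM_INSTANTS = "num_instants";

  List<String> select(PendingTimeline pendingTimeline);
}

final class PlanSelectorDemo {
  private PlanSelectorDemo() {}

  // Unbound method reference, the same shape as HoodieTimeline::getInstants.
  static final PlanSelector ALL_SELECTOR = PendingTimeline::getInstants;

  // Lambda implementation that keeps only the first pending instant.
  static final PlanSelector FIRST_ONLY = timeline -> timeline.getInstants().stream()
      .limit(1)
      .collect(Collectors.toList());

  public static void main(String[] args) {
    PendingTimeline timeline = new PendingTimeline(List.of("001", "002", "003"));
    System.out.println(ALL_SELECTOR.select(timeline)); // [001, 002, 003]
    System.out.println(FIRST_ONLY.select(timeline));   // [001]
  }
}
```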
The CompactionPlanStrategies factory class implements the strategy pattern by returning one implementation per strategy name (a method reference for all, a lambda for instants and num_instants):
- all: Returns all instants from the pending compaction timeline via HoodieTimeline::getInstants.
- instants: Parses a comma-separated list of instant timestamps from FlinkCompactionConfig.compactionPlanInstant and keeps only the timeline instants whose requested time matches one of them. If compactionPlanInstant is null or empty, it logs a warning and returns an empty list.
- num_instants: Retrieves all pending instants, reverses their order when the compaction sequence is LIFO (checked via CompactionUtil.isLIFO()), and returns at most maxNumCompactionPlans instants from the front of the list.
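The three rules can be reproduced outside Hudi on a plain list of requested times. This is a minimal sketch under two assumptions: the pending list is already sorted oldest-first, and num_instants works on a copy instead of reversing the caller's list in place. The class name SelectionRules is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical standalone sketch of the three selection rules over requested-time strings.
final class SelectionRules {
  private SelectionRules() {}

  // "all": return every pending instant.
  static List<String> all(List<String> pending) {
    return new ArrayList<>(pending);
  }

  // "instants": keep only instants listed in the comma-separated value.
  static List<String> instants(List<String> pending, String compactionPlanInstant) {
    if (compactionPlanInstant == null || compactionPlanInstant.isEmpty()) {
      return Collections.emptyList(); // nothing selected
    }
    List<String> wanted = Arrays.asList(compactionPlanInstant.split(","));
    return pending.stream().filter(wanted::contains).collect(Collectors.toList());
  }

  // "num_instants": reverse for LIFO, then take at most maxPlans from the front.
  static List<String> numInstants(List<String> pending, boolean lifo, int maxPlans) {
    List<String> copy = new ArrayList<>(pending); // copy so the input is not mutated
    if (lifo) {
      Collections.reverse(copy);
    }
    int range = Math.min(maxPlans, copy.size());
    return new ArrayList<>(copy.subList(0, range));
  }

  public static void main(String[] args) {
    List<String> pending = List.of("001", "002", "003", "004");
    System.out.println(all(pending));                   // [001, 002, 003, 004]
    System.out.println(instants(pending, "002,004"));   // [002, 004]
    System.out.println(numInstants(pending, false, 2)); // [001, 002]
    System.out.println(numInstants(pending, true, 2));  // [004, 003]
  }
}
```

With LIFO the newest instants come first, so a cap of 2 selects the two most recent plans in newest-first order.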
Usage
Use CompactionPlanStrategies.getStrategy(config) during compaction execution to obtain the appropriate strategy, then call strategy.select(pendingTimeline) to get the list of instants to compact. This is typically invoked in the HoodieFlinkCompactor.AsyncCompactionService.compact() method.
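getStrategy lower-cases the configured name with Locale.ROOT before matching, so NUM_INSTANTS and num_instants resolve to the same branch, while an unrecognized name raises UnsupportedOperationException. The sketch below reproduces only that lookup behavior. StrategyLookup is a hypothetical class over string lists, and it omits the instants branch and LIFO handling for brevity.

```java
import java.util.List;
import java.util.Locale;
import java.util.function.UnaryOperator;

// Hypothetical, simplified lookup mirroring how getStrategy resolves a strategy name.
final class StrategyLookup {
  private StrategyLookup() {}

  static UnaryOperator<List<String>> forName(String name, int maxPlans) {
    // Locale.ROOT keeps lower-casing independent of the JVM default locale.
    switch (name.toLowerCase(Locale.ROOT)) {
      case "all":
        return pending -> pending;
      case "num_instants":
        // FIFO only in this sketch: take at most maxPlans from the front.
        return pending -> pending.subList(0, Math.min(maxPlans, pending.size()));
      default:
        throw new UnsupportedOperationException("Unknown compaction plan strategy: " + name);
    }
  }

  public static void main(String[] args) {
    List<String> pending = List.of("001", "002", "003");
    System.out.println(forName("NUM_INSTANTS", 2).apply(pending)); // [001, 002]
    try {
      forName("latest", 2);
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage()); // Unknown compaction plan strategy: latest
    }
  }
}
```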
Code Reference
Source Location
- Repository: Apache Hudi
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategy.java (lines 29-38)
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategies.java (lines 44-73)
Signature
```java
// CompactionPlanStrategy.java:29-38
public interface CompactionPlanStrategy {
  String ALL = "all";
  String INSTANTS = "instants";
  String NUM_INSTANTS = "num_instants";

  /**
   * Define how to select compaction plan to compact.
   */
  List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline);
}
```

```java
// CompactionPlanStrategies.java:44-73
public static CompactionPlanStrategy getStrategy(FlinkCompactionConfig config) {
  switch (config.compactionPlanSelectStrategy.toLowerCase(Locale.ROOT)) {
    case CompactionPlanStrategy.ALL:
      return HoodieTimeline::getInstants;
    case CompactionPlanStrategy.INSTANTS:
      return pendingCompactionTimeline -> {
        if (StringUtils.isNullOrEmpty(config.compactionPlanInstant)) {
          log.warn("None instant is selected");
          return Collections.emptyList();
        }
        List<String> instants = Arrays.asList(config.compactionPlanInstant.split(","));
        return pendingCompactionTimeline.getInstantsAsStream()
            .filter(instant -> instants.contains(instant.requestedTime()))
            .collect(Collectors.toList());
      };
    case CompactionPlanStrategy.NUM_INSTANTS:
      return pendingCompactionTimeline -> {
        List<HoodieInstant> pendingCompactionPlanInstants = pendingCompactionTimeline.getInstants();
        if (CompactionUtil.isLIFO(config.compactionSeq)) {
          Collections.reverse(pendingCompactionPlanInstants);
        }
        int range = Math.min(config.maxNumCompactionPlans, pendingCompactionPlanInstants.size());
        return pendingCompactionPlanInstants.subList(0, range);
      };
    default:
      throw new UnsupportedOperationException("Unknown compaction plan strategy: "
          + config.compactionPlanSelectStrategy
          + ", supported strategies:[num_instants,instants,all]");
  }
}
```
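The instants branch splits compactionPlanInstant on "," and compares tokens verbatim, so a value such as "001, 002" yields the token " 002" (with a leading space), which matches no requested time. Because the filter streams over the timeline, the result also keeps timeline order rather than the order of the comma-separated value. The standalone check below reproduces that matching on plain strings. InstantListParsing is a hypothetical class, and the trimmed variant is an illustrative alternative, not what the factory does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class InstantListParsing {
  private InstantListParsing() {}

  // Same matching rule as the "instants" branch: split on "," and compare verbatim.
  static List<String> selectVerbatim(List<String> pending, String csv) {
    List<String> wanted = Arrays.asList(csv.split(","));
    return pending.stream().filter(wanted::contains).collect(Collectors.toList());
  }

  // Illustrative variant (not Hudi behavior): trim each token before matching.
  static List<String> selectTrimmed(List<String> pending, String csv) {
    List<String> wanted = Arrays.stream(csv.split(","))
        .map(String::trim)
        .collect(Collectors.toList());
    return pending.stream().filter(wanted::contains).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> pending = List.of("001", "002", "003");
    System.out.println(selectVerbatim(pending, "001, 002")); // [001]
    System.out.println(selectTrimmed(pending, "001, 002"));  // [001, 002]
    System.out.println(selectVerbatim(pending, "003,001"));  // [001, 003]
  }
}
```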
Import
```java
import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategy;
import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategies;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| pendingCompactionTimeline | HoodieTimeline | Yes | The pending compaction timeline from the Hudi table metadata, containing the compaction instants that have not yet completed. |
| config | FlinkCompactionConfig | Yes | Configuration object containing compactionPlanSelectStrategy (one of all, instants, num_instants; matched case-insensitively), compactionSeq (FIFO or LIFO) and maxNumCompactionPlans (integer), both used only by num_instants, and compactionPlanInstant (comma-separated instant times), used only by instants. |
Outputs
| Name | Type | Description |
|---|---|---|
| selected instants | List<HoodieInstant> | Filtered and ordered list of compaction instants to execute. Each instant corresponds to a compaction plan containing one or more CompactionOperation entries that will be expanded into CompactionPlanEvent objects downstream. |
| strategy | CompactionPlanStrategy | The strategy object (functional interface) returned by the factory, which can be invoked with select(). |
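Each selected instant's plan is expanded downstream into per-operation CompactionPlanEvent objects. That fan-out can be pictured with the sketch below. PlanSketch and PlanEventSketch are hypothetical records, with plain file-group id strings standing in for CompactionOperation, and the sketch assumes one event per operation emitted in plan order. It is not Hudi's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified plan: an instant time plus the file-group ids of its operations.
record PlanSketch(String instantTime, List<String> fileGroupIds) {}

// Hypothetical simplified event: one operation paired with its plan's instant time.
record PlanEventSketch(String instantTime, String fileGroupId) {}

final class PlanExpansion {
  private PlanExpansion() {}

  // Fan out: one event per operation, preserving the order of the selected plans.
  static List<PlanEventSketch> expand(List<PlanSketch> selectedPlans) {
    List<PlanEventSketch> events = new ArrayList<>();
    for (PlanSketch plan : selectedPlans) {
      for (String fileGroupId : plan.fileGroupIds()) {
        events.add(new PlanEventSketch(plan.instantTime(), fileGroupId));
      }
    }
    return events;
  }

  public static void main(String[] args) {
    List<PlanSketch> plans = List.of(
        new PlanSketch("001", List.of("fg-1", "fg-2")),
        new PlanSketch("002", List.of("fg-3")));
    List<PlanEventSketch> events = expand(plans);
    System.out.println(events.size()); // 3
    System.out.println(events.get(2)); // PlanEventSketch[instantTime=002, fileGroupId=fg-3]
  }
}
```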
Usage Examples
```java
// Configure the compaction strategy
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
cfg.compactionPlanSelectStrategy = "num_instants";
cfg.compactionSeq = "FIFO";
cfg.maxNumCompactionPlans = 3;

// Obtain the strategy from the factory
CompactionPlanStrategy strategy = CompactionPlanStrategies.getStrategy(cfg);

// Apply the strategy to the pending timeline
// (`table` is assumed to be an existing Hudi table handle, e.g. a HoodieFlinkTable)
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline()
    .filterPendingCompactionTimeline();
List<HoodieInstant> selected = strategy.select(pendingCompactionTimeline);

// With FIFO, selected holds up to 3 of the oldest pending compaction instants
for (HoodieInstant instant : selected) {
  System.out.println("Will compact: " + instant.requestedTime());
}
```