
Implementation:Apache Hudi CompactionPlanStrategy Select

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Stream_Processing
Last Updated 2026-02-08 00:00 GMT

Overview

A concrete tool, provided by Apache Hudi, that selects which pending compaction plans on the Hudi timeline to execute, using pluggable selection strategies.

Description

This implementation provides a strategy interface (CompactionPlanStrategy) and a factory class (CompactionPlanStrategies) that together determine which pending compaction instants are picked up for execution during a compaction cycle.

The CompactionPlanStrategy interface defines a single method, select(HoodieTimeline), which receives the pending compaction timeline and returns the HoodieInstant objects selected for execution. Three strategy-name constants are defined on the interface: ALL, INSTANTS, and NUM_INSTANTS.

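The CompactionPlanStrategies factory class applies the strategy pattern: for each strategy name it returns a CompactionPlanStrategy implemented as a lambda (a method reference in the case of all). The name is matched case-insensitively (it is lower-cased with Locale.ROOT), and an unknown name raises UnsupportedOperationException: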

  • all: Returns all instants from the pending compaction timeline via HoodieTimeline::getInstants.
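  • instants: Splits FlinkCompactionConfig.compactionPlanInstant on commas and keeps only the pending instants whose requested time matches one of the resulting tokens. Tokens are not trimmed, so the list must not contain spaces. If compactionPlanInstant is null or empty, a warning is logged and an empty list is returned.
  • num_instants: Takes all pending instants in timeline order (oldest first), reverses them if the compaction sequence is LIFO (checked via CompactionUtil.isLIFO()), and returns at most maxNumCompactionPlans instants from the front of the list.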
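To make the three selection rules concrete without a Hudi dependency, here is a minimal sketch that mirrors the factory's switch. It assumes instants can be modeled as plain timestamp strings already sorted oldest-first, and the names PlanSelectSketch, Strategy, and the getStrategy parameter list are hypothetical stand-ins, not Hudi API. Unlike the original, the sketch copies lists so it never mutates its input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical, dependency-free model of CompactionPlanStrategies:
// instants are plain timestamp strings, ordered oldest-first.
public class PlanSelectSketch {

  // Stand-in for CompactionPlanStrategy: one abstract method, so lambdas fit.
  @FunctionalInterface
  interface Strategy {
    List<String> select(List<String> pendingTimeline);
  }

  static Strategy getStrategy(String name, String planInstants, boolean lifo, int maxPlans) {
    // Case-insensitive dispatch, as in the original factory.
    switch (name.toLowerCase(Locale.ROOT)) {
      case "all":
        // Every pending instant (copied so callers cannot mutate the input).
        return pending -> new ArrayList<>(pending);
      case "instants":
        return pending -> {
          if (planInstants == null || planInstants.isEmpty()) {
            return Collections.emptyList();
          }
          // Same parsing as the original: split on commas, no trimming.
          List<String> wanted = Arrays.asList(planInstants.split(","));
          return pending.stream().filter(wanted::contains).collect(Collectors.toList());
        };
      case "num_instants":
        return pending -> {
          List<String> ordered = new ArrayList<>(pending);
          if (lifo) {
            Collections.reverse(ordered); // newest first
          }
          int range = Math.min(maxPlans, ordered.size());
          return new ArrayList<>(ordered.subList(0, range));
        };
      default:
        throw new UnsupportedOperationException("Unknown compaction plan strategy: " + name);
    }
  }

  public static void main(String[] args) {
    List<String> pending = Arrays.asList("001", "002", "003", "004", "005");
    System.out.println(getStrategy("all", null, false, 3).select(pending));            // [001, 002, 003, 004, 005]
    System.out.println(getStrategy("instants", "002,004,999", false, 3).select(pending)); // [002, 004]
    System.out.println(getStrategy("NUM_INSTANTS", null, false, 3).select(pending));   // [001, 002, 003]
  }
}
```

Returning a lambda per case keeps each rule self-contained, and the caller only ever sees the single select method.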

Usage

Use CompactionPlanStrategies.getStrategy(config) during compaction execution to obtain the appropriate strategy, then call strategy.select(pendingTimeline) to get the list of instants to compact. This is typically invoked in the HoodieFlinkCompactor.AsyncCompactionService.compact() method.

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategy.java
  • Lines: 29-38
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategies.java
  • Lines: 44-73

Signature

// CompactionPlanStrategy.java:29-38
public interface CompactionPlanStrategy {
  String ALL = "all";
  String INSTANTS = "instants";
  String NUM_INSTANTS = "num_instants";

  /**
   * Define how to select compaction plan to compact.
   */
  List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline);
}

// CompactionPlanStrategies.java:44-73
public static CompactionPlanStrategy getStrategy(FlinkCompactionConfig config) {
    switch (config.compactionPlanSelectStrategy.toLowerCase(Locale.ROOT)) {
      case CompactionPlanStrategy.ALL:
        return HoodieTimeline::getInstants;
      case CompactionPlanStrategy.INSTANTS:
        return pendingCompactionTimeline -> {
          if (StringUtils.isNullOrEmpty(config.compactionPlanInstant)) {
            log.warn("None instant is selected");
            return Collections.emptyList();
          }
          List<String> instants = Arrays.asList(config.compactionPlanInstant.split(","));
          return pendingCompactionTimeline.getInstantsAsStream()
              .filter(instant -> instants.contains(instant.requestedTime()))
              .collect(Collectors.toList());
        };
      case CompactionPlanStrategy.NUM_INSTANTS:
        return pendingCompactionTimeline -> {
          List<HoodieInstant> pendingCompactionPlanInstants = pendingCompactionTimeline.getInstants();
          if (CompactionUtil.isLIFO(config.compactionSeq)) {
            Collections.reverse(pendingCompactionPlanInstants);
          }
          int range = Math.min(config.maxNumCompactionPlans, pendingCompactionPlanInstants.size());
          return pendingCompactionPlanInstants.subList(0, range);
        };
      default:
        throw new UnsupportedOperationException("Unknown compaction plan strategy: "
            + config.compactionPlanSelectStrategy
            + ", supported strategies:[num_instants,instants,all]");
    }
}
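One detail of the INSTANTS branch above: compactionPlanInstant is split on commas with no whitespace trimming, so a value such as "002, 004" yields the token " 004", which matches no instant time. The following plain-Java check reproduces that parsing step in isolation; the class name InstantListParsing is hypothetical and no Hudi classes are involved.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo of the comma parsing used by the INSTANTS strategy.
public class InstantListParsing {

  // Mirrors Arrays.asList(config.compactionPlanInstant.split(",")) from the excerpt.
  static List<String> parse(String compactionPlanInstant) {
    return Arrays.asList(compactionPlanInstant.split(","));
  }

  public static void main(String[] args) {
    System.out.println(parse("002,004").contains("004"));  // true
    System.out.println(parse("002, 004").contains("004")); // false: the token is " 004"
  }
}
```

In practice this means instant times should be listed without spaces, for example 002,004.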

Import

import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategy;
import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategies;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import java.util.List;

I/O Contract

Inputs

  • pendingCompactionTimeline (HoodieTimeline, required): The pending compaction timeline from the Hudi table metadata, containing the compaction instants that have not yet completed. Passed to select().
  • config (FlinkCompactionConfig, required): Configuration object containing compactionPlanSelectStrategy (one of all, instants, num_instants), compactionSeq (FIFO or LIFO), maxNumCompactionPlans (integer), and optionally compactionPlanInstant (comma-separated instant times). Passed to getStrategy().

Outputs

  • selected instants (List<HoodieInstant>): Returned by select(). The filtered and ordered list of compaction instants to execute. Each instant corresponds to a compaction plan containing one or more CompactionOperation entries, which are expanded into CompactionPlanEvent objects downstream.
  • strategy (CompactionPlanStrategy): Returned by getStrategy(). The strategy object (a single-method interface, so it can be implemented by a lambda), which is then invoked with select().

Usage Examples

// Configure the compaction strategy
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
cfg.compactionPlanSelectStrategy = "num_instants";
cfg.compactionSeq = "FIFO";
cfg.maxNumCompactionPlans = 3;

// Obtain the strategy from the factory
CompactionPlanStrategy strategy = CompactionPlanStrategies.getStrategy(cfg);

// Apply the strategy to the pending timeline.
// Assumes `table` is an already-initialized Hudi table handle
// (for example, a HoodieFlinkTable).
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline()
    .filterPendingCompactionTimeline();
List<HoodieInstant> selected = strategy.select(pendingCompactionTimeline);

// With FIFO, selected holds at most the 3 oldest pending compaction instants
for (HoodieInstant instant : selected) {
    System.out.println("Will compact: " + instant.requestedTime());
}
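Changing cfg.compactionSeq to "LIFO" in the example above reverses the pending list before the cap is applied, so the newest instants are chosen, newest first. Because the cap is Math.min(maxNumCompactionPlans, size), a timeline with fewer pending plans than the cap is returned in full. The sketch below isolates just that ordering-and-capping step, modeling instants as timestamp strings; NumInstantsDemo and pick are hypothetical names, not Hudi API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the num_instants branch; not Hudi API.
public class NumInstantsDemo {

  static List<String> pick(List<String> oldestFirst, boolean lifo, int maxNumCompactionPlans) {
    List<String> ordered = new ArrayList<>(oldestFirst); // copy, so the input stays untouched
    if (lifo) {
      Collections.reverse(ordered); // newest first
    }
    int range = Math.min(maxNumCompactionPlans, ordered.size());
    return new ArrayList<>(ordered.subList(0, range));
  }

  public static void main(String[] args) {
    List<String> pending = Arrays.asList("001", "002", "003", "004", "005");
    System.out.println(pick(pending, false, 3));                    // [001, 002, 003]
    System.out.println(pick(pending, true, 3));                     // [005, 004, 003]
    System.out.println(pick(Arrays.asList("001", "002"), true, 3)); // [002, 001]
  }
}
```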

Related Pages

Implements Principle
