
Implementation:Apache Hudi HoodieFlinkCompactor Main

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Stream_Processing
Last Updated 2026-02-08 00:00 GMT

Overview

A concrete tool, provided by Apache Hudi, for running a standalone Flink compaction job that discovers, executes, and commits compaction plans for Merge-on-Read (MOR) tables.

Description

HoodieFlinkCompactor is a standalone Flink application with a main() entry point that orchestrates the full lifecycle of a Hudi MOR compaction job. It parses CLI arguments into a FlinkCompactionConfig, initializes a HoodieFlinkWriteClient, and then either runs a single compaction cycle or enters a continuous service loop.

The inner class AsyncCompactionService (extending HoodieAsyncTableService) contains the core compact() method that:

  1. Optionally schedules a new compaction plan via the write client (if --schedule is enabled).
  2. Fetches the pending compaction timeline and applies the configured CompactionPlanStrategy to select which instants to compact.
  3. Handles stale inflight compaction instants when retry-last-failed mode is enabled.
  4. Rolls back any inflight compaction instants that are being re-executed.
  5. Retrieves the HoodieCompactionPlan for each selected instant and filters out empty plans.
  6. Transitions selected instants from requested to inflight state on the timeline.
  7. Builds and executes a Flink streaming pipeline: CompactionPlanSourceFunction -> rebalance() -> CompactOperator -> CompactionCommitSink.
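The plan-selection step (2) above can be illustrated with a minimal, self-contained sketch. `PlanSelectionSketch` and its `select` helper are hypothetical names for illustration only; Hudi's real `CompactionPlanStrategy` implementations operate on `HoodieInstant` objects from the pending compaction timeline rather than raw timestamp strings. The sketch assumes the documented `--seq` semantics: FIFO executes the oldest pending instants first, LIFO the newest.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of step 2: choosing which pending compaction
// instants to run, bounded by --max-num-plans and ordered by --seq.
public class PlanSelectionSketch {

    // Select up to maxNumPlans instant times, oldest first (FIFO)
    // or newest first (LIFO). Instant times sort lexicographically
    // because Hudi instant times are fixed-width timestamps.
    static List<String> select(List<String> pending, String seq, int maxNumPlans) {
        List<String> sorted = new ArrayList<>(pending);
        sorted.sort("LIFO".equals(seq)
            ? Comparator.<String>reverseOrder()   // newest instant first
            : Comparator.<String>naturalOrder()); // oldest instant first
        return sorted.subList(0, Math.min(maxNumPlans, sorted.size()));
    }

    public static void main(String[] args) {
        List<String> pending = List.of("20240101101010", "20240101090000", "20240101120000");
        System.out.println(select(pending, "FIFO", 2)); // [20240101090000, 20240101101010]
        System.out.println(select(pending, "LIFO", 1)); // [20240101120000]
    }
}
```

In the real job, the selected instants then flow through steps 3-7: stale or inflight instants are rolled back, empty plans are dropped, and the survivors feed the Flink pipeline.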

Usage

Run as a standalone Flink application from the command line. Typical invocations:

  • Single-run compaction: flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor hudi-flink-bundle.jar --path s3://bucket/table
  • Service mode: Add --service flag for continuous compaction.
  • With scheduling: Add --schedule to have the compactor also schedule new compaction plans.

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
  • Lines: 75-117 (main method), 264-364 (compact pipeline)

Signature

// HoodieFlinkCompactor.java:75-117
public static void main(String[] args) throws Exception {
    FlinkCompactionConfig cfg = getFlinkCompactionConfig(args);
    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);

    // Validate configuration
    if (cfg.retryLastFailedJob && cfg.maxProcessingTimeMs <= 0) {
      LOG.warn("--retry-last-failed-job is enabled but --job-max-processing-time-ms is not set or <= 0. "
          + "The retry-last-failed feature will have no effect.");
    }

    if (cfg.serviceMode) {
      AsyncCompactionService service = new AsyncCompactionService(cfg, conf);
      new HoodieFlinkCompactor(service).start(true);
    } else {
      new RetryHelper<Void, RuntimeException>(0, cfg.retry, 0, "java.lang.RuntimeException", "Flink compaction")
          .start(() -> {
            AsyncCompactionService service;
            try {
              service = new AsyncCompactionService(cfg, conf);
            } catch (Exception e) {
              throw new RuntimeException("Failed to create AsyncCompactionService", e);
            }
            try {
              new HoodieFlinkCompactor(service).start(false);
            } catch (ApplicationExecutionException aee) {
              if (aee.getMessage() != null && aee.getMessage().contains(NO_EXECUTE_KEYWORD)) {
                LOG.info("Compaction is not performed - no work to do");
              } else {
                throw new RuntimeException(aee);
              }
            } catch (Exception e) {
              throw new RuntimeException(e);
            } finally {
              service.shutDown();
            }
            return null;
          });
    }
}

// AsyncCompactionService.compact() - pipeline construction (lines 264-364)
private void compact() throws Exception {
    table.getMetaClient().reloadActiveTimeline();
    if (cfg.schedule) {
      boolean scheduled = writeClient.scheduleCompaction(Option.empty()).isPresent();
      if (!scheduled) {
        LOG.info("No compaction plan for this job ");
        return;
      }
      table.getMetaClient().reloadActiveTimeline();
    }

    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
    List<HoodieInstant> requested = CompactionPlanStrategies.getStrategy(cfg).select(pendingCompactionTimeline);
    // ... rollback inflight instants, fetch compaction plans, build pipeline ...

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(new CompactionPlanSourceFunction(compactionPlans, conf))
        .name("compaction_source")
        .uid("uid_compaction_source")
        .rebalance()
        .transform("compact_task",
            TypeInformation.of(CompactionCommitEvent.class),
            new CompactOperator(conf))
        .setParallelism(compactionParallelism)
        .addSink(new CompactionCommitSink(conf))
        .name("compaction_commit")
        .uid("uid_compaction_commit")
        .setParallelism(1)
        .getTransformation()
        .setMaxParallelism(1);

    env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes));
}

Import

import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.sink.compact.CompactOperator;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.common.util.RetryHelper;

I/O Contract

Inputs

| Name | Type | Required | Description |
| args | String[] | Yes | CLI arguments parsed by JCommander into FlinkCompactionConfig. |
| path | String (CLI --path) | Yes | Base path of the target Hudi MOR table. |
| compactionTriggerStrategy | String (CLI --compaction-trigger-strategy) | No | Strategy to trigger compaction: num_commits, time_elapsed, num_and_time, num_or_time. Default: num_commits. |
| compactionDeltaCommits | Integer (CLI --compaction-delta-commits) | No | Number of delta commits to trigger compaction. Default: 1. |
| compactionTasks | Integer (CLI --compaction-tasks) | No | Parallelism for compact operators. -1 means auto (one per operation). Default: -1. |
| schedule | Boolean (CLI --schedule) | No | Whether to schedule compaction plans in addition to executing them. Default: false. |
| serviceMode | Boolean (CLI --service) | No | Whether to run continuously. Default: false. |
| compactionSeq | String (CLI --seq) | No | Execution order: FIFO or LIFO. Default: FIFO. |
| compactionPlanSelectStrategy | String (CLI --plan-select-strategy) | No | Plan selection strategy: num_instants, all, instants. Default: num_instants. |
| maxNumCompactionPlans | Integer (CLI --max-num-plans) | No | Maximum number of plans to select per cycle. Default: 1. |
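The four --compaction-trigger-strategy modes combine a commit-count condition with an elapsed-time condition. The sketch below illustrates that combination only; `TriggerStrategySketch` and `shouldCompact` are hypothetical names, and the real check lives inside Hudi's compaction scheduling code, not in this class.

```java
// Hypothetical sketch of the --compaction-trigger-strategy semantics:
// num_commits and time_elapsed each test one condition, while
// num_and_time / num_or_time combine both.
public class TriggerStrategySketch {

    static boolean shouldCompact(String strategy, int deltaCommits, long elapsedSec,
                                 int commitsThreshold, long timeThresholdSec) {
        boolean enoughCommits = deltaCommits >= commitsThreshold;
        boolean enoughTime = elapsedSec >= timeThresholdSec;
        switch (strategy) {
            case "num_commits":  return enoughCommits;
            case "time_elapsed": return enoughTime;
            case "num_and_time": return enoughCommits && enoughTime;
            case "num_or_time":  return enoughCommits || enoughTime;
            default: throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        // 5 delta commits, 100s elapsed; thresholds: 5 commits, 300s.
        System.out.println(shouldCompact("num_commits", 5, 100, 5, 300));  // true
        System.out.println(shouldCompact("num_and_time", 5, 100, 5, 300)); // false
        System.out.println(shouldCompact("num_or_time", 5, 100, 5, 300));  // true
    }
}
```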

Outputs

| Name | Type | Description |
| Flink pipeline execution | Side effect | A Flink streaming pipeline is built and executed: CompactionPlanSourceFunction -> rebalance() -> CompactOperator -> CompactionCommitSink. |
| Committed compaction instants | Side effect (timeline) | Compaction instants are transitioned from requested -> inflight -> committed on the Hudi timeline. |
| New base files | Side effect (storage) | Delta log files are merged into new base files on the underlying storage. |
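The timeline side effect follows a strict state order: each selected instant moves from requested to inflight before the pipeline runs, and is committed only after CompactionCommitSink succeeds. The toy state machine below, with hypothetical names (`InstantStateSketch`, `advance`), only illustrates that the transitions are one-way; Hudi tracks these states on HoodieInstant objects in the active timeline.

```java
// Hypothetical one-way state machine for a compaction instant's
// lifecycle on the timeline: requested -> inflight -> completed.
public class InstantStateSketch {
    enum State { REQUESTED, INFLIGHT, COMPLETED }

    static State advance(State s) {
        switch (s) {
            case REQUESTED: return State.INFLIGHT;  // transitioned before the pipeline runs
            case INFLIGHT:  return State.COMPLETED; // committed by the commit sink
            default: throw new IllegalStateException("already completed");
        }
    }

    public static void main(String[] args) {
        State s = State.REQUESTED;
        s = advance(s);
        s = advance(s);
        System.out.println(s); // COMPLETED
    }
}
```

A failed run leaves the instant inflight, which is why the compactor rolls back or retries stale inflight instants on the next cycle.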

Usage Examples

// Programmatic invocation of the standalone compactor
String[] args = new String[] {
    "--path", "s3://my-bucket/my-hudi-table",
    "--compaction-tasks", "4",
    "--plan-select-strategy", "num_instants",
    "--max-num-plans", "3",
    "--seq", "FIFO",
    "--schedule"
};

// Single-run mode (default)
HoodieFlinkCompactor.main(args);

// Service mode - add --service flag
String[] serviceArgs = new String[] {
    "--path", "s3://my-bucket/my-hudi-table",
    "--service",
    "--min-compaction-interval-seconds", "300",
    "--compaction-tasks", "8"
};
HoodieFlinkCompactor.main(serviceArgs);
