
Implementation:Apache Hudi CompactOperator ProcessElement

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Stream_Processing
Last Updated 2026-02-08 00:00 GMT

Overview

Concrete tool for executing individual file group compaction operations within a Flink streaming operator, provided by Apache Hudi.

Description

This implementation centers on two cooperating classes:

  1. CompactOperator -- A Flink OneInputStreamOperator that receives CompactionPlanEvent records and delegates each to a CompactHandler. The processElement() method extracts the compaction instant time, determines whether the meta client needs reloading (based on whether the instant has changed), and routes the event to either the data table handler or the metadata table handler. It supports both synchronous and asynchronous execution modes.
  2. HoodieFlinkMergeOnReadTableCompactor -- The engine-specific compactor (extending HoodieCompactor) that performs the physical merge. Its compact() method receives the write configuration, compaction operation (containing base file path and log file paths), instant time, task context supplier, reader context, and table reference. It uses the file group reader-based compaction approach to merge log files into a new base file, returning a List<WriteStatus> describing the output files.
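The instant-change check that drives meta-client reloading can be illustrated in isolation. The InstantTracker class below is a hypothetical stand-in for the operator's prevCompactInstant bookkeeping, not Hudi API:

```java
// Hypothetical stand-in for CompactOperator's prevCompactInstant field:
// the meta client is reloaded only when the compaction instant changes
// between consecutive events.
class InstantTracker {
  private String prevCompactInstant;

  /** Returns true when the incoming instant differs from the previous one. */
  boolean needsReload(String instantTime) {
    boolean changed = !instantTime.equals(prevCompactInstant);
    prevCompactInstant = instantTime;
    return changed;
  }
}
```

Because consecutive file groups from the same compaction plan share one instant, this keeps repeated events for the same instant from triggering redundant meta-client reloads.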

The CompactHandler bridges the two by instantiating HoodieFlinkMergeOnReadTableCompactor, handling schema evolution via InternalSchemaManager, creating a FlinkRowDataReaderContext for Flink-native row reading, and collecting the resulting CompactionCommitEvent. On failure in async mode, it emits a failed event (with null write statuses) instead of crashing the operator.
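The async failure path described above can be sketched with simplified stand-ins; CommitEvent and the Consumer-based collector here are hypothetical simplifications of the Flink/Hudi types, modeling only the null-write-status convention:

```java
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for CompactionCommitEvent; only the fields relevant
// to the failure path are modeled.
record CommitEvent(String instantTime, String fileId, List<String> writeStatuses) {}

class AsyncCompactSketch {
  /**
   * Runs the compaction body; on failure, emits a failed event with null
   * write statuses instead of letting the exception kill the operator.
   */
  static void compactSafely(String instantTime, String fileId,
                            Runnable compaction, Consumer<CommitEvent> collector) {
    try {
      compaction.run();
      collector.accept(new CommitEvent(instantTime, fileId, List.of("new-base-file")));
    } catch (Exception e) {
      // Failed event: null write statuses signal the failure downstream.
      collector.accept(new CommitEvent(instantTime, fileId, null));
    }
  }
}
```

The sink can then distinguish failed operations by the null statuses and retry or abort the instant, rather than losing the whole job to one bad file group.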

Usage

This operator is placed in the middle of the compaction Flink pipeline, after the rebalance() shuffle and before CompactionCommitSink. Configure its parallelism via FlinkOptions.COMPACTION_TASKS to control how many file groups are compacted concurrently.
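As a minimal configuration sketch, the parallelism might be set on the Flink Configuration before the pipeline is built (assuming the standard FlinkOptions.COMPACTION_TASKS option; verify the key against your Hudi version):

```java
// Sketch: cap concurrent file-group compactions before building the pipeline.
// Each compact_task subtask handles one file group at a time.
Configuration conf = new Configuration();
conf.set(FlinkOptions.COMPACTION_TASKS, 4);
```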

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
  • Lines: 121-132
  • File: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
  • Lines: 69-85

Signature

// CompactOperator.java:121-132
@Override
public void processElement(StreamRecord<CompactionPlanEvent> record) throws Exception {
    final CompactionPlanEvent event = record.getValue();
    final String instantTime = event.getCompactionInstantTime();
    boolean needReloadMetaClient = !instantTime.equals(prevCompactInstant);
    prevCompactInstant = instantTime;

    if (event.isMetadataTable()) {
      mdtCompactHandler.get().compact(executor, event, collector, needReloadMetaClient, compactionMetrics);
    } else {
      compactHandler.get().compact(executor, event, collector, needReloadMetaClient, compactionMetrics);
    }
}

// HoodieFlinkMergeOnReadTableCompactor.java:69-85
public List<WriteStatus> compact(HoodieWriteConfig writeConfig,
                                 CompactionOperation operation,
                                 String instantTime,
                                 TaskContextSupplier taskContextSupplier,
                                 HoodieReaderContext<?> readerContext,
                                 HoodieTable table) throws IOException {
    String maxInstantTime = getMaxInstantTime(table.getMetaClient());
    log.info("Compact using file group reader based compaction, operation: {}.", operation);
    return compact(
        writeConfig,
        operation,
        instantTime,
        readerContext,
        table,
        maxInstantTime,
        taskContextSupplier);
}

Import

import org.apache.hudi.sink.compact.CompactOperator;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.table.format.FlinkRowDataReaderContext;
import org.apache.hudi.config.HoodieWriteConfig;

I/O Contract

Inputs

  • record (StreamRecord<CompactionPlanEvent>, required) -- A Flink stream record wrapping a CompactionPlanEvent containing the compaction instant time, the CompactionOperation (with file group ID, base file path, log file paths), and the metadata table flag.
  • writeConfig (HoodieWriteConfig, required) -- Write configuration for the compaction, including schema, storage config, and compaction memory settings.
  • operation (CompactionOperation, required) -- Describes the file group to compact: file group ID, optional base file path, list of log file paths, and partition path.
  • instantTime (String, required) -- The compaction instant time being executed.
  • readerContext (HoodieReaderContext<?>, required) -- Engine-specific reader context; in Flink this is a FlinkRowDataReaderContext that handles schema evolution and Flink-native row data reading.
  • table (HoodieTable, required) -- The Hudi table instance providing access to the meta client, timeline, and storage.

Outputs

  • CompactionCommitEvent -- Emitted downstream for each completed (or failed) compaction operation. Contains the instant time, file ID, List<WriteStatus> (new base files written, or null on failure), task ID, metadata table flag, and log compaction flag.
  • List<WriteStatus> -- Returned by the compactor, describing each new base file produced by the merge: file path, total records written, total records updated, total error records, and partition path.
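A downstream consumer typically aggregates these per-file statistics before committing. The FileWriteResult record below is a hypothetical mirror of the WriteStatus fields listed above, not the Hudi class itself:

```java
import java.util.List;

// Hypothetical mirror of the WriteStatus fields described in the table above.
record FileWriteResult(String filePath, long recordsWritten,
                       long recordsUpdated, long errorRecords, String partitionPath) {}

class CompactionSummary {
  /** Sums records written across all new base files; fails fast if any errors occurred. */
  static long totalWritten(List<FileWriteResult> results) {
    long errors = results.stream().mapToLong(FileWriteResult::errorRecords).sum();
    if (errors > 0) {
      throw new IllegalStateException("compaction produced " + errors + " error records");
    }
    return results.stream().mapToLong(FileWriteResult::recordsWritten).sum();
  }
}
```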

Usage Examples

// The CompactOperator is typically instantiated as part of the Flink pipeline:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new CompactionPlanSourceFunction(compactionPlans, conf))
    .name("compaction_source")
    .uid("uid_compaction_source")
    .rebalance()
    .transform("compact_task",
        TypeInformation.of(CompactionCommitEvent.class),
        new CompactOperator(conf))       // <-- this operator
    .setParallelism(compactionParallelism)
    .addSink(new CompactionCommitSink(conf))
    .name("compaction_commit")
    .uid("uid_compaction_commit")
    .setParallelism(1);

// Internally, for each CompactionPlanEvent received:
// 1. CompactOperator.processElement() delegates to CompactHandler.compact()
// 2. CompactHandler instantiates HoodieFlinkMergeOnReadTableCompactor
// 3. The compactor merges base file + log files -> new base file
// 4. A CompactionCommitEvent is emitted downstream with the WriteStatus results
