Implementation: Apache Hudi CompactOperator ProcessElement
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool for executing individual file group compaction operations within a Flink streaming operator, provided by Apache Hudi.
Description
This implementation centers on two cooperating classes:
CompactOperator -- A Flink OneInputStreamOperator that receives CompactionPlanEvent records and delegates each to a CompactHandler. The processElement() method extracts the compaction instant time, determines whether the meta client needs reloading (based on whether the instant has changed), and routes the event to either the data table handler or the metadata table handler. It supports both synchronous and asynchronous execution modes.
HoodieFlinkMergeOnReadTableCompactor -- The engine-specific compactor (extending HoodieCompactor) that performs the physical merge. Its compact() method receives the write configuration, compaction operation (containing the base file path and log file paths), instant time, task context supplier, reader context, and table reference. It uses the file group reader-based compaction approach to merge log files into a new base file, returning a List<WriteStatus> describing the output files.
The CompactHandler bridges the two by instantiating HoodieFlinkMergeOnReadTableCompactor, handling schema evolution via InternalSchemaManager, creating a FlinkRowDataReaderContext for Flink-native row reading, and collecting the resulting CompactionCommitEvent. On failure in async mode, it emits a failed event (with null write statuses) instead of crashing the operator.
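The async-mode failure handling described above can be sketched with simplified stand-in types. The names below mirror the Hudi classes but are hypothetical stand-ins, not the real API; the point is the pattern of emitting a failed event (null write statuses) rather than letting the exception escape the operator:

```java
import java.util.List;
import java.util.function.Consumer;

public class CompactHandlerSketch {
  // Hypothetical stand-in for Hudi's CompactionCommitEvent:
  // null writeStatuses marks the operation as failed.
  record CommitEvent(String instantTime, String fileId, List<String> writeStatuses) {}

  // Async-mode pattern: on failure, emit a failed event downstream
  // instead of crashing the operator.
  static void compactAsync(String instantTime, String fileId,
                           Consumer<CommitEvent> collector) {
    try {
      List<String> statuses = doCompact(fileId); // would merge base + log files
      collector.accept(new CommitEvent(instantTime, fileId, statuses));
    } catch (Exception e) {
      // Downstream commit sink sees null statuses and can abort/retry the instant.
      collector.accept(new CommitEvent(instantTime, fileId, null));
    }
  }

  // Toy merge: fails for file groups whose id starts with "bad".
  static List<String> doCompact(String fileId) {
    if (fileId.startsWith("bad")) {
      throw new RuntimeException("merge failed for " + fileId);
    }
    return List.of(fileId + "_new_base");
  }

  public static void main(String[] args) {
    compactAsync("20260208000000", "bad-fg-1", e ->
        System.out.println(e.writeStatuses() == null ? "failed" : "ok"));
    compactAsync("20260208000000", "fg-2", e ->
        System.out.println(e.writeStatuses() == null ? "failed" : "ok"));
  }
}
```

In the real handler the failed event still carries the instant time and file ID, so the commit sink can correlate it with the compaction plan.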
Usage
This operator is placed in the middle of the compaction Flink pipeline, after the rebalance() shuffle and before CompactionCommitSink. Configure its parallelism via FlinkOptions.COMPACTION_TASKS to control how many file groups are compacted concurrently.
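Setting the parallelism might look like the following config fragment, assuming the standard Flink Configuration API (the value 4 is illustrative):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

Configuration conf = new Configuration();
// Compact up to 4 file groups concurrently (illustrative value).
conf.setInteger(FlinkOptions.COMPACTION_TASKS, 4);
```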
Code Reference
Source Location
- Repository: Apache Hudi
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java (lines 121-132)
- File: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java (lines 69-85)
Signature
// CompactOperator.java:121-132
@Override
public void processElement(StreamRecord<CompactionPlanEvent> record) throws Exception {
  final CompactionPlanEvent event = record.getValue();
  final String instantTime = event.getCompactionInstantTime();
  boolean needReloadMetaClient = !instantTime.equals(prevCompactInstant);
  prevCompactInstant = instantTime;
  if (event.isMetadataTable()) {
    mdtCompactHandler.get().compact(executor, event, collector, needReloadMetaClient, compactionMetrics);
  } else {
    compactHandler.get().compact(executor, event, collector, needReloadMetaClient, compactionMetrics);
  }
}
// HoodieFlinkMergeOnReadTableCompactor.java:69-85
public List<WriteStatus> compact(HoodieWriteConfig writeConfig,
                                 CompactionOperation operation,
                                 String instantTime,
                                 TaskContextSupplier taskContextSupplier,
                                 HoodieReaderContext<?> readerContext,
                                 HoodieTable table) throws IOException {
  String maxInstantTime = getMaxInstantTime(table.getMetaClient());
  log.info("Compact using file group reader based compaction, operation: {}.", operation);
  return compact(
      writeConfig,
      operation,
      instantTime,
      readerContext,
      table,
      maxInstantTime,
      taskContextSupplier);
}
Import
import org.apache.hudi.sink.compact.CompactOperator;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.table.format.FlinkRowDataReaderContext;
import org.apache.hudi.config.HoodieWriteConfig;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| record | StreamRecord<CompactionPlanEvent> | Yes | A Flink stream record wrapping a CompactionPlanEvent containing the compaction instant time, CompactionOperation (with file group ID, base file path, log file paths), and metadata table flag. |
| writeConfig | HoodieWriteConfig | Yes | Write configuration for the compaction, including schema, storage config, and compaction memory settings. |
| operation | CompactionOperation | Yes | Describes the file group to compact: file group ID, base file path (optional), list of log file paths, and partition path. |
| instantTime | String | Yes | The compaction instant time being executed. |
| readerContext | HoodieReaderContext<?> | Yes | Engine-specific reader context; in Flink this is a FlinkRowDataReaderContext that handles schema evolution and Flink-native row data reading. |
| table | HoodieTable | Yes | The Hudi table instance providing access to the metadata client, timeline, and storage. |
Outputs
| Name | Type | Description |
|---|---|---|
| CompactionCommitEvent | CompactionCommitEvent | Emitted downstream for each completed (or failed) compaction operation. Contains the instant time, file ID, List<WriteStatus> (new base files written, or null on failure), task ID, metadata table flag, and log compaction flag. |
| WriteStatus list | List<WriteStatus> | Returned by the compactor, describing each new base file produced by the merge: file path, total records written, total records updated, total error records, and partition path. |
Usage Examples
// The CompactOperator is typically instantiated as part of the Flink pipeline:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new CompactionPlanSourceFunction(compactionPlans, conf))
    .name("compaction_source")
    .uid("uid_compaction_source")
    .rebalance()
    .transform("compact_task",
        TypeInformation.of(CompactionCommitEvent.class),
        new CompactOperator(conf)) // <-- this operator
    .setParallelism(compactionParallelism)
    .addSink(new CompactionCommitSink(conf))
    .name("compaction_commit")
    .uid("uid_compaction_commit")
    .setParallelism(1);
// Internally, for each CompactionPlanEvent received:
// 1. CompactOperator.processElement() delegates to CompactHandler.compact()
// 2. CompactHandler instantiates HoodieFlinkMergeOnReadTableCompactor
// 3. The compactor merges base file + log files -> new base file
// 4. A CompactionCommitEvent is emitted downstream with the WriteStatus results