Implementation: Apache Hudi HoodieSplitReaderFunction Read
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
A concrete tool, provided by Apache Hudi, for reading Hudi file groups (base Parquet files and log files) and producing batched Flink RowData records.
Description
HoodieSplitReaderFunction is the default reader function implementation for both merge-on-read (MOR) and copy-on-write (COW) tables in the Hudi Flink FLIP-27 source. It implements SplitReaderFunction<RowData> and is invoked by the Flink source reader framework whenever a new split is assigned to a reader subtask.
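Inferred from the read signature quoted under Code Reference below, the contract presumably has the following shape; this is a sketch based on this page, not verbatim Hudi source:
// Sketch of the SplitReaderFunction contract, inferred from the read(...)
// signature below. Not verbatim Hudi source; the actual interface may
// declare additional methods.
public interface SplitReaderFunction<T> {
  // Called by the source reader framework for each assigned split;
  // returns batched records for that split.
  CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> read(HoodieSourceSplit split);
}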
The read(HoodieSourceSplit) method performs the following steps:
- FileSlice construction: Creates a FileSlice from the split's metadata: partition path, file ID, optional base file path, and optional log file paths. The FileSlice is the Hudi abstraction that represents a complete file group at a point in time.
- FileGroupReader creation: Builds a HoodieFileGroupReader<RowData> using the builder pattern (sketched after this list). The reader is configured with the HoodieReaderContext (which knows how to create Flink RowData objects), the table metaclient, the file slice, the data schema, the requested schema (for projection pushdown), the merge type, and optionally an internal schema for schema evolution.
- Record iteration: Obtains a ClosableIterator<RowData> from the file group reader. This iterator transparently merges base file and log file records and applies the requested schema projection.
- Batching: Wraps the record iterator in a DefaultHoodieBatchReader that produces RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> objects. Each record is annotated with its file offset and record offset for position tracking.
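A condensed sketch of the first three steps follows. The split accessors (other than getFileId, which appears in the quoted source below) and the HoodieFileGroupReader builder method names are assumptions; the exact API differs across Hudi versions.
// Hedged sketch of FileSlice assembly and file group reader creation.
// Accessor and builder method names are assumptions, not verified Hudi API.
FileSlice fileSlice = new FileSlice(
    new HoodieFileGroupId(split.getPartitionPath(), split.getFileId()),
    split.getLatestInstantTime());                        // hypothetical accessor
split.getBasePath().ifPresent(p -> fileSlice.setBaseFile(new HoodieBaseFile(p)));
split.getLogPaths().forEach(p -> fileSlice.addLogFile(new HoodieLogFile(p)));

HoodieFileGroupReader<RowData> fileGroupReader = HoodieFileGroupReader.<RowData>newBuilder()
    .withReaderContext(readerContext)        // knows how to create Flink RowData
    .withHoodieTableMetaClient(metaClient)   // obtained from hoodieTable
    .withFileSlice(fileSlice)
    .withDataSchema(tableSchema)             // full data schema
    .withRequestedSchema(requiredSchema)     // projection pushdown
    .build();

// Iterator that transparently merges base-file and log-file records
ClosableIterator<RowData> recordIterator = fileGroupReader.getClosableIterator();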
The companion utility ParquetSplitReaderUtil (version-specific, e.g., in hudi-flink1.17.x) provides vectorized Parquet reading capabilities for COW tables in the legacy source path, generating ParquetColumnarRowSplitReader instances with partition column injection and type conversion support.
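For the legacy COW path, a hedged sketch of how such a vectorized reader is typically obtained; the method name mirrors the Flink utility this class derives from, but the exact parameter list is version-dependent and should be treated as an assumption:
// Hedged sketch: obtaining a vectorized Parquet reader for a COW base file.
// The parameter list varies across Hudi/Flink versions and is an assumption.
ParquetColumnarRowSplitReader reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
    true,                 // utcTimestamp: read timestamps as UTC
    true,                 // case-sensitive field-name matching
    hadoopConf,           // Hadoop Configuration
    fullFieldNames,       // all table columns, String[]
    fullFieldTypes,       // corresponding Flink DataType[]
    partitionSpec,        // partition column values to inject, Map<String, Object>
    selectedFields,       // projected column indices, int[]
    2048,                 // rows per vectorized batch
    path,                 // Parquet file path
    splitStart,           // byte offset of the split
    splitLength);         // byte length of the split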
Usage
This function is used internally by the Hudi FLIP-27 source's split reader. It is instantiated once per reader subtask and reused across multiple splits. Users control its behavior through connector options (see the sketch after this list):
- Schema projection is driven by the SQL query's column selection
- Merge type is controlled by hoodie.datasource.merge.type
- Schema evolution is activated via hoodie.schema.evolution.enabled
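For illustration, a minimal sketch of driving these options from a Flink SQL job; the table name, columns, and path are hypothetical, and the option values should be verified against your Hudi version:
// Minimal sketch: controlling the reader through connector options.
// Table name, columns, and path are hypothetical examples.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.executeSql(
    "CREATE TABLE hudi_orders ("
        + " order_id BIGINT,"
        + " amount DOUBLE,"
        + " ts TIMESTAMP(3),"
        + " PRIMARY KEY (order_id) NOT ENFORCED"
        + ") WITH ("
        + " 'connector' = 'hudi',"
        + " 'path' = 'file:///tmp/hudi/hudi_orders',"
        + " 'table.type' = 'MERGE_ON_READ',"
        + " 'hoodie.datasource.merge.type' = 'payload_combine',"
        + " 'hoodie.schema.evolution.enabled' = 'true'"
        + ")");

// Selecting a subset of columns drives the requiredSchema projection.
tEnv.executeSql("SELECT order_id, amount FROM hudi_orders").print();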
Code Reference
Source Location
- Repository: Apache Hudi
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
- Lines: 52-138
- Also: hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java (Lines 50-595)
Signature
@Override
public CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<RowData>>> read(HoodieSourceSplit split) {
try {
this.fileGroupReader = createFileGroupReader(split);
final ClosableIterator<RowData> recordIterator = fileGroupReader.getClosableIterator();
DefaultHoodieBatchReader<RowData> defaultBatchReader =
new DefaultHoodieBatchReader<RowData>(configuration);
return defaultBatchReader.batch(split, recordIterator);
} catch (IOException e) {
throw new HoodieIOException(
"Failed to read from file group: " + split.getFileId(), e);
}
}
Import
import org.apache.hudi.source.reader.function.HoodieSplitReaderFunction;
import org.apache.hudi.source.split.HoodieSourceSplit;
import org.apache.hudi.source.reader.HoodieRecordWithPosition;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.table.data.RowData;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| split | HoodieSourceSplit | Yes | The source split containing the file group information: partition path, file ID, optional base file path, optional log file paths. |
| hoodieTable | HoodieFlinkTable<RowData> | Yes (constructor) | The Hudi table instance providing access to the metaclient and table configuration. |
| readerContext | HoodieReaderContext<RowData> | Yes (constructor) | Reader context that provides Flink-specific record creation and schema handling. |
| tableSchema | HoodieSchema | Yes (constructor) | The full table schema (all columns). |
| requiredSchema | HoodieSchema | Yes (constructor) | The projected schema containing only the columns needed by the query. |
| mergeType | String | Yes (constructor) | The merge strategy type (e.g., position-based or payload-based) for MOR table reading. |
| internalSchemaOption | Option<InternalSchema> | No (constructor) | Internal schema for schema evolution support. Empty if schema evolution is disabled. |
| configuration | Configuration | Yes (constructor) | Flink configuration for batch reader settings. |
Outputs
| Name | Type | Description |
|---|---|---|
| return value | CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<RowData>>> | A closeable iterator over batched records. Each RecordsWithSplitIds contains a batch of HoodieRecordWithPosition<RowData> objects associated with the split ID. Each record carries file offset and record offset for position tracking. |
Usage Examples
// HoodieSplitReaderFunction is instantiated by the Hudi source internally.
// The following illustrates the construction and invocation:
import org.apache.hudi.source.reader.function.HoodieSplitReaderFunction;
import org.apache.hudi.source.reader.HoodieRecordWithPosition;
import org.apache.hudi.source.split.HoodieSourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.CloseableIterator;

// Create the reader function (once per reader subtask)
HoodieSplitReaderFunction readerFunction = new HoodieSplitReaderFunction(
    hoodieTable,            // HoodieFlinkTable<RowData>
    readerContext,          // HoodieReaderContext<RowData>
    configuration,          // Flink Configuration for batch reader settings
    tableSchema,            // full table schema
    requiredSchema,         // projected schema (query columns only)
    "position_based_merge",
    internalSchemaOption    // Option<InternalSchema>; empty if evolution disabled
);

// Read a split (invoked by the source reader for each assigned split)
HoodieSourceSplit split = ...; // assigned by the enumerator

// try-with-resources closes the iterator and releases file handles
try (CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<RowData>>> iterator =
         readerFunction.read(split)) {
  while (iterator.hasNext()) {
    RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> batch = iterator.next();
    // Each batch is associated with split.splitId();
    // records contain RowData plus position information
  }
}
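Given one batch obtained from the iterator above, it is drained split by split using Flink's standard RecordsWithSplitIds API (nextSplit, nextRecordFromSplit, finishedSplits); accessors on HoodieRecordWithPosition are not shown because their names are not documented on this page:
// Draining one batch with Flink's standard RecordsWithSplitIds API.
String splitId;
while ((splitId = batch.nextSplit()) != null) {
  HoodieRecordWithPosition<RowData> record;
  while ((record = batch.nextRecordFromSplit()) != null) {
    // record wraps the RowData plus its file offset and record offset
  }
}
java.util.Set<String> finished = batch.finishedSplits(); // splits fully consumed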