Implementation: Apache Hudi HoodieFileGroupReader getClosableIterator
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Schema_Management |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
A concrete Apache Hudi tool for reading a complete file group with end-to-end schema evolution: it merges base and log records under per-file schemas and emits a single iterator of fully evolved engine-specific records.
Description
HoodieFileGroupReader.getClosableIterator is the top-level entry point that Flink read tasks call to obtain an iterator of RowData records from a file group. Internally, it initializes record iterators for the base file and all log files in the file slice, delegates schema resolution to the FileGroupReaderSchemaHandler, runs records through the record buffer for merge-on-read reconciliation, and applies a final output projection. Every record in the returned iterator conforms to the requested query schema.
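The merge-on-read reconciliation at the heart of the iterator can be sketched with plain collections. This is a simplified stand-in for Hudi's record buffer, not its API: records are keyed strings, log entries are applied over base entries in order, and a null payload stands in for a delete.

```java
import java.util.*;

// Simplified sketch of merge-on-read reconciliation (not Hudi's actual
// record buffer): log records are applied over base records by key, later
// entries win, and a null payload is treated as a delete.
public class MergeOnReadSketch {
  public static List<String> merge(Map<String, String> baseByKey,
                                   List<Map.Entry<String, String>> logRecords) {
    Map<String, String> merged = new LinkedHashMap<>(baseByKey);
    for (Map.Entry<String, String> log : logRecords) {
      if (log.getValue() == null) {
        merged.remove(log.getKey());              // delete wins over base record
      } else {
        merged.put(log.getKey(), log.getValue()); // upsert wins over base record
      }
    }
    return new ArrayList<>(merged.values());
  }
}
```

The real reader additionally evolves each record to the query schema before emitting it; this sketch only shows the key-based reconciliation order.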
FileGroupReaderSchemaHandler.getRequiredSchemaForFileAndRenamedColumns is the per-file schema resolution method. Given a StoragePath, it extracts the commit instant from the file name, looks up the InternalSchema version that was active at that commit using InternalSchemaCache, and merges it with the pruned internal schema. The merge produces a HoodieSchema for reading the file and a Map of renamed columns. When the internal schema is empty (schema evolution is disabled), it returns the required schema with an empty rename map.
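The per-file resolution step can be illustrated with a stdlib-only sketch. The file-name layout and the string-valued schema history below are assumptions made for illustration; the real lookup lives in InternalSchemaCache and returns an InternalSchema, not a string.

```java
import java.util.*;

// Simplified sketch of per-file schema lookup (assumed file-name layout
// "<fileId>_<writeToken>_<instantTime>.parquet"; illustrative only).
public class FileSchemaLookupSketch {
  // Extract the commit instant embedded in a base-file name.
  public static String commitInstantOf(String fileName) {
    String stem = fileName.substring(0, fileName.lastIndexOf('.'));
    return stem.substring(stem.lastIndexOf('_') + 1);
  }

  // Pick the schema whose instant is the latest one <= the file's instant;
  // with no history (schema evolution disabled), fall back to the requested schema.
  public static String schemaForFile(NavigableMap<String, String> schemaByInstant,
                                     String fileName, String requestedSchema) {
    Map.Entry<String, String> active =
        schemaByInstant.floorEntry(commitInstantOf(fileName));
    return active == null ? requestedSchema : active.getValue();
  }
}
```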
FlinkReaderContextFactory is the Flink-specific factory that constructs a FlinkRowDataReaderContext for table services such as compaction and clustering. It lazily initializes the InternalSchemaManager from the HoodieTableMetaClient, ensuring that the schema evolution infrastructure is only loaded when it is actually needed.
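The lazy-initialization pattern described above can be sketched with a memoizing holder. This helper class is illustrative, not Hudi API; it only shows why the InternalSchemaManager lookup is deferred until first use.

```java
import java.util.function.Supplier;

// Sketch of the lazy-initialization pattern FlinkReaderContextFactory uses
// for InternalSchemaManager: the expensive lookup runs only on first access
// and the result is cached. (Illustrative helper, not Hudi API.)
public class LazySketch<T> {
  private final Supplier<T> delegate;
  private T value;
  private boolean initialized;

  public LazySketch(Supplier<T> delegate) {
    this.delegate = delegate;
  }

  public synchronized T get() {
    if (!initialized) {   // first call pays the initialization cost
      value = delegate.get();
      initialized = true;
    }
    return value;         // later calls reuse the cached instance
  }
}
```

Deferring the lookup matters for table services: a compaction task that never touches an evolved file group never pays for loading the schema history from the meta client.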
Usage
Use these methods when:
- A Flink source operator reads a file group and needs a single iterator of fully evolved RowData records.
- A compaction or clustering task must read file groups through the generic file group reader with Flink-specific schema evolution.
- Per-file schema resolution and rename detection are needed for a specific base or log file path.
Code Reference
Source Location
- Repository: Apache Hudi
- File:
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java - Lines: 300-302
- File:
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java - Lines: 134-144
- File:
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkReaderContextFactory.java - Lines: 44-55
Signature
public ClosableIterator<T> getClosableIterator() throws IOException
public Pair<HoodieSchema, Map<String, String>> getRequiredSchemaForFileAndRenamedColumns(StoragePath path)
@Override
public HoodieReaderContext<RowData> getContext()
Import
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.table.format.FlinkReaderContextFactory;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.flink.table.data.RowData;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| readerContext | HoodieReaderContext<T> | Yes | Engine-specific reader context (FlinkRowDataReaderContext for Flink) that provides file I/O and schema evolution capabilities |
| dataSchema | HoodieSchema | Yes | The full table schema describing all columns |
| requestedSchema | HoodieSchema | Yes | The schema of columns requested by the query (may be a projection of dataSchema) |
| internalSchemaOpt | Option<InternalSchema> | No | The InternalSchema for schema evolution; empty if schema evolution is disabled |
| fileSlice | FileSlice | Yes | The file slice containing the base file and log files to read |
| path | StoragePath | Yes | File path for per-file schema resolution (passed to getRequiredSchemaForFileAndRenamedColumns) |
| metaClient | HoodieTableMetaClient | Yes | Meta client used by FlinkReaderContextFactory to access storage configuration and timeline layout |
Outputs
| Name | Type | Description |
|---|---|---|
| getClosableIterator result | ClosableIterator<T> | An iterator of engine-specific records (RowData in Flink) where each record conforms to the requested query schema, with per-file merged schemas, type promotions, renames, and null-filled new columns applied |
| getRequiredSchemaForFileAndRenamedColumns result | Pair<HoodieSchema, Map<String, String>> | A pair of (mergedSchema, renamedColumnsMap) for reading a specific file; the schema uses query-era types and the map records renamed columns |
| getContext result | HoodieReaderContext<RowData> | A FlinkRowDataReaderContext configured with a lazily initialized InternalSchemaManager for table services |
Usage Examples
// Build and use a HoodieFileGroupReader in Flink
HoodieFileGroupReader<RowData> reader = HoodieFileGroupReader.<RowData>newBuilder()
    .withReaderContext(flinkReaderContext)
    .withStorage(storage)
    .withTablePath(tablePath)
    .withLatestCommitTime(latestInstant)
    .withDataSchema(tableSchema)
    .withRequestedSchema(querySchema)
    .withInternalSchema(Option.of(internalSchema))
    .withHoodieTableMetaClient(metaClient)
    .withProperties(props)
    .withBaseFile(Option.of(baseFile))
    .withLogFiles(logFiles.stream())
    .withPartitionPath(partitionPath)
    .build();

// Get the fully evolved iterator
try (ClosableIterator<RowData> iterator = reader.getClosableIterator()) {
  while (iterator.hasNext()) {
    RowData record = iterator.next();
    // record conforms to querySchema regardless of which schema
    // era the source files were written under
    processRecord(record);
  }
}
// Per-file schema resolution example
FileGroupReaderSchemaHandler<RowData> schemaHandler = new FileGroupReaderSchemaHandler<>(
    readerContext, tableSchema, requestedSchema,
    Option.of(internalSchema), props, metaClient);

StoragePath filePath = new StoragePath("hdfs:///table/2024/base_20240101120000.parquet");
Pair<HoodieSchema, Map<String, String>> result =
    schemaHandler.getRequiredSchemaForFileAndRenamedColumns(filePath);
HoodieSchema mergedSchema = result.getLeft();
Map<String, String> renamedColumns = result.getRight();
// FlinkReaderContextFactory for table services
FlinkReaderContextFactory factory = new FlinkReaderContextFactory(metaClient);
HoodieReaderContext<RowData> context = factory.getContext();