
Implementation:Apache Hudi HoodieFileGroupReader GetClosableIterator

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Schema_Management
Last Updated 2026-02-08 00:00 GMT

Overview

A concrete Apache Hudi tool for reading a complete file group with end-to-end schema evolution: it merges base and log records under per-file schemas and emits a single iterator of fully evolved engine-specific records.

Description

HoodieFileGroupReader.getClosableIterator is the top-level entry point that Flink read tasks call to obtain an iterator of RowData records from a file group. Internally, it initializes record iterators for the base file and all log files in the file slice, delegates schema resolution to the FileGroupReaderSchemaHandler, runs records through the record buffer for merge-on-read reconciliation, and applies a final output projection. Every record in the returned iterator conforms to the requested query schema.
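The merge-on-read reconciliation that the record buffer performs can be illustrated with a minimal sketch. This is not Hudi's implementation (the real buffer handles ordering fields, payload classes, and spilling); the `Rec` type and `merge` method below are hypothetical stand-ins showing only the keyed-override idea: log records keyed by record key replace or delete matching base-file records, and net-new log records are appended.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch (not Hudi code) of merge-on-read reconciliation:
// log records override or delete base records with the same key.
public class MergeOnReadSketch {
    record Rec(String key, String value, boolean delete) {}

    static List<Rec> merge(List<Rec> base, List<Rec> log) {
        Map<String, Rec> buffer = new LinkedHashMap<>();
        for (Rec r : base) buffer.put(r.key(), r);
        for (Rec r : log) {
            if (r.delete()) buffer.remove(r.key());  // delete record removes the base row
            else buffer.put(r.key(), r);             // latest log update wins
        }
        return new ArrayList<>(buffer.values());
    }

    public static void main(String[] args) {
        List<Rec> base = List.of(new Rec("k1", "v1", false), new Rec("k2", "v2", false));
        List<Rec> log = List.of(new Rec("k2", "v2'", false), new Rec("k3", "v3", false));
        // k1 passes through, k2 is overridden by the log, k3 is net-new.
        System.out.println(merge(base, log));
    }
}
```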

FileGroupReaderSchemaHandler.getRequiredSchemaForFileAndRenamedColumns is the per-file schema resolution method. Given a StoragePath, it extracts the commit instant from the file name, looks up the InternalSchema version that was active at that commit using InternalSchemaCache, and merges it with the pruned internal schema. The merge produces a HoodieSchema for reading the file and a Map of renamed columns. When the internal schema is empty (schema evolution is disabled), it returns the required schema with an empty rename map.
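The first step of that resolution, extracting the commit instant from the file name, can be sketched as follows. Hudi base files are conventionally named `<fileId>_<writeToken>_<instantTime>.<ext>`; the real code uses Hudi's own file-name utilities, so this simplified parser is an illustrative assumption, not the production path.

```java
// Illustrative sketch (not Hudi's parser) of pulling the commit instant
// out of a base file name of the form <fileId>_<writeToken>_<instantTime>.<ext>.
public class CommitInstantParser {
    static String commitInstant(String fileName) {
        int dot = fileName.lastIndexOf('.');
        String stem = dot >= 0 ? fileName.substring(0, dot) : fileName;
        // The instant time is the last underscore-separated token of the stem.
        return stem.substring(stem.lastIndexOf('_') + 1);
    }

    public static void main(String[] args) {
        String name = "a1b2c3_1-0-1_20240101120000.parquet";
        System.out.println(commitInstant(name));  // 20240101120000
    }
}
```

The instant recovered here is what keys the `InternalSchemaCache` lookup described above.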

FlinkReaderContextFactory is the Flink-specific factory that constructs a FlinkRowDataReaderContext for table services such as compaction and clustering. It lazily initializes the InternalSchemaManager from the HoodieTableMetaClient, ensuring that the schema evolution infrastructure is only loaded when it is actually needed.
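The lazy-initialization pattern described here can be sketched generically. The names below are hypothetical (this is not `FlinkReaderContextFactory`'s code); the sketch only demonstrates the idea that the expensive schema infrastructure is built on first use and exactly once.

```java
import java.util.function.Supplier;

// Generic sketch of lazy, once-only initialization, as used conceptually
// by FlinkReaderContextFactory for its InternalSchemaManager.
public class LazyInitDemo {
    static int initCount = 0;  // visible for demonstration only

    // Memoizing supplier: the delegate runs on the first get() only.
    static <T> Supplier<T> lazy(Supplier<T> delegate) {
        return new Supplier<T>() {
            private T value;
            @Override
            public synchronized T get() {
                if (value == null) {
                    value = delegate.get();
                }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        Supplier<String> schemaManager = lazy(() -> {
            initCount++;  // simulate loading schema metadata from storage
            return "InternalSchemaManager";
        });
        System.out.println(initCount);  // 0: nothing loaded until requested
        schemaManager.get();
        schemaManager.get();
        System.out.println(initCount);  // 1: initialized exactly once
    }
}
```

Deferring the load this way means table-service tasks that never touch an evolved file pay no schema-resolution cost.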

Usage

Use these methods when:

  • A Flink source operator reads a file group and needs a single iterator of fully evolved RowData records.
  • A compaction or clustering task must read file groups through the generic file group reader with Flink-specific schema evolution.
  • Per-file schema resolution and rename detection are needed for a specific base or log file path.

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
  • Lines: 300-302
  • File: hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
  • Lines: 134-144
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkReaderContextFactory.java
  • Lines: 44-55

Signature

public ClosableIterator<T> getClosableIterator() throws IOException
public Pair<HoodieSchema, Map<String, String>> getRequiredSchemaForFileAndRenamedColumns(
    StoragePath path)
@Override
public HoodieReaderContext<RowData> getContext()

Import

import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.table.format.FlinkReaderContextFactory;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.flink.table.data.RowData;

I/O Contract

Inputs

Name Type Required Description
readerContext HoodieReaderContext<T> Yes Engine-specific reader context (FlinkRowDataReaderContext for Flink) that provides file I/O and schema evolution capabilities
dataSchema HoodieSchema Yes The full table schema describing all columns
requestedSchema HoodieSchema Yes The schema of columns requested by the query (may be a projection of dataSchema)
internalSchemaOpt Option<InternalSchema> No The InternalSchema for schema evolution; empty if schema evolution is disabled
fileSlice FileSlice Yes The file slice containing the base file and log files to read
path StoragePath Yes File path for per-file schema resolution (passed to getRequiredSchemaForFileAndRenamedColumns)
metaClient HoodieTableMetaClient Yes Meta client used by FlinkReaderContextFactory to access storage configuration and timeline layout

Outputs

Name Type Description
getClosableIterator result ClosableIterator<T> An iterator of engine-specific records (RowData in Flink) where each record conforms to the requested query schema with per-file merged schemas, type promotions, renames, and null-filled new columns
getRequiredSchemaForFileAndRenamedColumns result Pair<HoodieSchema, Map<String, String>> A pair of (mergedSchema, renamedColumnsMap) for reading a specific file; the schema uses query-era types and the map records renamed columns
getContext result HoodieReaderContext<RowData> A FlinkRowDataReaderContext configured with a lazily-initialized InternalSchemaManager for table services
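How a caller might consume the `renamedColumnsMap` can be sketched as below, under the assumption that the map goes from query-schema column name to the name physically stored in the file (verify the direction against the Hudi source before relying on it). The helper name is hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch (not Hudi code): resolving which physical column to
// read for each query column, assuming the rename map is query-name -> file-name.
public class RenameResolution {
    static Map<String, String> physicalColumns(
            Iterable<String> queryColumns, Map<String, String> renamed) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String col : queryColumns) {
            // Fall back to the query name when the column was never renamed.
            out.put(col, renamed.getOrDefault(col, col));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> renamed = new HashMap<>();
        renamed.put("user_id", "uid");  // column renamed after this file was written
        Map<String, String> resolved = physicalColumns(
                java.util.List.of("user_id", "ts"), renamed);
        System.out.println(resolved);  // {user_id=uid, ts=ts}
    }
}
```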

Usage Examples

// Build and use a HoodieFileGroupReader in Flink
HoodieFileGroupReader<RowData> reader = HoodieFileGroupReader.<RowData>newBuilder()
    .withReaderContext(flinkReaderContext)
    .withStorage(storage)
    .withTablePath(tablePath)
    .withLatestCommitTime(latestInstant)
    .withDataSchema(tableSchema)
    .withRequestedSchema(querySchema)
    .withInternalSchema(Option.of(internalSchema))
    .withHoodieTableMetaClient(metaClient)
    .withProperties(props)
    .withBaseFile(Option.of(baseFile))
    .withLogFiles(logFiles.stream())
    .withPartitionPath(partitionPath)
    .build();

// Get the fully evolved iterator
try (ClosableIterator<RowData> iterator = reader.getClosableIterator()) {
    while (iterator.hasNext()) {
        RowData record = iterator.next();
        // record conforms to querySchema regardless of which schema
        // era the source files were written under
        processRecord(record);
    }
}

// Per-file schema resolution example
FileGroupReaderSchemaHandler<RowData> schemaHandler = new FileGroupReaderSchemaHandler<>(
    readerContext, tableSchema, requestedSchema,
    Option.of(internalSchema), props, metaClient);

StoragePath filePath = new StoragePath("hdfs:///table/2024/base_20240101120000.parquet");
Pair<HoodieSchema, Map<String, String>> result =
    schemaHandler.getRequiredSchemaForFileAndRenamedColumns(filePath);
HoodieSchema mergedSchema = result.getLeft();
Map<String, String> renamedColumns = result.getRight();

// FlinkReaderContextFactory for table services
FlinkReaderContextFactory factory = new FlinkReaderContextFactory(metaClient);
HoodieReaderContext<RowData> context = factory.getContext();
