
Implementation:Apache Hudi HoodieSplitReaderFunction Read

From Leeroopedia


Knowledge Sources

Domains: Data_Lake, Stream_Processing
Last Updated: 2026-02-08 00:00 GMT

Overview

Concrete tool for reading Hudi file groups (base Parquet files and log files) and producing batched Flink RowData records, provided by Apache Hudi.

Description

HoodieSplitReaderFunction is the default reader function implementation for both merge-on-read (MOR) and copy-on-write (COW) tables in the Hudi Flink FLIP-27 source. It implements SplitReaderFunction<RowData> and is invoked by the Flink source reader framework whenever a new split is assigned to a reader subtask.

The read(HoodieSourceSplit) method performs the following steps:

  1. FileSlice construction: Creates a FileSlice from the split's metadata: partition path, file ID, optional base file path, and optional log file paths. The FileSlice is the Hudi abstraction that represents a complete file group at a point in time.
  2. FileGroupReader creation: Builds a HoodieFileGroupReader<RowData> using the builder pattern. The reader is configured with the HoodieReaderContext (which knows how to create Flink RowData objects), the table metaclient, the file slice, the data schema, the requested schema (for projection pushdown), the merge type, and optionally an internal schema for schema evolution.
  3. Record iteration: Obtains a ClosableIterator<RowData> from the file group reader. This iterator transparently merges base file and log file records and applies the requested schema projection.
  4. Batching: Wraps the record iterator in a DefaultHoodieBatchReader that produces RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> objects. Each record is annotated with its file offset and record offset for position tracking.
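The batching step (4) can be sketched in isolation. The classes below are simplified, hypothetical stand-ins rather than Hudi APIs: PositionedRecord mirrors the role of HoodieRecordWithPosition, and BatchingIterator plays the part of DefaultHoodieBatchReader by grouping records into fixed-size batches while tracking each record's offset.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for HoodieRecordWithPosition: a record annotated
// with its offset (the real class also tracks a file offset).
final class PositionedRecord<T> {
    final T record;
    final long recordOffset; // index of the record within the file group
    PositionedRecord(T record, long recordOffset) {
        this.record = record;
        this.recordOffset = recordOffset;
    }
}

// Minimal batching wrapper: groups a record iterator into fixed-size
// batches, tagging each record with its position as it goes.
final class BatchingIterator<T> implements Iterator<List<PositionedRecord<T>>> {
    private final Iterator<T> records;
    private final int batchSize;
    private long offset = 0;

    BatchingIterator(Iterator<T> records, int batchSize) {
        this.records = records;
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        return records.hasNext();
    }

    @Override
    public List<PositionedRecord<T>> next() {
        List<PositionedRecord<T>> batch = new ArrayList<>(batchSize);
        while (records.hasNext() && batch.size() < batchSize) {
            batch.add(new PositionedRecord<>(records.next(), offset++));
        }
        return batch;
    }
}
```

In the real source, the batch size and related knobs come from the Flink Configuration passed to DefaultHoodieBatchReader.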

The companion utility ParquetSplitReaderUtil (version-specific, e.g., in hudi-flink1.17.x) provides vectorized Parquet reading capabilities for COW tables in the legacy source path, generating ParquetColumnarRowSplitReader instances with partition column injection and type conversion support.
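Partition column injection, one of the capabilities ParquetSplitReaderUtil provides, can be illustrated with a self-contained sketch (PartitionInjector is a hypothetical class, not a Hudi API): because partition columns are encoded in the partition path rather than stored in the Parquet data file, the reader splices the parsed partition values back into each row at the positions the projected schema expects.

```java
// Simplified, assumed behavior: merge data-file columns with partition
// values parsed from the partition path into one full row.
final class PartitionInjector {
    private final Object[] partitionValues;  // parsed from the partition path
    private final int[] partitionPositions;  // target positions in the full row

    PartitionInjector(Object[] partitionValues, int[] partitionPositions) {
        this.partitionValues = partitionValues;
        this.partitionPositions = partitionPositions;
    }

    Object[] inject(Object[] fileColumns) {
        Object[] full = new Object[fileColumns.length + partitionValues.length];
        boolean[] taken = new boolean[full.length];
        // Place partition values at their schema positions first.
        for (int i = 0; i < partitionValues.length; i++) {
            full[partitionPositions[i]] = partitionValues[i];
            taken[partitionPositions[i]] = true;
        }
        // Fill the remaining positions from the data file, in order.
        int src = 0;
        for (int i = 0; i < full.length; i++) {
            if (!taken[i]) {
                full[i] = fileColumns[src++];
            }
        }
        return full;
    }
}
```

The real utility additionally converts the partition's string value to the column's declared type before injection.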

Usage

This function is used internally by the Hudi FLIP-27 source's split reader. It is instantiated once per reader subtask and reused across multiple splits. Users control its behavior through connector options:

  • Schema projection is driven by the SQL query's column selection
  • Merge type is controlled by hoodie.datasource.merge.type
  • Schema evolution is activated via hoodie.schema.evolution.enabled
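The last two of these are plain string connector options. A minimal sketch of assembling them (the option keys come from the list above; the values shown are illustrative placeholders, not verified defaults):

```java
import java.util.HashMap;
import java.util.Map;

final class HudiReadOptions {
    // Build the read-path options named above as a key/value map,
    // as they would be passed to the Hudi Flink connector.
    static Map<String, String> build() {
        Map<String, String> options = new HashMap<>();
        // Merge strategy for MOR reads (value is a placeholder)
        options.put("hoodie.datasource.merge.type", "payload_combine");
        // Opt in to schema evolution on read
        options.put("hoodie.schema.evolution.enabled", "true");
        return options;
    }
}
```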

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
  • Lines: 52-138
  • Also: hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java (Lines 50-595)

Signature

@Override
public CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<RowData>>> read(HoodieSourceSplit split) {
    try {
        this.fileGroupReader = createFileGroupReader(split);
        final ClosableIterator<RowData> recordIterator = fileGroupReader.getClosableIterator();
        DefaultHoodieBatchReader<RowData> defaultBatchReader =
            new DefaultHoodieBatchReader<RowData>(configuration);
        return defaultBatchReader.batch(split, recordIterator);
    } catch (IOException e) {
        throw new HoodieIOException(
            "Failed to read from file group: " + split.getFileId(), e);
    }
}

Import

import org.apache.hudi.source.reader.function.HoodieSplitReaderFunction;
import org.apache.hudi.source.split.HoodieSourceSplit;
import org.apache.hudi.source.reader.HoodieRecordWithPosition;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.table.data.RowData;

I/O Contract

Inputs

  • split (HoodieSourceSplit, required): The source split containing the file group information: partition path, file ID, optional base file path, optional log file paths.
  • hoodieTable (HoodieFlinkTable<RowData>, required, constructor): The Hudi table instance providing access to the metaclient and table configuration.
  • readerContext (HoodieReaderContext<RowData>, required, constructor): Reader context that provides Flink-specific record creation and schema handling.
  • tableSchema (HoodieSchema, required, constructor): The full table schema (all columns).
  • requiredSchema (HoodieSchema, required, constructor): The projected schema containing only the columns needed by the query.
  • mergeType (String, required, constructor): The merge strategy type (e.g., position-based or payload-based) for MOR table reading.
  • internalSchemaOption (Option<InternalSchema>, optional, constructor): Internal schema for schema evolution support. Empty if schema evolution is disabled.
  • configuration (Configuration, required, constructor): Flink configuration for batch reader settings.

Outputs

  • return value (CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<RowData>>>): A closeable iterator over batched records. Each RecordsWithSplitIds contains a batch of HoodieRecordWithPosition<RowData> objects associated with the split ID. Each record carries a file offset and record offset for position tracking.

Usage Examples

// HoodieSplitReaderFunction is instantiated by the Hudi source internally.
// The following illustrates the construction and invocation:

import org.apache.hudi.source.reader.function.HoodieSplitReaderFunction;
import org.apache.hudi.source.split.HoodieSourceSplit;

// Create the reader function (once per reader subtask)
HoodieSplitReaderFunction readerFunction = new HoodieSplitReaderFunction(
    hoodieTable,
    readerContext,
    configuration,
    tableSchema,
    requiredSchema,
    "position_based_merge",
    internalSchemaOption
);

// Read a split (invoked by the source reader for each assigned split)
HoodieSourceSplit split = ...; // assigned by the enumerator
CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<RowData>>> iterator =
    readerFunction.read(split);

// Iterate over batched records, draining each batch through the
// RecordsWithSplitIds API (nextSplit()/nextRecordFromSplit() return
// null when exhausted)
while (iterator.hasNext()) {
    RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> batch = iterator.next();
    String splitId;
    while ((splitId = batch.nextSplit()) != null) {
        HoodieRecordWithPosition<RowData> record;
        while ((record = batch.nextRecordFromSplit()) != null) {
            // record carries the RowData plus file offset and record offset
        }
    }
}
iterator.close();

Related Pages

Implements Principle
