
Principle:Apache Hudi Data Reading And Processing

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Stream_Processing
Last Updated 2026-02-08 00:00 GMT

Overview

Reading raw file data from storage splits and transforming it into in-memory records suitable for downstream processing.

Description

After a split has been assigned to a reader task, the actual data must be read from storage and transformed into the framework's internal record format. This is the data reading and processing step. It is the most I/O-intensive phase of the read pipeline and involves several sub-operations:

  1. File group composition: A single split typically represents a file group -- a base Parquet file plus zero or more log files (in merge-on-read tables). The reader must understand this composition to produce a correct, merged view of the data.
  2. Schema reconciliation: The table schema (all columns) may differ from the required schema (only the columns the query needs). The reader must project the data to only the required columns, avoiding unnecessary deserialization of unused fields.
  3. Log merging: For merge-on-read tables, the reader must apply log file records on top of the base file records. This involves reading the log files, matching records by key, and producing the merged result. The merge type (payload-based, position-based) determines the merge strategy.
  4. Format-specific reading: The actual bytes on disk are in Parquet format (for base files) or Avro format (for log files). The reader uses format-specific utilities to decode these into the framework's internal row representation (e.g., Flink RowData).
  5. Batching: Records are not emitted one at a time. Instead, they are collected into batches and wrapped in a RecordsWithSplitIds container that associates each batch with its originating split. This batching amortizes the overhead of cross-thread communication.
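The sub-operations above can be illustrated with a small, self-contained sketch. This is not the Hudi API: the dict-based record layout, the `key` field, the `_deleted` marker, and the merge rule (last log record per key wins) are assumptions made purely for illustration.

```python
# Illustrative sketch of a merge-on-read file-group read (not the Hudi API).
# Assumptions: records are dicts, "key" identifies a record, the last log
# record per key wins, and a "_deleted" flag marks a delete.

def read_file_group(base_records, log_records, required_columns):
    """Merge log records onto base records, then project columns."""
    # 1. Index the base file by record key.
    merged = {r["key"]: r for r in base_records}
    # 2. Apply log records on top: upserts replace, deletes remove.
    for r in log_records:
        if r.get("_deleted"):
            merged.pop(r["key"], None)
        else:
            merged[r["key"]] = r
    # 3. Project to only the requested columns (schema reconciliation).
    return [{c: r.get(c) for c in required_columns} for r in merged.values()]

base = [{"key": "a", "x": 1, "y": 10}, {"key": "b", "x": 2, "y": 20}]
logs = [{"key": "a", "x": 5, "y": 50},     # update of "a"
        {"key": "b", "_deleted": True},    # delete of "b"
        {"key": "c", "x": 3, "y": 30}]     # insert of "c"
rows = read_file_group(base, logs, required_columns=["key", "x"])
# rows now holds the merged, projected view: updated "a", inserted "c".
```

A copy-on-write table is the degenerate case of the same function with an empty `log_records` list.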

Usage

Use this technique whenever a source connector must read from a complex file layout with multiple file types (base + log) and support schema evolution, projection pushdown, and merge semantics. It is applicable to:

  • Copy-on-write (COW) tables where only base Parquet files exist
  • Merge-on-read (MOR) tables where base files and log files must be merged
  • Tables with schema evolution, where the reader must reconcile different schema versions
  • Queries that only need a subset of columns (projection pushdown)

Theoretical Basis

The data reading step implements a merge-read pipeline. The core abstraction is a file group reader that composes base file reading and log file application into a single iterator:

function readSplit(split):
    // Construct the file group from split metadata
    fileSlice = FileSlice(
        partitionPath = split.partitionPath,
        fileId        = split.fileId,
        baseFile      = split.basePath,
        logFiles      = split.logPaths
    )

    // Build the file group reader with schema projection
    reader = FileGroupReader.builder()
        .fileSlice(fileSlice)
        .dataSchema(tableSchema)
        .requestedSchema(requiredSchema)
        .mergeType(mergeType)
        .build()

    // Obtain an iterator over merged, projected records
    recordIterator = reader.getIterator()

    // Batch records for efficient downstream delivery
    return batchRecords(split, recordIterator)

function batchRecords(split, iterator):
    batches = []
    currentBatch = []
    recordOffset = 0    // records consumed so far, for checkpoint resumption
    while iterator.hasNext():
        record = iterator.next()
        // fileOffset identifies the file within the split; recordOffset
        // identifies the record within that file
        currentBatch.add(RecordWithPosition(record, fileOffset, recordOffset))
        recordOffset = recordOffset + 1
        if currentBatch.size >= batchSize:
            batches.add(RecordsWithSplitIds(split.id, currentBatch))
            currentBatch = []
    if currentBatch is not empty:    // flush the trailing partial batch
        batches.add(RecordsWithSplitIds(split.id, currentBatch))
    return batches
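The batching step can be made executable as a minimal sketch, with a plain `(split_id, batch)` tuple standing in for Flink's RecordsWithSplitIds container (an assumption for illustration, not the real class):

```python
# Minimal sketch of the batching step; a (split_id, batch) tuple stands in
# for Flink's RecordsWithSplitIds container.

def batch_records(split_id, iterator, batch_size):
    batches = []
    current = []
    for offset, record in enumerate(iterator):
        # Track the record's position within the split so a checkpoint can
        # resume mid-file-group instead of re-reading the whole split.
        current.append((record, offset))
        if len(current) >= batch_size:
            batches.append((split_id, current))
            current = []
    if current:  # flush the trailing partial batch
        batches.append((split_id, current))
    return batches

batches = batch_records("split-0", iter(range(5)), batch_size=2)
# Five records with batch_size=2 yield two full batches and one partial batch.
```

Tagging each batch with its split id is what lets the downstream fetcher attribute records to the correct split when several splits are read concurrently.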

The theoretical foundation rests on two properties:

  • Merge correctness: When log files are present, the merge must produce the same result as if all updates had been applied to a snapshot. This requires a deterministic merge strategy (e.g., position-based for efficient merging, or payload-based for custom merge logic).
  • Projection safety: Reading only requested columns must not affect the merge outcome. The reader must ensure that any columns needed for merging (e.g., record keys) are included even if not requested by the query, then strip them before emitting.
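Projection safety can be sketched concretely. In this hypothetical example (dict records and a `key` column are assumptions, not the Hudi schema model), the key column is read because the merge needs it, then stripped before records are emitted:

```python
# Sketch of projection safety: the record key is required to merge even when
# the query did not request it, so it is read, used, then stripped.

def read_with_projection(base, logs, requested_columns, key_column="key"):
    # Widen the physical projection with any merge-required columns.
    read_columns = set(requested_columns) | {key_column}
    merged = {r[key_column]: {c: r[c] for c in read_columns} for r in base}
    for r in logs:  # log records override base records by key
        merged[r[key_column]] = {c: r[c] for c in read_columns}
    # Strip merge-only columns before emitting to the query.
    return [{c: r[c] for c in requested_columns} for r in merged.values()]

rows = read_with_projection(
    base=[{"key": "a", "x": 1, "y": 10}],
    logs=[{"key": "a", "x": 2, "y": 20}],
    requested_columns=["y"],
)
# The query asked only for "y", yet the merge still matched on "key".
```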

The complexity of this step is O(B + L log L) per file group, where B is the base file size and L is the total log file size (log L for sorting/indexing log records by position).
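The L log L term comes from ordering log records by the position of the base record they update; once sorted, the merge itself is a single linear pass. A sketch, with positions and record shapes assumed for illustration:

```python
# Sketch of a position-based merge: each log record carries the ordinal
# position of the base record it replaces, so after an O(L log L) sort the
# merge is one O(B + L) pass over the base file.

def position_based_merge(base_records, log_updates):
    # log_updates: list of (position, new_record), possibly out of order.
    ordered = sorted(log_updates)  # O(L log L)
    out, i = [], 0
    for pos, record in enumerate(base_records):  # O(B + L) merge pass
        while i < len(ordered) and ordered[i][0] == pos:
            record = ordered[i][1]  # later updates to the same position win
            i += 1
        out.append(record)
    return out

merged = position_based_merge(["r0", "r1", "r2"], [(2, "r2'"), (0, "r0'")])
```

Because positions are already sorted in the base file, no per-record key lookup is needed, which is what makes position-based merging cheaper than payload-based merging when positions are available.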

Related Pages

Implemented By

Page Connections
