Principle: Apache Hudi Data Reading and Processing
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Reading raw file data from storage splits and transforming it into in-memory records suitable for downstream processing.
Description
After a split has been assigned to a reader task, the actual data must be read from storage and transformed into the framework's internal record format. This is the data reading and processing step. It is the most I/O-intensive phase of the read pipeline and involves several sub-operations:
- File group composition: A single split typically represents a file group -- a base Parquet file plus zero or more log files (in merge-on-read tables). The reader must understand this composition to produce a correct, merged view of the data.
- Schema reconciliation: The table schema (all columns) may differ from the required schema (only the columns the query needs). The reader must project the data to only the required columns, avoiding unnecessary deserialization of unused fields.
- Log merging: For merge-on-read tables, the reader must apply log file records on top of the base file records. This involves reading the log files, matching records by key, and producing the merged result. The merge type (payload-based, position-based) determines the merge strategy.
- Format-specific reading: The actual bytes on disk are in Parquet format (for base files) or Avro format (for log files). The reader uses format-specific utilities to decode these into the framework's internal row representation (e.g., Flink `RowData`).
- Batching: Records are not emitted one at a time. Instead, they are collected into batches and wrapped in a `RecordsWithSplitIds` container that associates each batch with its originating split. This batching amortizes the overhead of cross-thread communication.
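To make the log-merging bullet concrete, here is a minimal key-based merge sketch in Python. The dict-based record layout and the `key`/`op` field names are illustrative assumptions, not Hudi's actual record format:

```python
def merge_file_group(base_records, log_records):
    """Apply log records (upserts and deletes) on top of base records.

    base_records: list of dicts, each with a unique 'key'
    log_records:  later changes, in commit order; op 'D' marks a delete
    """
    # Index the base file records by key; the merged view starts here
    merged = {r["key"]: r for r in base_records}
    for log in log_records:
        if log.get("op") == "D":
            merged.pop(log["key"], None)   # delete removes the base record
        else:
            merged[log["key"]] = log       # later commit overwrites earlier
    return list(merged.values())

base = [{"key": 1, "v": "a"}, {"key": 2, "v": "b"}]
logs = [{"key": 2, "v": "b2"}, {"key": 3, "v": "c"}, {"key": 1, "op": "D"}]
print(merge_file_group(base, logs))  # → [{'key': 2, 'v': 'b2'}, {'key': 3, 'v': 'c'}]
```

The key-indexed dictionary stands in for whatever record index the real reader builds; the essential property is that each log record either replaces, inserts, or deletes exactly one keyed record.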
Usage
Use this technique whenever a source connector must read from a complex file layout with multiple file types (base + log) and support schema evolution, projection pushdown, and merge semantics. It is applicable to:
- Copy-on-write (COW) tables where only base Parquet files exist
- Merge-on-read (MOR) tables where base files and log files must be merged
- Tables with schema evolution, where the reader must reconcile different schema versions
- Queries that only need a subset of columns (projection pushdown)
Theoretical Basis
The data reading step implements a merge-read pipeline. The core abstraction is a file group reader that composes base file reading and log file application into a single iterator:
```
function readSplit(split):
    // Construct the file group from split metadata
    fileSlice = FileSlice(
        partitionPath = split.partitionPath,
        fileId        = split.fileId,
        baseFile      = split.basePath,
        logFiles      = split.logPaths
    )
    // Build the file group reader with schema projection
    reader = FileGroupReader.builder()
        .fileSlice(fileSlice)
        .dataSchema(tableSchema)          // full table schema
        .requestedSchema(requiredSchema)  // only the columns the query needs
        .mergeType(mergeType)
        .build()
    // Obtain an iterator over merged, projected records
    recordIterator = reader.getIterator()
    // Batch records for efficient downstream delivery
    return batchRecords(split, recordIterator)

function batchRecords(split, iterator):
    batches = []
    currentBatch = []
    recordOffset = 0  // position of the record within the split
    while iterator.hasNext():
        record = iterator.next()
        // fileOffset identifies the file within the split; together with
        // recordOffset it records where each emitted record came from
        currentBatch.add(RecordWithPosition(record, fileOffset, recordOffset))
        recordOffset = recordOffset + 1
        if currentBatch.size >= batchSize:
            batches.add(RecordsWithSplitIds(split.id, currentBatch))
            currentBatch = []
    if currentBatch is not empty:
        batches.add(RecordsWithSplitIds(split.id, currentBatch))
    return batches
```
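The batching half of the pseudocode can be sketched as runnable Python. The `RecordsWithSplitIds` and `RecordWithPosition` classes here are simplified stand-ins for the framework's container types, and record offsets are plain ordinals:

```python
from dataclasses import dataclass
from typing import Any, Iterator, List

@dataclass
class RecordWithPosition:
    record: Any
    record_offset: int  # ordinal position within the split

@dataclass
class RecordsWithSplitIds:
    split_id: str
    records: List[RecordWithPosition]

def batch_records(split_id: str, records: Iterator[Any],
                  batch_size: int = 3) -> List[RecordsWithSplitIds]:
    """Group records into fixed-size batches tagged with their split id."""
    batches, current = [], []
    for offset, record in enumerate(records):
        current.append(RecordWithPosition(record, offset))
        if len(current) >= batch_size:
            batches.append(RecordsWithSplitIds(split_id, current))
            current = []
    if current:  # flush the trailing partial batch
        batches.append(RecordsWithSplitIds(split_id, current))
    return batches

batches = batch_records("split-0", iter(range(7)), batch_size=3)
print([len(b.records) for b in batches])  # → [3, 3, 1]
```

Tagging each batch with its split id is what lets the downstream consumer attribute records (and checkpoint positions) back to the correct split after crossing the fetcher/reader thread boundary.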
The theoretical foundation rests on two properties:
- Merge correctness: When log files are present, the merge must produce the same result as if all updates had been applied to a snapshot. This requires a deterministic merge strategy (e.g., position-based for efficient merging, or payload-based for custom merge logic).
- Projection safety: Reading only requested columns must not affect the merge outcome. The reader must ensure that any columns needed for merging (e.g., record keys) are included even if not requested by the query, then strip them before emitting.
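The projection-safety property can be illustrated with a small Python sketch. Records are modeled as dicts, and the merge-key name `key` plus the last-write-wins merge are simplifying assumptions for illustration:

```python
def read_projected(base_records, log_records, requested_cols, merge_key="key"):
    """Project to requested columns without breaking merge correctness.

    The merge key is force-included while merging, then stripped before
    records are emitted if the query did not request it.
    """
    # 1. Widen the projection so the merge key is always read
    read_cols = set(requested_cols) | {merge_key}
    base = [{c: r[c] for c in read_cols if c in r} for r in base_records]

    # 2. Merge log records over base records by key (last write wins)
    merged = {r[merge_key]: r for r in base}
    for log in log_records:
        merged[log[merge_key]] = {c: log[c] for c in read_cols if c in log}

    # 3. Strip the force-included key before emitting
    return [{c: r[c] for c in r if c in requested_cols}
            for r in merged.values()]

base = [{"key": 1, "a": 10, "b": "x"}, {"key": 2, "a": 20, "b": "y"}]
logs = [{"key": 2, "a": 21, "b": "y2"}]
print(read_projected(base, logs, requested_cols={"a"}))  # → [{'a': 10}, {'a': 21}]
```

Note that the query asked only for column `a`, yet the update to key 2 was still applied correctly because `key` was read internally and dropped afterwards.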
The complexity of this step is O(B + L log L) per file group, where B is the base file size and L is the total log file size (log L for sorting/indexing log records by position).
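The L log L term comes from ordering log records so they can be applied in a single sequential pass over the base file. A hedged sketch of position-based merging, where positions are simply base-row ordinals (an assumption for illustration):

```python
def position_based_merge(base_rows, log_updates):
    """Merge updates addressed by base-row position.

    log_updates: (position, new_row) pairs in arbitrary commit order.
    Sorting them by position costs O(L log L); the merge then advances
    through the base file sequentially in O(B), giving O(B + L log L).
    """
    updates = sorted(log_updates)          # O(L log L)
    merged, i = [], 0
    for pos, row in enumerate(base_rows):  # O(B) sequential scan
        while i < len(updates) and updates[i][0] == pos:
            row = updates[i][1]            # last update at this position wins
            i += 1
        merged.append(row)
    return merged

print(position_based_merge(["a", "b", "c"], [(2, "c'"), (0, "a'")]))
# → ["a'", 'b', "c'"]
```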