Principle: Apache Hudi Read Reconciliation
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Schema_Management |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
When reading records written under an older schema, the system must project, cast, and rename fields on the fly to present them in the shape expected by the current query schema.
Description
Read Reconciliation is the runtime phase in which records read from storage are transformed to match the query schema. Because schema changes in Apache Hudi are metadata-only operations (existing Parquet files are never rewritten), every file may carry data encoded under a different schema version than the one the query expects. The read reconciliation layer bridges this gap.
Three transformations are applied per record:
- Column projection -- The query schema may request a subset of the file's columns, or the file may lack columns that were added after it was written. For missing columns, the reconciliation layer fills in null values. For extra columns, it drops them.
- Type promotion -- If a column was widened (for example, INT to LONG) after the file was written, the record's value must be cast from the file-era type to the query-era type. The set of allowed casts mirrors the type lattice defined by SchemaChangeUtils.
- Column rename mapping -- If a column was renamed after the file was written, the reconciliation layer locates the data under the old column name in the file and maps it to the new column name in the output record.
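Taken together, the three transformations amount to a single pass over each record. A minimal flat-record sketch, using plain Java maps rather than Hudi or Flink APIs (all names here are illustrative):

```java
import java.util.*;

public class ReadReconciliationSketch {
    // Hypothetical flat-record reconciliation: project, cast, and rename a
    // file-era record into the shape of the query schema.
    static Map<String, Object> reconcile(
            Map<String, Object> fileRecord,        // record as decoded from the file
            List<String> queryFields,              // query-era field names, in order
            Map<String, String> renamedColumns) {  // query name -> file-era name
        Map<String, Object> out = new LinkedHashMap<>();
        for (String queryName : queryFields) {
            // Column rename mapping: read the value under its file-era name.
            String fileName = renamedColumns.getOrDefault(queryName, queryName);
            if (!fileRecord.containsKey(fileName)) {
                out.put(queryName, null);          // column added after the file was written
            } else {
                Object v = fileRecord.get(fileName);
                // Type promotion: widen INT -> LONG (one promotion from the lattice).
                if (v instanceof Integer) {
                    v = ((Integer) v).longValue();
                }
                out.put(queryName, v);
            }
            // Columns present in the file but absent from queryFields are dropped
            // implicitly, since the loop only visits query-era fields.
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> fileRecord = new LinkedHashMap<>();
        fileRecord.put("id", 42);            // written as INT, query expects LONG
        fileRecord.put("old_name", "alice"); // later renamed to "name"
        fileRecord.put("obsolete", true);    // dropped from the query schema

        Map<String, Object> out = reconcile(
                fileRecord,
                List.of("id", "name", "added_later"),
                Map.of("name", "old_name"));
        System.out.println(out); // {id=42, name=alice, added_later=null}
    }
}
```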
In Flink, this reconciliation is performed by a SchemaEvolvingRowDataProjection, which is a RowData-to-RowData function. It is constructed from a "from" RowType (the file-era schema), a "to" RowType (the query-era schema), and a rename map. Internally, it builds a tree of TypeConverter functions that mirror the nested structure of the schema, handling ROW, ARRAY, and MAP types recursively.
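The converter-tree idea can be sketched outside of Flink: each nested type gets a converter built from its children's converters. The helper names below are illustrative, not the SchemaEvolvingRowDataProjection API:

```java
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class ConverterTreeSketch {
    // Leaf converter: INT widened to LONG, one promotion from the type lattice.
    static final UnaryOperator<Object> INT_TO_LONG = v -> ((Integer) v).longValue();

    // ARRAY converter: apply the element converter to every element.
    static UnaryOperator<Object> array(UnaryOperator<Object> element) {
        return v -> ((List<?>) v).stream().map(element).collect(Collectors.toList());
    }

    // MAP converter: apply the value converter to every value (keys unchanged).
    static UnaryOperator<Object> map(UnaryOperator<Object> value) {
        return v -> ((Map<?, ?>) v).entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> value.apply(e.getValue())));
    }
}
```

A converter from `ARRAY<MAP<STRING, INT>>` to `ARRAY<MAP<STRING, LONG>>` is then `array(map(INT_TO_LONG))`, mirroring the nesting of the schema tree the same way the real projection does for ROW, ARRAY, and MAP types.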
The per-file schema is resolved by the InternalSchemaManager, which extracts the commit instant from the file name, looks up the InternalSchema that was active at that instant, and merges it with the current query schema to produce a merge schema that preserves the file-era types for Parquet decoding.
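The instant-based lookup can be sketched as follows. Hudi base files embed the commit instant in their name (roughly `<fileId>_<writeToken>_<instantTime>.parquet`), so the instant is recoverable from the name alone; the resolver class and method names here are illustrative stand-ins for InternalSchemaManager:

```java
import java.util.*;

public class FileSchemaResolver {
    // instantTime -> schema version id active from that instant onward (illustrative)
    private final TreeMap<String, Integer> schemaHistory = new TreeMap<>();

    void registerSchema(String instantTime, int versionId) {
        schemaHistory.put(instantTime, versionId);
    }

    // The commit instant is the last underscore-separated token of the file stem.
    static String extractCommitTime(String fileName) {
        String stem = fileName.substring(0, fileName.lastIndexOf('.'));
        return stem.substring(stem.lastIndexOf('_') + 1);
    }

    // The schema active at an instant is the latest one registered at or before it;
    // instant times sort lexicographically because they are fixed-width timestamps.
    int resolveVersionId(String fileName) {
        Map.Entry<String, Integer> e = schemaHistory.floorEntry(extractCommitTime(fileName));
        if (e == null) throw new IllegalStateException("no schema for " + fileName);
        return e.getValue();
    }
}
```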
Usage
Apply Read Reconciliation whenever:
- A Flink read task opens a Parquet file whose commit instant predates the latest schema change.
- A merge-on-read query must reconcile base file records with log file records that may be under different schemas.
- A compaction or clustering job reads files from multiple schema eras and must produce output under a single unified schema.
- A time-travel query reads files from a past snapshot whose schema differs from the current table schema.
Theoretical Basis
Read Reconciliation implements a schema-on-read strategy. The core idea is that storage files are immutable and their embedded schemas are never modified; instead, a reconciliation layer at read time adapts each record to the reader's expected shape.
Pseudocode for row projection:
function createRowProjection(fromRowType, toRowType, renamedColumns):
    fieldGetters = array[toRowType.fieldCount]
    converters = array[toRowType.fieldCount]
    for i in 0..toRowType.fieldCount:
        field = toRowType.fields[i]
        // Resolve rename: look up the file-era name if the field was renamed
        fieldName = renamedColumns.getOrDefault(fullName(field), field.name)
        indexInFrom = fromRowType.indexOf(fieldName)
        if indexInFrom == -1:
            // New column: fill with null
            fieldGetters[i] = NULL_GETTER
            converters[i] = NOOP
        else:
            fieldGetters[i] = createFieldGetter(fromRowType.typeAt(indexInFrom), indexInFrom)
            // Recursively build a converter from the file-era field type to the query-era field type
            converters[i] = createConverter(fromRowType.typeAt(indexInFrom), field.type, renamedColumns)
    return rowData -> {
        result = new GenericRowData(toRowType.fieldCount)
        result.setRowKind(rowData.getRowKind())
        for i in 0..fieldGetters.length:
            val = fieldGetters[i].get(rowData)
            result[i] = val == null ? null : converters[i].convert(val)
        return result
    }
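A runnable translation of this pseudocode over plain `Object[]` rows (standing in for Flink's RowData; all names are illustrative). As in the pseudocode, indices and converters are resolved once when the projection is built, and each record then pays only the per-field get/convert cost:

```java
import java.util.*;
import java.util.function.Function;
import java.util.function.UnaryOperator;

public class RowProjectionSketch {
    static Function<Object[], Object[]> createRowProjection(
            List<String> fromFields,                      // file-era field names
            List<String> toFields,                        // query-era field names
            Map<String, String> renamedColumns,           // query name -> file-era name
            Map<String, UnaryOperator<Object>> casts) {   // per-field cast; identity if absent
        int n = toFields.size();
        int[] indexInFrom = new int[n];
        @SuppressWarnings("unchecked")
        UnaryOperator<Object>[] converters = new UnaryOperator[n];
        for (int i = 0; i < n; i++) {
            String name = toFields.get(i);
            // Resolve rename, then locate the field in the file-era schema.
            String fileName = renamedColumns.getOrDefault(name, name);
            indexInFrom[i] = fromFields.indexOf(fileName);   // -1 => new column, fill null
            converters[i] = casts.getOrDefault(name, UnaryOperator.identity());
        }
        return row -> {
            Object[] out = new Object[n];
            for (int i = 0; i < n; i++) {
                Object v = indexInFrom[i] < 0 ? null : row[indexInFrom[i]];
                out[i] = v == null ? null : converters[i].apply(v);  // null-safe convert
            }
            return out;
        };
    }
}
```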
Pseudocode for per-file schema resolution:
function getMergeSchema(fileName):
    if querySchema is empty:
        return querySchema  // schema evolution not in use: nothing to reconcile
    commitTime = extractCommitTime(fileName)
    fileSchema = InternalSchemaCache.getByVersionId(commitTime, tablePath, ...)
    if querySchema == fileSchema:
        return EMPTY_SCHEMA  // file already matches the query: no merge needed
    return InternalSchemaMerger(fileSchema, querySchema,
                                ignoreRequired=true, useFileTypes=true).mergeSchema()
The key insight is that the merge schema uses the file-era types for Parquet decoding (because Parquet requires the exact type that was used during encoding) but the query-era structure (field order, field presence). After decoding, a second pass (the projection) casts each field from the file-era type to the query-era type.
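That two-phase split can be sketched as a merge over flat schemas. This is a simplification of InternalSchemaMerger with useFileTypes=true; the `Field` record and helper are illustrative, not Hudi types:

```java
import java.util.*;

public class MergeSchemaSketch {
    record Field(String name, String type) {}

    // Merged schema: query-era structure (field order and presence) but file-era
    // types, so Parquet decodes each column with the type it was encoded under.
    // The projection pass then casts each value to the query-era type.
    static List<Field> mergeSchema(List<Field> fileSchema, List<Field> querySchema) {
        Map<String, String> fileTypes = new HashMap<>();
        for (Field f : fileSchema) fileTypes.put(f.name(), f.type());
        List<Field> merged = new ArrayList<>();
        for (Field q : querySchema) {
            // Columns absent from the file keep the query type (decoded as null anyway).
            merged.add(new Field(q.name(), fileTypes.getOrDefault(q.name(), q.type())));
        }
        return merged;
    }
}
```

For a file written with `id: INT` and a query schema of `id: LONG, added: STRING`, the merge schema is `id: INT, added: STRING`: the file-era INT drives Parquet decoding, and the projection widens it to LONG afterwards.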