Implementation: Apache Hudi SchemaEvolvingRowDataProjection.project
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Schema_Management |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool, provided by Apache Hudi, for projecting Flink RowData records from a file-era schema to a query-era schema with type promotion, rename mapping, and null-filling for added columns.
Description
SchemaEvolvingRowDataProjection.project takes a single RowData record encoded under a file-era RowType ("from") and returns a new GenericRowData that conforms to the query-era RowType ("to"). The projection handles three categories of field transformations:
- Existing fields with the same type -- The value is copied as-is using a FieldGetter.
- Existing fields with a different type -- A TypeConverter is applied to cast the value (for example, from IntType to LongType). Converters are created recursively for nested ROW, ARRAY, and MAP structures.
- New fields (added after file was written) -- A NULL_GETTER is used, producing null values for these positions.
Column renames are resolved by consulting a renamedColumns map that maps fully-qualified new names to old names. The projection looks up the old name in the "from" RowType to find the correct field index.
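The per-field dispatch described above can be sketched in plain Java. This is a simplified stand-in, not Hudi's actual code: records are modeled as `Object[]` and schemas as parallel name/type arrays, whereas the real class operates on `RowData`/`RowType` with `FieldGetter`, `TypeConverter`, and `NULL_GETTER` instances. The class and method names here are illustrative.

```java
import java.util.Map;
import java.util.function.Function;

// Simplified sketch of SchemaEvolvingRowDataProjection's per-field getter
// selection: copy same-type fields, convert promoted types, null-fill new
// columns, and resolve renames against the file-era schema.
public class ProjectionSketch {

  static Function<Object[], Object>[] buildGetters(
      String[] fromNames, Class<?>[] fromTypes,
      String[] toNames, Class<?>[] toTypes,
      Map<String, String> renamedColumns) {
    @SuppressWarnings("unchecked")
    Function<Object[], Object>[] getters = new Function[toNames.length];
    for (int i = 0; i < toNames.length; i++) {
      // Resolve renames: look up the old name in the file-era schema.
      String lookupName = renamedColumns.getOrDefault(toNames[i], toNames[i]);
      int fromIdx = indexOf(fromNames, lookupName);
      if (fromIdx < 0) {
        getters[i] = row -> null;                // new column: NULL_GETTER analogue
      } else if (fromTypes[fromIdx].equals(toTypes[i])) {
        final int idx = fromIdx;
        getters[i] = row -> row[idx];            // same type: copy as-is
      } else {
        final int idx = fromIdx;
        final Class<?> target = toTypes[i];
        getters[i] = row -> promote(row[idx], target); // TypeConverter analogue
      }
    }
    return getters;
  }

  // Minimal type promotion: only INT -> BIGINT is shown here.
  static Object promote(Object value, Class<?> target) {
    if (value instanceof Integer && target == Long.class) {
      return ((Integer) value).longValue();
    }
    return value;
  }

  static int indexOf(String[] names, String name) {
    for (int i = 0; i < names.length; i++) {
      if (names[i].equals(name)) {
        return i;
      }
    }
    return -1;
  }

  static Object[] project(Function<Object[], Object>[] getters, Object[] row) {
    Object[] out = new Object[getters.length];
    for (int i = 0; i < getters.length; i++) {
      out[i] = getters[i].apply(row);
    }
    return out;
  }
}
```

Building the getter array once and reusing it per record mirrors the real implementation's strategy of paying the schema-resolution cost up front rather than per row.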
FlinkRowDataReaderContext.getFileRecordIterator is the method that the HoodieFileGroupReader calls to obtain an iterator of RowData records from a Parquet file. For base files, it enables the InternalSchemaManager so that per-file schema evolution is applied during reading. For log files, schema evolution is disabled at this layer because it is handled by the FileGroupRecordBuffer instead.
InternalSchemaManager.getMergeSchema produces the merged InternalSchema for a specific file by extracting the commit instant from the file name, looking up the InternalSchema version active at that commit, and merging it with the query schema. If the file schema and query schema are identical, an empty schema is returned to signal that no transformation is needed.
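The lookup flow can be approximated with standard-library types. This sketch is illustrative only: the timestamp-scanning instant extraction is a stand-in for Hudi's real file-name parsing (encapsulated in FSUtils), and schemas are modeled as plain strings keyed by instant in a NavigableMap rather than InternalSchema versions.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch of InternalSchemaManager.getMergeSchema's flow:
// extract the commit instant from the file name, find the schema active
// at that instant, and return null ("empty") when no merge is needed.
public class MergeSchemaSketch {

  // Stand-in for Hudi's file-name parsing: find a 14+ digit instant token.
  private static final Pattern INSTANT = Pattern.compile("(\\d{14,})");

  static String extractCommitInstant(String fileName) {
    Matcher m = INSTANT.matcher(fileName);
    return m.find() ? m.group(1) : null;
  }

  // Returns the schema version active at the file's commit, or null when
  // the file schema already matches the query schema (the "empty" signal).
  static String resolveSchemaVersion(String fileName,
                                     NavigableMap<String, String> schemaByInstant,
                                     String querySchema) {
    String instant = extractCommitInstant(fileName);
    // Latest schema committed at or before this file's instant.
    Map.Entry<String, String> entry = schemaByInstant.floorEntry(instant);
    String fileSchema = entry == null ? null : entry.getValue();
    return querySchema.equals(fileSchema) ? null : fileSchema;
  }
}
```

The "empty schema" sentinel lets callers skip building CastMap and rename machinery entirely on the common path where the file was written under the current schema.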
Usage
Use these methods when:
- A Flink read task opens a Parquet base file written under a schema different from the current query schema.
- A per-file schema-aware iterator is needed that transparently handles type promotions and renames.
- An InternalSchemaManager instance is required to compute the per-file merge schema for CastMap and column name resolution.
Code Reference
Source Location
- Repository: Apache Hudi
- File: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java (Lines 54-56)
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java (Lines 94-111)
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java (Lines 115-128)
Signature
public RowData project(RowData rowData)
@Override
public ClosableIterator<RowData> getFileRecordIterator(
StoragePath filePath,
long start,
long length,
HoodieSchema dataSchema,
HoodieSchema requiredSchema,
HoodieStorage storage) throws IOException
InternalSchema getMergeSchema(String fileName)
Import
import org.apache.hudi.util.SchemaEvolvingRowDataProjection;
import org.apache.hudi.table.format.FlinkRowDataReaderContext;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| from | RowType | Yes | The file-era schema describing the structure of the incoming RowData |
| to | RowType | Yes | The query-era schema describing the desired output structure |
| renamedColumns | Map<String, String> | Yes | Mapping from new fully-qualified column names to old column names for rename resolution |
| rowData | RowData | Yes | The input record to project (encoded under the "from" schema) |
| fileName | String | Yes | File name used by InternalSchemaManager to extract the commit instant and look up the per-file schema |
| filePath | StoragePath | Yes | Path to the Parquet file to read |
| dataSchema | HoodieSchema | Yes | The full data schema for the file |
| requiredSchema | HoodieSchema | Yes | The schema of the columns required by the query |
| storage | HoodieStorage | Yes | Storage abstraction for file I/O |
Outputs
| Name | Type | Description |
|---|---|---|
| project result | RowData | A GenericRowData conforming to the "to" RowType, with evolved types, renamed fields mapped, and null values for newly added columns |
| getFileRecordIterator result | ClosableIterator<RowData> | An iterator of RowData records read from the Parquet file with per-file schema evolution applied (for base files) |
| getMergeSchema result | InternalSchema | The merged InternalSchema for the specified file; an empty schema if no merge is needed |
Usage Examples
// Build a schema-evolving projection
RowType fileEraSchema = RowType.of(
    new LogicalType[] {
        DataTypes.INT().getLogicalType(),    // "age" was INT in the file
        DataTypes.STRING().getLogicalType()  // "name"
    },
    new String[] {"age", "name"});
RowType queryEraSchema = RowType.of(
    new LogicalType[] {
        DataTypes.BIGINT().getLogicalType(), // "age" promoted to BIGINT
        DataTypes.STRING().getLogicalType(), // "full_name" (renamed from "name")
        DataTypes.DOUBLE().getLogicalType()  // "score" (new column)
    },
    new String[] {"age", "full_name", "score"});
Map<String, String> renames = Map.of("full_name", "name");
SchemaEvolvingRowDataProjection projection =
    new SchemaEvolvingRowDataProjection(fileEraSchema, queryEraSchema, renames);
// Project each record
RowData fileRecord = ... ; // read from Parquet with file-era schema
RowData projected = projection.project(fileRecord);
// projected.getLong(0) == (long) fileRecord.getInt(0) // type-promoted
// projected.getString(1) == fileRecord.getString(1) // renamed
// projected.isNullAt(2) == true // new column is null
// Use InternalSchemaManager to resolve per-file merge schema
InternalSchemaManager schemaManager = InternalSchemaManager.get(storageConf, metaClient);
InternalSchema mergeSchema = schemaManager.getMergeSchema("20240101120000_0_0.parquet");
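The record-level shape of getFileRecordIterator (project each record lazily as the caller pulls it) can be sketched with a plain wrapping iterator. The `Object[]` record model and the `ProjectingIterator` name are illustrative, not Hudi API; the real iterator yields RowData and implements ClosableIterator.

```java
import java.util.Iterator;
import java.util.function.Function;

// Simplified sketch of a lazily projecting record iterator, mirroring how
// getFileRecordIterator applies per-file schema evolution record by record.
public class ProjectingIterator implements Iterator<Object[]> {
  private final Iterator<Object[]> source;
  private final Function<Object[], Object[]> projection;

  public ProjectingIterator(Iterator<Object[]> source,
                            Function<Object[], Object[]> projection) {
    this.source = source;
    this.projection = projection;
  }

  @Override
  public boolean hasNext() {
    return source.hasNext();
  }

  // Each record is projected on demand as it is pulled, so no buffering
  // is needed beyond what the underlying file reader already does.
  @Override
  public Object[] next() {
    return projection.apply(source.next());
  }
}
```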