
Implementation:Apache Hudi SchemaEvolvingRowDataProjection Project

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Schema_Management
Last Updated 2026-02-08 00:00 GMT

Overview

A concrete tool, provided by Apache Hudi, for projecting Flink RowData records from a file-era schema to a query-era schema, with type promotion, rename mapping, and null-filling for added columns.

Description

SchemaEvolvingRowDataProjection.project takes a single RowData record encoded under a file-era RowType ("from") and returns a new GenericRowData that conforms to the query-era RowType ("to"). The projection handles three categories of field transformations:

  • Existing fields with the same type -- The value is copied as-is using a FieldGetter.
  • Existing fields with a different type -- A TypeConverter is applied to cast the value (for example, from IntType to LongType). Converters are created recursively for nested ROW, ARRAY, and MAP structures.
  • New fields (added after file was written) -- A NULL_GETTER is used, producing null values for these positions.

Column renames are resolved by consulting a renamedColumns map that maps fully-qualified new names to old names. The projection looks up the old name in the "from" RowType to find the correct field index.
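The three getter categories and the rename lookup can be modelled in plain Java. The sketch below is illustrative only: the class and method names are not Hudi's internals, schemas are simplified to name/type string lists, and the cast converter is hard-coded to the INT-to-BIGINT promotion case, whereas the real projection builds TypeConverters recursively for nested ROW, ARRAY, and MAP types.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustrative model of SchemaEvolvingRowDataProjection's per-field planning.
class ProjectionSketch {

    // One plan entry per field of the query-era ("to") schema.
    record FieldPlan(int fromIndex, Function<Object, Object> converter) {
        static final FieldPlan NULL_FIELD = new FieldPlan(-1, v -> null);
    }

    // Resolve a "to" field against the "from" schema, honouring renames.
    static FieldPlan resolve(String toName, String toType,
                             List<String> fromNames, List<String> fromTypes,
                             Map<String, String> renamedColumns) {
        // A renamed column is looked up under its old name in the "from" schema.
        String lookupName = renamedColumns.getOrDefault(toName, toName);
        int idx = fromNames.indexOf(lookupName);
        if (idx < 0) {
            return FieldPlan.NULL_FIELD;         // new column: fill with null
        }
        if (fromTypes.get(idx).equals(toType)) {
            return new FieldPlan(idx, v -> v);   // same type: copy as-is
        }
        // Different type: apply a cast (hard-coded here to INT -> BIGINT).
        return new FieldPlan(idx, v -> ((Number) v).longValue());
    }

    // Apply the plans to one record encoded under the "from" schema.
    static Object[] project(Object[] fromRow, List<FieldPlan> plans) {
        Object[] out = new Object[plans.size()];
        for (int i = 0; i < plans.size(); i++) {
            FieldPlan p = plans.get(i);
            out[i] = p.fromIndex() < 0 ? null : p.converter().apply(fromRow[p.fromIndex()]);
        }
        return out;
    }
}
```

Resolving the plans once up front, then applying them per record, mirrors why the real class caches FieldGetters and converters instead of re-inspecting schemas on every row.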

FlinkRowDataReaderContext.getFileRecordIterator is the method that the HoodieFileGroupReader calls to obtain an iterator of RowData records from a Parquet file. For base files, it enables the InternalSchemaManager so that per-file schema evolution is applied during reading. For log files, schema evolution is disabled at this layer because it is handled by the FileGroupRecordBuffer instead.
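The base-versus-log layering decision can be sketched as a wrapper around the raw file iterator. All names below are illustrative, not Hudi's actual internals: base files get a schema-evolving projection applied at read time, while log files pass through untouched because the FileGroupRecordBuffer handles evolution downstream.

```java
import java.util.Iterator;
import java.util.function.UnaryOperator;

// Illustrative sketch of getFileRecordIterator's layering decision.
class EvolutionAwareReader {
    static <R> Iterator<R> open(Iterator<R> raw, boolean isLogFile, UnaryOperator<R> projection) {
        if (isLogFile) {
            return raw; // evolution is deferred to the record buffer
        }
        // Base file: apply per-file schema evolution while reading.
        return new Iterator<>() {
            public boolean hasNext() { return raw.hasNext(); }
            public R next() { return projection.apply(raw.next()); }
        };
    }
}
```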

InternalSchemaManager.getMergeSchema produces the merged InternalSchema for a specific file by extracting the commit instant from the file name, looking up the InternalSchema version active at that commit, and merging it with the query schema. If the file schema and query schema are identical, an empty schema is returned to signal that no transformation is needed.
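The lookup can be sketched with a sorted map keyed by commit instant. This is a simplified model, not Hudi's implementation: schemas are stand-in strings, the instant parsing assumes the leading underscore-delimited token of the file name (the real code uses FSUtils helpers), and the final merge via InternalSchemaMerger is elided.

```java
import java.util.TreeMap;

// Illustrative sketch of getMergeSchema's per-file resolution.
class MergeSchemaSketch {
    static final String EMPTY_SCHEMA = "";

    // Extract the commit instant embedded in a base file name,
    // e.g. "20240101120000" from "20240101120000_0_0.parquet".
    static String commitInstantOf(String fileName) {
        return fileName.substring(0, fileName.indexOf('_'));
    }

    static String mergeSchemaFor(String fileName,
                                 TreeMap<String, String> schemaByInstant,
                                 String querySchema) {
        String instant = commitInstantOf(fileName);
        // The schema version active at a commit is the latest one
        // committed at or before that instant.
        var entry = schemaByInstant.floorEntry(instant);
        String fileSchema = entry == null ? null : entry.getValue();
        if (querySchema.equals(fileSchema)) {
            return EMPTY_SCHEMA;  // identical schemas: no transformation needed
        }
        // The real method merges file and query schemas via InternalSchemaMerger;
        // the file-era schema stands in for the merge result here.
        return fileSchema;
    }
}
```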

Usage

Use these methods when:

  • A Flink read task opens a Parquet base file written under a schema different from the current query schema.
  • A per-file schema-aware iterator is needed that transparently handles type promotions and renames.
  • An InternalSchemaManager instance is required to compute the per-file merge schema for CastMap and column name resolution.

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
  • Lines: 54-56
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
  • Lines: 94-111
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
  • Lines: 115-128

Signature

public RowData project(RowData rowData)
@Override
public ClosableIterator<RowData> getFileRecordIterator(
    StoragePath filePath,
    long start,
    long length,
    HoodieSchema dataSchema,
    HoodieSchema requiredSchema,
    HoodieStorage storage) throws IOException
InternalSchema getMergeSchema(String fileName)

Import

import org.apache.hudi.util.SchemaEvolvingRowDataProjection;
import org.apache.hudi.table.format.FlinkRowDataReaderContext;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

I/O Contract

Inputs

Name Type Required Description
from RowType Yes The file-era schema describing the structure of the incoming RowData
to RowType Yes The query-era schema describing the desired output structure
renamedColumns Map<String, String> Yes Mapping from new fully-qualified column names to old column names for rename resolution
rowData RowData Yes The input record to project (encoded under the "from" schema)
fileName String Yes File name used by InternalSchemaManager to extract the commit instant and look up the per-file schema
filePath StoragePath Yes Path to the Parquet file to read
dataSchema HoodieSchema Yes The full data schema for the file
requiredSchema HoodieSchema Yes The schema of the columns required by the query
storage HoodieStorage Yes Storage abstraction for file I/O

Outputs

Name Type Description
project result RowData A GenericRowData conforming to the "to" RowType, with evolved types, renamed fields mapped, and null values for newly added columns
getFileRecordIterator result ClosableIterator<RowData> An iterator of RowData records read from the Parquet file with per-file schema evolution applied (for base files)
getMergeSchema result InternalSchema The merged InternalSchema for the specified file; an empty schema if no merge is needed

Usage Examples

// Build a schema-evolving projection
RowType fileEraSchema = RowType.of(
    new LogicalType[] {
        DataTypes.INT().getLogicalType(),     // "age" was INT in the file
        DataTypes.STRING().getLogicalType()   // "name"
    },
    new String[] {"age", "name"});
RowType queryEraSchema = RowType.of(
    new LogicalType[] {
        DataTypes.BIGINT().getLogicalType(),  // "age" promoted to BIGINT
        DataTypes.STRING().getLogicalType(),  // "full_name" (renamed from "name")
        DataTypes.DOUBLE().getLogicalType()   // "score" (new column)
    },
    new String[] {"age", "full_name", "score"});
Map<String, String> renames = Map.of("full_name", "name");

SchemaEvolvingRowDataProjection projection =
    new SchemaEvolvingRowDataProjection(fileEraSchema, queryEraSchema, renames);

// Project each record
RowData fileRecord = ... ; // read from Parquet with file-era schema
RowData projected = projection.project(fileRecord);
// projected.getLong(0) == (long) fileRecord.getInt(0)  // type-promoted
// projected.getString(1) == fileRecord.getString(1)     // renamed
// projected.isNullAt(2) == true                         // new column is null

// Use InternalSchemaManager to resolve per-file merge schema
InternalSchemaManager schemaManager = InternalSchemaManager.get(storageConf, metaClient);
InternalSchema mergeSchema = schemaManager.getMergeSchema("20240101120000_0_0.parquet");

Related Pages

Implements Principle
