
Principle:Apache Hudi End To End Schema Validation

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Schema_Management
Last Updated 2026-02-08 00:00 GMT

Overview

The file group reader orchestrates the entire schema evolution pipeline end-to-end, resolving per-file schemas, merging base and log records, and producing a single iterator of fully evolved records.

Description

End-to-End Schema Validation is the top-level coordination principle that ties together all preceding schema evolution stages -- change planning, execution, compatibility verification, and read reconciliation -- into a single, coherent read path. In Apache Hudi, the HoodieFileGroupReader is the component responsible for this coordination.

A Hudi table is organized into file groups, each of which may contain a base file (Parquet) and zero or more log files (Avro). The base file and each log file may have been written under different schema versions. The file group reader must:

  1. Determine the per-file read schema -- For each file (base or log) in the file slice, resolve the InternalSchema that was active at the time the file was committed. This is done by the FileGroupReaderSchemaHandler, which looks up the commit instant, retrieves the corresponding InternalSchema from the cache, and merges it with the requested query schema to produce a per-file read schema and a rename map.
  2. Read records with the per-file schema -- Delegate to the engine-specific reader context (FlinkRowDataReaderContext in Flink) to open the file and decode records using the per-file schema. The reader context may apply schema evolution during Parquet decoding (for base files) or defer it to the record buffer (for log files).
  3. Merge base and log records -- For merge-on-read tables, base file records must be merged with log file records using a record merger. Both streams must be in the same schema for the merge to succeed, so the schema handler ensures that both the base file reader and the log record buffer use compatible schemas.
  4. Apply output projection -- After merging, if the required schema (which may include extra metadata columns needed for merging) differs from the requested schema, a final output converter strips the extra columns.
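The four steps above can be sketched as a minimal, self-contained pipeline. Everything here (the dict-based record model, resolve_file_schema, merge, project, and so on) is an illustrative stand-in for the Hudi classes named above, not their actual APIs; merging is simplified to "log records override base records with the same key":

```python
def resolve_file_schema(file, schema_by_commit, query_schema):
    # Step 1: per-file read schema -- look up the schema active at the
    # file's commit and keep the query's column set
    file_schema = schema_by_commit[file["commit"]]
    return {col: file_schema.get(col) for col in query_schema}

def read_records(file, read_schema):
    # Step 2: decode records, null-filling columns absent from the file era
    for rec in file["records"]:
        yield {col: rec.get(col) for col in read_schema}

def merge(base_recs, log_recs, key="id"):
    # Step 3: log records override base records with the same key
    merged = {r[key]: r for r in base_recs}
    merged.update({r[key]: r for r in log_recs})
    return list(merged.values())

def project(records, requested_schema):
    # Step 4: strip merge-only columns the query did not ask for
    return [{c: r[c] for c in requested_schema} for r in records]

# Base file written under an old schema, log file under a newer one
schema_by_commit = {
    "c1": {"id": int, "name": str},                  # base file era
    "c2": {"id": int, "name": str, "email": str},    # log file era
}
base = {"commit": "c1", "records": [{"id": 1, "name": "a"}]}
log = {"commit": "c2", "records": [{"id": 1, "name": "a2", "email": "x"}]}
query = ["id", "name", "email"]

base_recs = read_records(base, resolve_file_schema(base, schema_by_commit, query))
log_recs = read_records(log, resolve_file_schema(log, schema_by_commit, query))
out = project(merge(base_recs, log_recs), query)
# every emitted record conforms to the query schema
```

Note that the base record, if it survived the merge, would carry `email: None`: the null-fill in step 2 is what lets records from different schema eras meet in one schema at step 3.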

The end result is a ClosableIterator of engine-specific records (RowData in Flink) where every record conforms to the requested query schema, regardless of which schema era each source file was written under.

Usage

Apply End-to-End Schema Validation whenever:

  • A Flink read task processes a file group that spans multiple schema versions.
  • A merge-on-read query must reconcile base and log records written under different schemas.
  • A compaction job opens file groups to produce new base files under the current schema.
  • A time-travel or incremental query reads file slices whose constituent files were written at different schema eras.
  • The Flink reader context factory creates a reader context for table services (compaction, clustering).

Theoretical Basis

The file group reader implements a multi-version schema reconciliation pipeline. The theoretical model is:

FileGroup = BaseFile(schema_v_i) + [LogFile(schema_v_j), LogFile(schema_v_k), ...]
QuerySchema = schema_v_current (or a projection of it)

For each file F in the file group:

function getRequiredSchemaForFile(path):
    if internalSchema is empty:
        return (requiredSchema, emptyRenameMap)
    commitTime = extractCommitTime(path.name)
    fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient)
    (mergedSchema, renamedCols) = InternalSchemaMerger(
        fileSchema, internalSchema,
        ignoreRequired=true,
        useFileTypes=false,     // for the general reader, use query types
        useFileNames=false
    ).mergeSchemaGetRenamed()
    mergedHoodieSchema = InternalSchemaConverter.convert(mergedSchema, ...)
    return (mergedHoodieSchema, renamedCols)
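The merge step can be made concrete with a hedged sketch. InternalSchema matches columns by field id rather than by name, which is what makes the rename map recoverable; the list-of-(field_id, name) schema shape and the function below are hypothetical simplifications, not Hudi's real signatures:

```python
def merge_schema_get_renamed(file_schema, query_schema):
    # file_schema / query_schema: lists of (field_id, name) pairs.
    # Columns are matched by field id, so a renamed column still maps
    # back to its file-era name.
    file_names = dict(file_schema)          # field_id -> name in the file
    renamed = {}
    merged = []
    for fid, qname in query_schema:
        fname = file_names.get(fid)
        if fname is None:
            # column added after the file was written: null-fill on read
            merged.append((fid, qname, "null-fill"))
        else:
            merged.append((fid, qname, "read"))
            if fname != qname:
                renamed[qname] = fname      # look up by file-era name
    return merged, renamed

# File written when column 2 was still called "user_name";
# column 3 ("email") did not exist yet.
file_schema = [(1, "id"), (2, "user_name")]
query_schema = [(1, "id"), (2, "name"), (3, "email")]
merged, renamed = merge_schema_get_renamed(file_schema, query_schema)
```

The rename map produced here is what the reader context later uses to address Parquet columns by their file-era names.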

The top-level read orchestration:

function getClosableIterator(fileSlice):
    // 1. Initialize record iterators for base file and log files
    initRecordIterators()

    // 2. For each base file record, the reader context applies
    //    per-file schema evolution (type promotion, rename, null-fill)

    // 3. Log records are buffered and merged with base records
    //    using the record merger

    // 4. Output converter projects from requiredSchema to requestedSchema
    return CloseableMappingIterator(bufferedRecordIterator, record -> record.getRecord())
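The final projection stage (comment 4 above) can be illustrated with a small iterator wrapper. CloseableMappingIter and output_converter are invented names standing in for Hudi's mapping iterator and output converter, under the assumption that the required schema is the requested schema plus merge-only metadata columns:

```python
class CloseableMappingIter:
    """Maps each buffered record through a converter; close() is explicit,
    mirroring a closeable iterator over file handles."""
    def __init__(self, inner, fn):
        self.inner, self.fn, self.closed = inner, fn, False
    def __iter__(self):
        return (self.fn(rec) for rec in self.inner)
    def close(self):
        self.closed = True

def output_converter(required_schema, requested_schema):
    # Drop the extra merge-only columns (e.g. a record-key meta column)
    extra = set(required_schema) - set(requested_schema)
    return lambda rec: {c: v for c, v in rec.items() if c not in extra}

conv = output_converter(["_hoodie_record_key", "id", "name"], ["id", "name"])
it = CloseableMappingIter(iter([{"_hoodie_record_key": "k1", "id": 1, "name": "a"}]), conv)
rows = list(it)
it.close()
```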

The FlinkReaderContextFactory bridges the Flink engine to this generic reader by constructing a FlinkRowDataReaderContext with a lazily initialized InternalSchemaManager. Lazy initialization matters because the InternalSchemaManager is not needed on every read path (for example, log file schema evolution bypasses it), so paths that never consult it avoid the cost of building it.
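The lazy-initialization pattern described above can be sketched as follows; LazySchemaManager and the factory are hypothetical stand-ins, showing only that construction is deferred until first use:

```python
class LazySchemaManager:
    """Defers building the (expensive) schema manager until first access,
    so read paths that never need it pay nothing."""
    def __init__(self, factory):
        self._factory = factory
        self._manager = None
    def get(self):
        if self._manager is None:
            self._manager = self._factory()   # built on first access only
        return self._manager

calls = []
lazy = LazySchemaManager(lambda: calls.append(1) or "schema-manager")
before = len(calls)      # nothing built yet
first = lazy.get()
second = lazy.get()      # second access reuses the instance
```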

Key invariants:

  • Every record emitted by getClosableIterator() conforms to the requested query schema.
  • Per-file schema resolution is cached (via InternalSchemaCache) to avoid redundant timeline lookups.
  • The rename map is propagated from the schema handler to the reader context so that column lookups in Parquet files use the file-era column names.
  • Null values are filled for columns that were added after a file was written.
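The caching invariant can be sketched with a memoized lookup keyed by commit instant; SchemaCache and search_schema below are illustrative stand-ins for InternalSchemaCache and its timeline lookup, not the real API:

```python
class SchemaCache:
    """Memoizes schema lookup per commit instant, so many files from the
    same commit trigger a single timeline read."""
    def __init__(self, search_schema):
        self._search = search_schema
        self._cache = {}
        self.lookups = 0          # counts actual timeline reads
    def get(self, commit_time):
        if commit_time not in self._cache:
            self.lookups += 1
            self._cache[commit_time] = self._search(commit_time)
        return self._cache[commit_time]

cache = SchemaCache(lambda commit: {"commit": commit})
s1 = cache.get("c1")
s2 = cache.get("c1")    # served from cache, no second lookup
```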
