
Principle:Apache Hudi Schema Change Execution

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Schema_Management
Last Updated 2026-02-08 00:00 GMT

Overview

Once a schema change has been validated, the system must atomically apply it by merging the old and new internal schemas and committing the result to the table's timeline.

Description

Schema Change Execution is the phase in which a validated schema modification -- adding a column, dropping a column, renaming a column, or changing a column's type -- is materialized as a new InternalSchema version and persisted to the Hudi commit timeline. In a Flink-based deployment, this phase is triggered by a Flink SQL ALTER TABLE DDL statement that flows through the Hudi catalog.

The execution phase has three responsibilities:

  1. Translate the Flink SQL table changes (ADD_COLUMN, DROP_COLUMN, MODIFY_COLUMN, RENAME_COLUMN) into InternalSchema operations by converting Flink LogicalType instances into Hudi Type instances.
  2. Merge the old InternalSchema (retrieved from the latest commit metadata) with the applied changes to produce a new InternalSchema. The merger walks the old record structure, applies additions, deletions, renames, and type promotions, and emits a new record type.
  3. Commit the new InternalSchema to the Hudi timeline via the write client, so that all subsequent readers and writers see the updated schema. The write operation type is set to ALTER_SCHEMA, ensuring the commit is identifiable as a DDL event.

This phase is strictly separated from the planning/validation phase. It assumes that the proposed changes have already been validated (for example, by the type lattice in SchemaChangeUtils). The execution is idempotent in the sense that if the old and new schemas are equal after merging, no commit is produced.
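The type translation in step 1 can be sketched as a lookup table. This is a minimal Python illustration (function and table names are hypothetical); the real converter operates on Flink LogicalType instances and Hudi Type objects rather than on type-name strings:

```python
# Illustrative subset of the Flink-to-Hudi type translation (step 1).
# Both sides are represented as plain strings here for clarity.
FLINK_TO_HUDI_TYPE = {
    "INT": "int",
    "BIGINT": "long",
    "FLOAT": "float",
    "DOUBLE": "double",
    "VARCHAR": "string",
    "BOOLEAN": "boolean",
    "DATE": "date",
}

def convert_to_hudi_type(flink_type: str) -> str:
    """Translate a Flink SQL type name to a Hudi internal-schema type name."""
    try:
        return FLINK_TO_HUDI_TYPE[flink_type]
    except KeyError:
        raise ValueError(f"unsupported Flink type: {flink_type}")
```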

Usage

Apply Schema Change Execution whenever:

  • A Flink SQL user issues ALTER TABLE ... ADD COLUMNS, ALTER TABLE ... DROP COLUMN, ALTER TABLE ... RENAME COLUMN, or ALTER TABLE ... MODIFY COLUMN.
  • A catalog hook intercepts a DDL event and must propagate it to Hudi storage.
  • A programmatic API needs to evolve a table's schema outside of the query engine.
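As a rough sketch of how a catalog hook might classify the DDL statements above into the four change kinds before handing them to the execution phase (the patterns and function are hypothetical, not Hudi's actual parsing, which the Flink planner performs):

```python
import re

# Hypothetical classifier: map an ALTER TABLE statement to the
# table-change kind the execution phase understands.
PATTERNS = [
    (re.compile(r"ADD\s+COLUMNS?", re.I), "ADD_COLUMN"),
    (re.compile(r"DROP\s+COLUMNS?", re.I), "DROP_COLUMN"),
    (re.compile(r"RENAME\s+(COLUMN\s+)?\w+\s+TO", re.I), "RENAME_COLUMN"),
    (re.compile(r"MODIFY\s+", re.I), "MODIFY_COLUMN"),
]

def classify_ddl(statement: str) -> str:
    """Return the table-change kind implied by a schema-change DDL statement."""
    for pattern, kind in PATTERNS:
        if pattern.search(statement):
            return kind
    raise ValueError("not a supported schema-change DDL")
```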

Theoretical Basis

Schema Change Execution follows the write-ahead metadata pattern common in transactional data lakes. The key insight is that schema changes are committed as metadata to a timeline (an ordered log of commit events) rather than rewriting existing data files. This means:

  1. Existing Parquet files retain their original schema.
  2. The new schema is recorded as metadata in the commit's extra metadata map.
  3. Readers reconcile the per-file schema against the query schema at read time.
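To make the metadata-only nature of the commit concrete, here is a toy timeline model in Python (all names hypothetical): data files keep the schema they were written with, while the latest schema lives only in commit metadata:

```python
from dataclasses import dataclass, field

@dataclass
class DataFile:
    path: str
    schema: tuple          # frozen at write time; never rewritten

@dataclass
class Commit:
    action: str            # e.g. "commit" or "alter_schema"
    extra_metadata: dict = field(default_factory=dict)

@dataclass
class Timeline:
    commits: list = field(default_factory=list)
    files: list = field(default_factory=list)

    def commit_schema_change(self, new_schema: tuple) -> None:
        # Metadata-only commit: no data file is touched.
        self.commits.append(
            Commit("alter_schema", {"latest_schema": new_schema}))

    def latest_schema(self) -> tuple:
        # Scan the ordered log of commits from newest to oldest.
        for commit in reversed(self.commits):
            if "latest_schema" in commit.extra_metadata:
                return commit.extra_metadata["latest_schema"]
        raise LookupError("no schema recorded on the timeline")
```

Readers then reconcile each file's frozen schema against `latest_schema()` at query time rather than relying on the files themselves being up to date.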

Pseudocode for the execution flow:

function alterTable(catalog, tablePath, newTable, tableChanges, hadoopConf):
    if tableChanges is empty:
        return  // nothing to do

    oldTable = catalog.getTable(tablePath)
    writeClient = createWriteClient(tablePath, oldTable, hadoopConf)
    (oldSchema, metaClient) = writeClient.getInternalSchemaAndMetaClient()

    // Convert Flink LogicalTypes to Hudi Types and apply changes
    convertFunc = logicalType -> InternalSchemaConverter.convertToField(
        HoodieSchemaConverter.convertToSchema(logicalType))
    newSchema = applyTableChange(oldSchema, tableChanges, convertFunc)

    if oldSchema != newSchema:
        writeClient.setOperationType(ALTER_SCHEMA)
        writeClient.commitTableChange(newSchema, metaClient)
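The applyTableChange step above, together with the idempotency guard, can be sketched in Python over a flat field list (each field an `(id, name, type)` tuple; the structure and names are illustrative, not Hudi's actual InternalSchema API):

```python
def apply_table_change(fields, change):
    """Apply one validated change to a list of (id, name, type) fields."""
    kind = change["kind"]
    if kind == "ADD_COLUMN":
        # New columns get a fresh, never-reused field id.
        next_id = max((f[0] for f in fields), default=-1) + 1
        return fields + [(next_id, change["name"], change["type"])]
    if kind == "DROP_COLUMN":
        return [f for f in fields if f[1] != change["name"]]
    if kind == "RENAME_COLUMN":
        return [(i, change["new_name"] if n == change["name"] else n, t)
                for i, n, t in fields]
    if kind == "MODIFY_COLUMN":
        return [(i, n, change["type"] if n == change["name"] else t)
                for i, n, t in fields]
    raise ValueError(f"unknown change kind: {kind}")

def execute(old_fields, changes):
    """Apply all changes; return None when nothing changed (no commit)."""
    new_fields = old_fields
    for change in changes:
        new_fields = apply_table_change(new_fields, change)
    return None if new_fields == old_fields else new_fields
```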

Pseudocode for InternalSchemaMerger.mergeSchema:

function mergeSchema(fileSchema, querySchema):
    mergedRecord = mergeType(querySchema.record, rootId=0)
    return InternalSchema(mergedRecord)

function mergeType(type, currentTypeId):
    if type is RECORD:
        newTypes = []
        for each field in type.fields:
            newTypes.append(mergeType(field.type, field.id))
        return buildRecordType(type.fields, newTypes)
    if type is ARRAY:
        newElementType = mergeType(type.elementType, type.elementId)
        return ArrayType(newElementType)
    if type is MAP:
        newValueType = mergeType(type.valueType, type.valueId)
        return MapType(type.keyType, newValueType)
    // primitive: if useColumnTypeFromFileSchema is set and a field with
    // this id exists in fileSchema, return the file schema's type;
    // otherwise return the query schema's type
    return resolvePrimitiveType(type, currentTypeId)

The merger supports flags that control whether to use the file schema's column types and names during the merge. When reading base Parquet files, the file-era types must be preserved so the Parquet decoder receives the correct type; when reading log files, the query-era types can be passed directly because the log reader supports rewriting on the fly.
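The flag's effect can be shown with a flat-schema sketch in Python (Hudi's merger additionally recurses through records, arrays, and maps; the function below is illustrative only). Fields are matched by id, and the flag decides whose type wins:

```python
def merge_schema(file_schema, query_schema, use_column_type_from_file_schema):
    """Merge two flat schemas ({field_id: (name, type)}) by field id.

    When use_column_type_from_file_schema is True (base Parquet files),
    a field that exists in the file keeps its file-era type, so the
    Parquet decoder sees the type the data was written with. When False
    (log files), the query-era type is used directly because the log
    reader can rewrite records on the fly.
    """
    merged = {}
    for field_id, (name, query_type) in query_schema.items():
        if use_column_type_from_file_schema and field_id in file_schema:
            merged[field_id] = (name, file_schema[field_id][1])
        else:
            merged[field_id] = (name, query_type)
    return merged
```

Note that a field promoted from int to long in the query schema is read back as int from a base file written before the promotion, while a log-file read sees long directly.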
