Implementation: Apache Hudi HoodieCatalogUtil AlterTable
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Schema_Management |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool for applying Flink SQL ALTER TABLE DDL statements to a Hudi table by merging InternalSchema versions and committing the result to the Hudi timeline.
Description
HoodieCatalogUtil.alterTable is the entry point the Hudi Flink catalog calls when a user issues an ALTER TABLE statement. It receives the list of Flink table changes (such as ADD_COLUMN, DROP_COLUMN, MODIFY_COLUMN, RENAME_COLUMN) and retrieves the current InternalSchema from the latest commit metadata via the write client. It then applies the changes using a type-conversion function that bridges Flink LogicalType to Hudi Type and, if the schema actually changed, commits the new InternalSchema to the Hudi timeline with the operation type set to ALTER_SCHEMA.
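The change-application step can be pictured as a loop over the requested changes. The sketch below is a simplified, self-contained model: the `Column`, `Change`, `ChangeKind`, and `applyChanges` names are illustrative stand-ins, not Hudi's or Flink's actual classes, and the real implementation operates on InternalSchema field IDs (and commits only when the schema actually changed) rather than on a flat column list.

```java
import java.util.ArrayList;
import java.util.List;

public class AlterTableSketch {
    // Illustrative stand-ins for Flink's TableChange variants.
    enum ChangeKind { ADD_COLUMN, DROP_COLUMN, RENAME_COLUMN }

    record Change(ChangeKind kind, String name, String newName, String type) {}
    record Column(String name, String type) {}

    // Apply each change to a copy of the current column list; the real code
    // applies them to the latest InternalSchema via a type-conversion function.
    static List<Column> applyChanges(List<Column> current, List<Change> changes) {
        List<Column> result = new ArrayList<>(current);
        for (Change c : changes) {
            switch (c.kind()) {
                case ADD_COLUMN -> result.add(new Column(c.name(), c.type()));
                case DROP_COLUMN -> result.removeIf(col -> col.name().equals(c.name()));
                case RENAME_COLUMN -> result.replaceAll(col ->
                    col.name().equals(c.name()) ? new Column(c.newName(), col.type()) : col);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Column> schema = List.of(new Column("id", "BIGINT"), new Column("amount", "DOUBLE"));
        List<Change> changes = List.of(
            new Change(ChangeKind.ADD_COLUMN, "tax", null, "DOUBLE"),
            new Change(ChangeKind.RENAME_COLUMN, "amount", "total", null));
        System.out.println(applyChanges(schema, changes));
    }
}
```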
InternalSchemaMerger.mergeSchema is the complementary method that produces a read-time schema by merging a file-era InternalSchema with a query-era InternalSchema. It walks the query schema's record tree, resolving each field against the file schema. When a field exists in the file schema, the merger can optionally preserve the file-era type and name (for Parquet readers that require the original encoding type). When a field is new (present in query but absent in file), it is included with a null default.
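The merge walk described above can be illustrated with a simplified model. The `Field` record and `mergeFields` method below are illustrative names, not Hudi's API; the real merger resolves fields by ID across the InternalSchema record tree and can also preserve file-era names, not just types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaMergeSketch {
    // A field with a name, a type, and whether reads should default to null.
    record Field(String name, String type, boolean nullDefault) {}

    // Walk the query-era fields, resolving each against the file-era schema.
    // When useColumnTypeFromFileSchema is true and the field exists in the file,
    // keep the file-era type (as some Parquet readers require); fields absent
    // from the file are included with a null default.
    static List<Field> mergeFields(List<Field> fileFields, List<Field> queryFields,
                                   boolean useColumnTypeFromFileSchema) {
        Map<String, Field> byName = new LinkedHashMap<>();
        for (Field f : fileFields) {
            byName.put(f.name(), f);
        }
        List<Field> merged = new ArrayList<>();
        for (Field q : queryFields) {
            Field file = byName.get(q.name());
            if (file == null) {
                merged.add(new Field(q.name(), q.type(), true)); // new column: null default
            } else {
                String type = useColumnTypeFromFileSchema ? file.type() : q.type();
                merged.add(new Field(q.name(), type, false));
            }
        }
        return merged;
    }
}
```

With file-era `price: float` and query-era `price: double`, the flag decides which type wins; a column present only in the query schema comes back marked for null defaults.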
Usage
Use these methods when:
- The Flink catalog receives an ALTER TABLE DDL and must persist the schema change.
- A read path needs to produce a merged schema that reconciles file-era and query-era column types.
- A compaction job must create a unified schema from multiple file-era schemas.
Code Reference
Source Location
- Repository: Apache Hudi
- File:
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java - Lines: 216-244
- File:
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java - Lines: 84-87
Signature
```java
protected static void alterTable(
    AbstractCatalog catalog,
    ObjectPath tablePath,
    CatalogBaseTable newTable,
    List<TableChange> tableChanges,
    boolean ignoreIfNotExists,
    org.apache.hadoop.conf.Configuration hadoopConf,
    BiFunction<ObjectPath, CatalogBaseTable, String> inferTablePathFunc,
    BiConsumer<ObjectPath, CatalogBaseTable> postAlterTableFunc)
    throws TableNotExistException, CatalogException
```

```java
public InternalSchema mergeSchema()
```
Import
```java
import org.apache.hudi.table.catalog.HoodieCatalogUtil;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.types.logical.LogicalType;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| catalog | AbstractCatalog | Yes | The Hudi catalog instance used to look up the existing table |
| tablePath | ObjectPath | Yes | Database and table name identifying the target table |
| newTable | CatalogBaseTable | Yes | The new table definition after the ALTER |
| tableChanges | List<TableChange> | Yes | List of Flink table changes (ADD_COLUMN, MODIFY_COLUMN, DROP_COLUMN, RENAME_COLUMN) |
| ignoreIfNotExists | boolean | Yes | If true, silently return when the table does not exist; if false, throw TableNotExistException |
| hadoopConf | org.apache.hadoop.conf.Configuration | Yes | Hadoop configuration for storage access |
| inferTablePathFunc | BiFunction<ObjectPath, CatalogBaseTable, String> | Yes | Function to infer the physical Hudi table path from the catalog path |
| postAlterTableFunc | BiConsumer<ObjectPath, CatalogBaseTable> | Yes | Callback invoked after the schema change is committed (e.g., to update the Hive metastore) |
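The two functional parameters are plain java.util.function shapes and can be supplied as lambdas. The sketch below uses local stand-in records for ObjectPath and CatalogBaseTable so it compiles without Flink on the classpath; the `/warehouse/...` path layout is purely illustrative, not Hudi's actual path-inference logic.

```java
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

public class CallbackShapes {
    // Minimal stand-ins for Flink's ObjectPath and CatalogBaseTable.
    record ObjectPath(String databaseName, String objectName) {}
    record CatalogBaseTable(String comment) {}

    // Infers a physical table path from catalog metadata (illustrative layout).
    static final BiFunction<ObjectPath, CatalogBaseTable, String> INFER_TABLE_PATH =
        (path, table) -> "/warehouse/" + path.databaseName() + "/" + path.objectName();

    // Post-alter hook, e.g. syncing the new definition to an external metastore.
    static final BiConsumer<ObjectPath, CatalogBaseTable> POST_ALTER_TABLE =
        (path, table) -> System.out.println("synced " + path.objectName());

    public static void main(String[] args) {
        ObjectPath path = new ObjectPath("default", "orders");
        CatalogBaseTable table = new CatalogBaseTable("");
        System.out.println(INFER_TABLE_PATH.apply(path, table));
        POST_ALTER_TABLE.accept(path, table);
    }
}
```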
Outputs
| Name | Type | Description |
|---|---|---|
| (void) | void | alterTable commits the new InternalSchema to the Hudi timeline as a side effect; no value is returned |
| mergeSchema result | InternalSchema | A merged InternalSchema that reconciles file-era and query-era field types and names |
Usage Examples
```java
// Example: applying an ALTER TABLE from a Flink catalog implementation
HoodieCatalogUtil.alterTable(
    this,                                // AbstractCatalog
    new ObjectPath("default", "orders"), // tablePath
    newCatalogTable,                     // CatalogBaseTable with updated schema
    tableChanges,                        // e.g., [AddColumn("tax", DOUBLE)]
    false,                               // do not ignore if not exists
    hadoopConf,                          // Hadoop configuration
    this::inferTablePath,                // function to resolve physical path
    this::postAlterTable                 // e.g., sync to Hive metastore
);
```

```java
// Example: merging file-era and query-era schemas for read
InternalSchema fileSchema = ...;  // schema from the file's commit instant
InternalSchema querySchema = ...; // current table schema
InternalSchemaMerger merger = new InternalSchemaMerger(
    fileSchema, querySchema,
    true, // ignoreRequiredAttribute
    true  // useColumnTypeFromFileSchema
);
InternalSchema readSchema = merger.mergeSchema();
```