Implementation:Apache Hudi HoodieTableFactory createDynamicTableSink
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool, provided by Apache Hudi, for creating a Hudi dynamic table sink from Flink SQL DDL or Table API table definitions.
Description
HoodieTableFactory.createDynamicTableSink() is the Flink DynamicTableSinkFactory implementation that converts a CatalogTable context into a fully configured HoodieTableSink. It performs the following steps:
- Extracts connector options from the DDL into a Flink `Configuration` object.
- Validates that the table path is non-empty.
- Reconciles DDL options with any existing table properties on the filesystem via `setupTableOptions()`.
- Extracts the `ResolvedSchema` from the catalog table.
- Performs sanity checks: validates table type, index type, record key fields, and ordering fields.
- Sets up configuration options: table name, database name, hoodie key options (primary key / partition key mapping), compaction options, hive sync options, read/write options, and sort options.
- Infers the Avro schema from the DDL's physical row type if not explicitly provided.
- Returns a new `HoodieTableSink(conf, schema)`.
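The reconciliation step can be sketched as a map merge in which explicit DDL options win and properties already persisted with the table fill in the gaps. The helper and key names below are illustrative stand-ins for what `setupTableOptions()` does, not Hudi's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of DDL-option reconciliation: options already persisted
// in the table's properties supplement keys the DDL did not set, while
// explicit DDL settings take precedence. Names here are hypothetical.
public class OptionReconciliationSketch {
    static Map<String, String> mergeWithTableConfig(
            Map<String, String> ddlOptions, Map<String, String> existingTableConfig) {
        Map<String, String> merged = new HashMap<>(ddlOptions);
        // putIfAbsent keeps DDL values and only adopts missing keys.
        existingTableConfig.forEach(merged::putIfAbsent);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> ddl = new HashMap<>();
        ddl.put("path", "/tmp/hudi/t1");
        ddl.put("table.type", "MERGE_ON_READ");

        Map<String, String> existing = new HashMap<>();
        existing.put("table.type", "COPY_ON_WRITE");          // ignored: DDL wins
        existing.put("hoodie.table.recordkey.fields", "id");  // adopted

        Map<String, String> merged = mergeWithTableConfig(ddl, existing);
        System.out.println(merged.get("table.type"));                    // MERGE_ON_READ
        System.out.println(merged.get("hoodie.table.recordkey.fields")); // id
    }
}
```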
The factory identifier is "hudi", matching the 'connector' = 'hudi' DDL option.
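Flink discovers the factory through Java SPI by matching the `'connector'` option against each registered factory's identifier. A toy model of that lookup (the interface and method names are simplified stand-ins, not Flink's actual API):

```java
import java.util.List;
import java.util.Map;

// Toy model of Flink's factory discovery: the 'connector' DDL option is
// matched against each factory's identifier. Simplified stand-in, not
// Flink's real DynamicTableSinkFactory/FactoryUtil machinery.
public class FactoryLookupSketch {
    interface SinkFactory {
        String factoryIdentifier();
    }

    static SinkFactory discover(List<SinkFactory> factories, Map<String, String> ddlOptions) {
        String connector = ddlOptions.get("connector");
        return factories.stream()
                .filter(f -> f.factoryIdentifier().equals(connector))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(
                        "No factory found for connector '" + connector + "'"));
    }

    public static void main(String[] args) {
        SinkFactory hudi = () -> "hudi";
        SinkFactory kafka = () -> "kafka";
        SinkFactory chosen = discover(List.of(hudi, kafka), Map.of("connector", "hudi"));
        System.out.println(chosen.factoryIdentifier()); // hudi
    }
}
```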
Usage
This method is invoked automatically by Flink's table planner when a user creates or references a Hudi table in SQL DDL or the Table API. It is not called directly by user code.
Code Reference
Source Location
- Repository: Apache Hudi
- File: `hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java`
- Lines: 98-109
Signature
```java
@Override
public DynamicTableSink createDynamicTableSink(Context context)
```
Import
```java
import org.apache.hudi.table.HoodieTableFactory;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| context | DynamicTableSinkFactory.Context | Yes | Flink table factory context containing CatalogTable, ResolvedSchema, and ObjectIdentifier |
| context.getCatalogTable().getOptions() | Map<String, String> | Yes | DDL connector options including 'path', 'table.type', 'hoodie.datasource.write.operation', etc. |
| context.getCatalogTable().getResolvedSchema() | ResolvedSchema | Yes | Physical schema with column names, data types, primary key, and partition keys |
| context.getObjectIdentifier() | ObjectIdentifier | Yes | Database name and table name from the DDL |
| context.getConfiguration() | ReadableConfig | No | Flink session/table environment configuration for sort options |
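The primary-key and partition-key inputs above are mapped onto hoodie write options. The option keys below are Hudi's public datasource option names; the helper method itself is an illustrative stand-in for the factory's setup logic, not its real code:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative mapping of the DDL's PRIMARY KEY and PARTITIONED BY clauses
// onto Hudi key options. The option keys are real Hudi config names; the
// helper is a simplified sketch of the factory's setup step.
public class HoodieKeyOptionsSketch {
    static Map<String, String> keyOptions(List<String> primaryKeys, List<String> partitionKeys) {
        Map<String, String> options = new HashMap<>();
        // Multiple fields are joined into a comma-separated list.
        options.put("hoodie.datasource.write.recordkey.field", String.join(",", primaryKeys));
        options.put("hoodie.datasource.write.partitionpath.field", String.join(",", partitionKeys));
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> opts = keyOptions(List.of("id"), List.of("dt"));
        System.out.println(opts.get("hoodie.datasource.write.recordkey.field"));     // id
        System.out.println(opts.get("hoodie.datasource.write.partitionpath.field")); // dt
    }
}
```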
Outputs
| Name | Type | Description |
|---|---|---|
| return | HoodieTableSink | Configured Hudi table sink containing the validated Configuration and ResolvedSchema, with inferred Avro schema |
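The inferred Avro schema in the output is derived by walking the DDL's physical row type. The toy mapping below only illustrates the shape of that inference; the real conversion handles logical types, nullability, and nesting, and this sketch is not Hudi's or Flink's actual converter:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Toy mapping from a few Flink SQL types to Avro primitive type names,
// illustrating the shape of schema inference from a physical row type.
// This is a deliberately simplified sketch, not the real converter.
public class AvroInferenceSketch {
    static String avroType(String flinkType) {
        switch (flinkType) {
            case "BIGINT":  return "long";
            case "STRING":  return "string";
            case "DOUBLE":  return "double";
            case "BOOLEAN": return "boolean";
            default: throw new IllegalArgumentException("unmapped type: " + flinkType);
        }
    }

    public static void main(String[] args) {
        // Ordered column map standing in for the resolved physical schema.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "BIGINT");
        columns.put("name", "STRING");
        String fields = columns.entrySet().stream()
                .map(e -> "{\"name\":\"" + e.getKey()
                        + "\",\"type\":\"" + avroType(e.getValue()) + "\"}")
                .collect(Collectors.joining(","));
        System.out.println("{\"type\":\"record\",\"name\":\"row\",\"fields\":[" + fields + "]}");
    }
}
```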
Usage Examples
```java
// Flink SQL DDL (invokes createDynamicTableSink automatically)
// In Flink SQL client or TableEnvironment:
tableEnv.executeSql(
    "CREATE TABLE my_hudi_table (\n"
    + "  id BIGINT PRIMARY KEY NOT ENFORCED,\n"
    + "  name STRING,\n"
    + "  ts TIMESTAMP(3),\n"
    + "  dt STRING\n"
    + ") PARTITIONED BY (dt) WITH (\n"
    + "  'connector' = 'hudi',\n"
    + "  'path' = '/tmp/hudi/my_hudi_table',\n"
    + "  'table.type' = 'MERGE_ON_READ',\n"
    + "  'hoodie.datasource.write.operation' = 'upsert'\n"
    + ")"
);

// Insert triggers the sink pipeline
tableEnv.executeSql(
    "INSERT INTO my_hudi_table SELECT * FROM source_table"
);
```
Related Pages
Implements Principle