
Implementation:Apache Hudi HoodieTableFactory CreateDynamicTableSink

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Stream_Processing
Last Updated 2026-02-08 00:00 GMT

Overview

A concrete tool, provided by Apache Hudi, for creating a Hudi dynamic table sink from a Flink SQL DDL or Table API table definition.

Description

HoodieTableFactory.createDynamicTableSink() is the Flink DynamicTableSinkFactory implementation that converts a CatalogTable context into a fully configured HoodieTableSink. It performs the following steps:

  1. Extracts connector options from the DDL into a Flink Configuration object.
  2. Validates that the table path option is non-empty.
  3. Reconciles DDL options with any existing table properties on the filesystem via setupTableOptions().
  4. Extracts the ResolvedSchema from the catalog table.
  5. Performs sanity checks: validates table type, index type, record key fields, and ordering fields.
  6. Sets up configuration options: table name, database name, hoodie key options (primary key / partition key mapping), compaction options, hive sync options, read/write options, and sort options.
  7. Infers the Avro schema from the DDL's physical row type if not explicitly provided.
  8. Returns a new HoodieTableSink(conf, schema).

The factory identifier is "hudi", matching the 'connector' = 'hudi' DDL option.
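The steps above can be sketched in plain Java. This is an illustrative mock, not Hudi's actual code: the helper names (validatePath, buildSinkConf) and the flat string map standing in for Flink's Configuration are hypothetical, chosen only to make the option-extraction and path-validation steps concrete.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of steps 1-2: extract DDL options into a flat
// configuration map, then reject an empty table path. All names here are
// illustrative; they are not Hudi's actual helpers.
public class SinkFactorySketch {

    // Mirrors step 1: copy DDL connector options into a configuration map,
    // filling in Hudi's default table type when the DDL omits it.
    static Map<String, String> buildSinkConf(Map<String, String> ddlOptions) {
        Map<String, String> conf = new HashMap<>(ddlOptions);
        conf.putIfAbsent("table.type", "COPY_ON_WRITE"); // Hudi's default table type
        return conf;
    }

    // Mirrors step 2: the table path must be present and non-empty.
    static String validatePath(Map<String, String> conf) {
        String path = conf.get("path");
        if (path == null || path.trim().isEmpty()) {
            throw new IllegalArgumentException("Option [path] should not be empty.");
        }
        return path;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "hudi");
        options.put("path", "/tmp/hudi/my_hudi_table");

        Map<String, String> conf = buildSinkConf(options);
        System.out.println(validatePath(conf));     // prints /tmp/hudi/my_hudi_table
        System.out.println(conf.get("table.type")); // prints COPY_ON_WRITE
    }
}
```

The real factory performs the same shape of work against Flink's Configuration and ValidationUtils, then continues with the schema and option setup in steps 3-8.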

Usage

This method is invoked automatically by Flink's table planner when a user creates or references a Hudi table in SQL DDL or the Table API. It is not called directly by user code.
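The planner locates this factory through Java's ServiceLoader mechanism: Flink scans the classpath for implementations of its factory SPI and matches the DDL's 'connector' value against each factory's identifier. A sketch of the service registration that makes 'hudi' resolve to this class (the actual file ships inside the hudi-flink bundle jar):

```
# META-INF/services/org.apache.flink.table.factories.Factory
org.apache.hudi.table.HoodieTableFactory
```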

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
  • Lines: 98-109

Signature

@Override
public DynamicTableSink createDynamicTableSink(Context context)

Import

import org.apache.hudi.table.HoodieTableFactory;

I/O Contract

Inputs

  • context (DynamicTableSinkFactory.Context, required): Flink table factory context containing the CatalogTable, ResolvedSchema, and ObjectIdentifier.
  • context.getCatalogTable().getOptions() (Map<String, String>, required): DDL connector options including 'path', 'table.type', 'hoodie.datasource.write.operation', etc.
  • context.getCatalogTable().getResolvedSchema() (ResolvedSchema, required): physical schema with column names, data types, primary key, and partition keys.
  • context.getObjectIdentifier() (ObjectIdentifier, required): database name and table name from the DDL.
  • context.getConfiguration() (ReadableConfig, optional): Flink session/table environment configuration used for sort options.

Outputs

  • return (HoodieTableSink): configured Hudi table sink wrapping the validated Configuration and ResolvedSchema, with the inferred Avro schema.

Usage Examples

// Flink SQL DDL (invokes createDynamicTableSink automatically)
// In Flink SQL client or TableEnvironment:
tableEnv.executeSql(
    "CREATE TABLE my_hudi_table (\n"
    + "  id BIGINT PRIMARY KEY NOT ENFORCED,\n"
    + "  name STRING,\n"
    + "  ts TIMESTAMP(3),\n"
    + "  dt STRING\n"
    + ") PARTITIONED BY (dt) WITH (\n"
    + "  'connector' = 'hudi',\n"
    + "  'path' = '/tmp/hudi/my_hudi_table',\n"
    + "  'table.type' = 'MERGE_ON_READ',\n"
    + "  'hoodie.datasource.write.operation' = 'upsert'\n"
    + ")"
);

// Insert triggers the sink pipeline
tableEnv.executeSql(
    "INSERT INTO my_hudi_table SELECT * FROM source_table"
);

Related Pages

Implements Principle
