Implementation: Apache Hudi HoodieFlinkStreamer Main
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool, provided by Apache Hudi, for bootstrapping a Flink streaming write pipeline from command-line arguments.
Description
HoodieFlinkStreamer.main() is the entry point for the Hudi Flink streaming ingestion job. It parses command-line arguments via JCommander into a FlinkStreamerConfig, configures the Flink StreamExecutionEnvironment with checkpointing and state backend settings, creates a Kafka source, and assembles the full write pipeline (bucket assignment, stream write, compaction or clustering). Additionally, StreamerUtil.initTableIfNotExists() ensures the target Hudi table is initialized on the filesystem before writes begin.
The method performs:
- CLI parsing with JCommander to populate FlinkStreamerConfig
- State backend and checkpoint storage configuration
- Checkpointing enablement with the configured interval and max concurrent checkpoints set to 1
- Kafka source creation with the specified topic and consumer properties
- Optional transformation via user-supplied Transformer classes
- Sink task parallelism inference via OptionsInference.setupSinkTasks()
- Pipeline branching: append mode (insert without compaction) vs. upsert mode (with bucket assignment and compaction)
- Table initialization through StreamerUtil.initTableIfNotExists()
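The CLI-parsing and pipeline-branching steps above can be approximated in a condensed sketch. The class and method names below (StreamerSketch, parseArgs, choosePipeline) are illustrative stand-ins, not the real Hudi code, which uses JCommander to populate a FlinkStreamerConfig and inspects several options before choosing a branch:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the argument parsing and pipeline branching in
// HoodieFlinkStreamer.main(). All names here are hypothetical; the real code
// parses args with JCommander and branches on the configured write operation.
public class StreamerSketch {

  // Parse "--key value" pairs, roughly the way JCommander populates
  // FlinkStreamerConfig from the command line.
  public static Map<String, String> parseArgs(String[] args) {
    Map<String, String> cfg = new HashMap<>();
    for (int i = 0; i + 1 < args.length; i += 2) {
      if (!args[i].startsWith("--")) {
        throw new IllegalArgumentException("Expected option flag, got: " + args[i]);
      }
      cfg.put(args[i].substring(2), args[i + 1]);
    }
    return cfg;
  }

  // Simplified mirror of the branch decision: a plain INSERT goes down the
  // append path (no compaction); other operations go down the upsert path
  // (bucket assignment plus compaction/clustering).
  public static String choosePipeline(Map<String, String> cfg) {
    String op = cfg.getOrDefault("operation", "UPSERT");
    return "INSERT".equalsIgnoreCase(op) ? "append" : "upsert";
  }

  public static void main(String[] args) {
    Map<String, String> cfg = parseArgs(new String[]{
        "--kafka-topic", "my_topic", "--operation", "INSERT"});
    System.out.println(choosePipeline(cfg)); // prints "append"
  }
}
```

Note the real branch condition also depends on table type and clustering settings; this sketch only captures the insert-vs-upsert distinction described above.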
Usage
Use this entry point when you want to run a standalone Kafka-to-Hudi ingestion job from the command line. It is typically invoked via:
flink run -c org.apache.hudi.streamer.HoodieFlinkStreamer \
hudi-flink-bundle.jar \
--target-base-path /path/to/hudi_table \
--table-type MERGE_ON_READ \
--target-table my_table \
--source-avro-schema-path /path/to/schema.avsc \
--kafka-topic my_topic \
--kafka-bootstrap-servers localhost:9092 \
--kafka-group-id my_group \
--checkpoint-interval 30000
Code Reference
Source Location
- Repository: Apache Hudi
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
- Lines: 49-117
Also:
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
- Lines: 286-333
Signature
// HoodieFlinkStreamer entry point
public static void main(String[] args) throws Exception
// Table initialization
public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) throws IOException
Import
import org.apache.hudi.streamer.HoodieFlinkStreamer;
import org.apache.hudi.util.StreamerUtil;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| args | String[] | Yes | Command-line arguments parsed by JCommander into FlinkStreamerConfig (table path, table type, Kafka topic, checkpoint interval, etc.) |
| cfg.stateBackend | StateBackendOptions.StateBackend | No | State backend type (default: HashMap); configured via --state-backend |
| cfg.flinkCheckPointPath | String | No | Filesystem path for checkpoint storage |
| cfg.checkpointInterval | long | Yes | Checkpoint interval in milliseconds |
| cfg.kafkaTopic | String | Yes | Kafka topic to consume from |
| cfg.transformerClassNames | List<String> | No | Fully-qualified class names of Transformer implementations to apply |
| conf (for initTableIfNotExists) | Configuration | Yes | Flink Configuration containing FlinkOptions (PATH, TABLE_TYPE, TABLE_NAME, SOURCE_AVRO_SCHEMA, etc.) |
Outputs
| Name | Type | Description |
|---|---|---|
| (side effect) | StreamExecutionEnvironment | Configured execution environment with checkpointing enabled and max concurrent checkpoints set to 1 |
| (side effect) | DataStream pipeline | Fully assembled write pipeline (source -> transform -> bucket assign -> stream write -> compact/cluster/clean) |
| return (initTableIfNotExists) | HoodieTableMetaClient | Meta client for the initialized (or existing) Hudi table |
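The initTableIfNotExists contract in the table above (create table metadata only when absent, then return a handle either way) can be sketched with plain filesystem checks. The class below is a simplified, hypothetical stand-in: the real method builds a full .hoodie metadata directory and returns a HoodieTableMetaClient rather than a bare path:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Simplified stand-in for StreamerUtil.initTableIfNotExists(): initialize the
// table's .hoodie metadata directory only if it does not already exist, and
// return a handle to it in both cases, so repeated calls are idempotent.
public class TableInitSketch {

  public static Path initTableIfNotExists(String basePath) throws IOException {
    Path metaDir = Paths.get(basePath, ".hoodie");
    if (Files.exists(metaDir)) {
      // Table already initialized: reuse the existing metadata directory.
      return metaDir;
    }
    // First run: create the metadata directory before any writes begin.
    return Files.createDirectories(metaDir);
  }

  public static void main(String[] args) throws IOException {
    Path table = Files.createTempDirectory("hudi_table");
    Path first = initTableIfNotExists(table.toString());
    Path second = initTableIfNotExists(table.toString()); // no-op on rerun
    System.out.println(first.equals(second)); // prints "true"
  }
}
```

This idempotence is what makes it safe for main() to call the real method unconditionally on every job start.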
Usage Examples
// Programmatic invocation (equivalent to CLI)
String[] args = new String[]{
"--target-base-path", "/tmp/hudi_table",
"--table-type", "MERGE_ON_READ",
"--target-table", "test_table",
"--source-avro-schema-path", "/tmp/schema.avsc",
"--kafka-topic", "test_topic",
"--kafka-bootstrap-servers", "localhost:9092",
"--kafka-group-id", "test_group",
"--checkpoint-interval", "30000"
};
HoodieFlinkStreamer.main(args);
// Direct table initialization
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, "/tmp/hudi_table");
conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
conf.set(FlinkOptions.TABLE_NAME, "test_table");
conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchemaString);
HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf);