
Implementation:Apache Hudi HoodieFlinkStreamer Main

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Stream_Processing
Last Updated 2026-02-08 00:00 GMT

Overview

Concrete tool, provided by Apache Hudi, for bootstrapping a Flink streaming write pipeline from command-line arguments.

Description

HoodieFlinkStreamer.main() is the entry point for the Hudi Flink streaming ingestion job. It parses command-line arguments via JCommander into a FlinkStreamerConfig, configures the Flink StreamExecutionEnvironment with checkpointing and state backend settings, creates a Kafka source, and assembles the full write pipeline (bucket assignment, stream write, compaction or clustering). Additionally, StreamerUtil.initTableIfNotExists() ensures the target Hudi table is initialized on the filesystem before writes begin.

The method performs:

  • CLI parsing with JCommander to populate FlinkStreamerConfig
  • State backend and checkpoint storage configuration
  • Checkpointing enabled at the configured interval, with max concurrent checkpoints set to 1
  • Kafka source creation with the specified topic and consumer properties
  • Optional transformation via user-supplied Transformer classes
  • Sink task parallelism inference via OptionsInference.setupSinkTasks()
  • Pipeline branching: append mode (insert without compaction) vs. upsert mode (with bucket assignment and compaction)
  • Table initialization through StreamerUtil.initTableIfNotExists()
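The real entry point delegates the first of these steps to JCommander, which binds flags to fields of FlinkStreamerConfig. The dependency-free sketch below (the SimpleFlinkStreamerConfig class is hypothetical, not part of Hudi) illustrates the flag/value contract that parsing assumes: each `--flag value` pair populates one configuration entry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for JCommander parsing into FlinkStreamerConfig.
// Each "--flag value" pair becomes one config entry; the real parser also
// handles typed fields, defaults, and validation.
public class SimpleFlinkStreamerConfig {
    final Map<String, String> options = new HashMap<>();

    static SimpleFlinkStreamerConfig parse(String[] args) {
        SimpleFlinkStreamerConfig cfg = new SimpleFlinkStreamerConfig();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected flag, got: " + args[i]);
            }
            cfg.options.put(args[i].substring(2), args[i + 1]);
        }
        return cfg;
    }

    // Illustrative typed accessor; the 60000 ms fallback is an assumption,
    // not Hudi's documented default.
    long checkpointInterval() {
        return Long.parseLong(options.getOrDefault("checkpoint-interval", "60000"));
    }

    public static void main(String[] args) {
        SimpleFlinkStreamerConfig cfg = SimpleFlinkStreamerConfig.parse(new String[]{
            "--kafka-topic", "my_topic",
            "--checkpoint-interval", "30000"
        });
        System.out.println(cfg.options.get("kafka-topic")); // my_topic
        System.out.println(cfg.checkpointInterval());       // 30000
    }
}
```

The pairwise loop mirrors the main-parameter style used in the CLI example below; JCommander additionally supports repeated flags (e.g. multiple transformer class names), which this sketch omits.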

Usage

Use this entry point when you want to run a standalone Kafka-to-Hudi ingestion job from the command line. It is typically invoked via:

flink run -c org.apache.hudi.streamer.HoodieFlinkStreamer \
  hudi-flink-bundle.jar \
  --target-base-path /path/to/hudi_table \
  --table-type MERGE_ON_READ \
  --target-table my_table \
  --source-avro-schema-path /path/to/schema.avsc \
  --kafka-topic my_topic \
  --kafka-bootstrap-servers localhost:9092 \
  --kafka-group-id my_group \
  --checkpoint-interval 30000

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
  • Lines: 49-117

Also:

  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
  • Lines: 286-333

Signature

// HoodieFlinkStreamer entry point
public static void main(String[] args) throws Exception

// Table initialization
public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) throws IOException

Import

import org.apache.hudi.streamer.HoodieFlinkStreamer;
import org.apache.hudi.util.StreamerUtil;

I/O Contract

Inputs

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| args | String[] | Yes | Command-line arguments parsed by JCommander into FlinkStreamerConfig (table path, table type, Kafka topic, checkpoint interval, etc.) |
| cfg.stateBackend | StateBackendOptions.StateBackend | No | State backend type (default: HashMap); configured via --state-backend |
| cfg.flinkCheckPointPath | String | No | Filesystem path for checkpoint storage |
| cfg.checkpointInterval | long | Yes | Checkpoint interval in milliseconds |
| cfg.kafkaTopic | String | Yes | Kafka topic to consume from |
| cfg.transformerClassNames | List<String> | No | Fully-qualified class names of Transformer implementations to apply |
| conf (for initTableIfNotExists) | Configuration | Yes | Flink Configuration containing FlinkOptions (PATH, TABLE_TYPE, TABLE_NAME, SOURCE_AVRO_SCHEMA, etc.) |

Outputs

| Name | Type | Description |
| --- | --- | --- |
| (side effect) | StreamExecutionEnvironment | Configured execution environment with checkpointing enabled and max concurrent checkpoints set to 1 |
| (side effect) | DataStream pipeline | Fully assembled write pipeline (source -> transform -> bucket assign -> stream write -> compact/cluster/clean) |
| return (initTableIfNotExists) | HoodieTableMetaClient | Meta client for the initialized (or existing) Hudi table |
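Because main() fails at startup when a required argument is absent, a caller may want a pre-flight check against the contract above. The sketch below is a hedged, dependency-free illustration (StreamerArgsCheck and its required-flag list are assumptions drawn from this table and the CLI example, not a Hudi API).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative pre-flight check mirroring the I/O contract above:
// required flags must be present before main() can build the pipeline.
public class StreamerArgsCheck {
    // Assumed required set, per the inputs table and the CLI example.
    static final List<String> REQUIRED = Arrays.asList(
        "--target-base-path", "--kafka-topic", "--checkpoint-interval");

    static List<String> missingFlags(String[] args) {
        List<String> present = Arrays.asList(args);
        return REQUIRED.stream()
            .filter(flag -> !present.contains(flag))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String[] demo = {"--kafka-topic", "t", "--checkpoint-interval", "30000"};
        System.out.println(missingFlags(demo)); // [--target-base-path]
    }
}
```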

Usage Examples

// Programmatic invocation (equivalent to CLI)
String[] args = new String[]{
    "--target-base-path", "/tmp/hudi_table",
    "--table-type", "MERGE_ON_READ",
    "--target-table", "test_table",
    "--source-avro-schema-path", "/tmp/schema.avsc",
    "--kafka-topic", "test_topic",
    "--kafka-bootstrap-servers", "localhost:9092",
    "--kafka-group-id", "test_group",
    "--checkpoint-interval", "30000"
};
HoodieFlinkStreamer.main(args);

// Direct table initialization
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, "/tmp/hudi_table");
conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
conf.set(FlinkOptions.TABLE_NAME, "test_table");
conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchemaString);
HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf);
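initTableIfNotExists() is idempotent: it only writes table metadata when none exists under the base path, and otherwise returns a meta client for the existing table. The stand-in below uses java.nio rather than Hudi's real implementation to illustrate that check-then-create shape; the `.hoodie` directory and `hoodie.properties` file names follow Hudi's on-filesystem convention, while InitTableSketch itself is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative check-then-create, mirroring initTableIfNotExists():
// if the table's metadata folder already exists, leave it untouched;
// otherwise create it and record minimal table properties.
public class InitTableSketch {
    static boolean initTableIfNotExists(Path basePath, String tableName) throws IOException {
        Path metaDir = basePath.resolve(".hoodie"); // Hudi's metadata folder name
        if (Files.exists(metaDir)) {
            return false; // table already initialized; reuse it
        }
        Files.createDirectories(metaDir);
        Files.write(metaDir.resolve("hoodie.properties"),
            ("hoodie.table.name=" + tableName + "\n").getBytes());
        return true; // freshly initialized
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("hudi_table");
        System.out.println(initTableIfNotExists(base, "test_table")); // true
        System.out.println(initTableIfNotExists(base, "test_table")); // false
    }
}
```

The second call returning false is what makes the entry point safe to restart: re-running the streamer against an existing table skips initialization and proceeds straight to the write pipeline.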
