
Environment:Apache Hudi Flink Runtime Environment

From Leeroopedia


Knowledge Sources
Domains Infrastructure, Stream_Processing
Last Updated 2026-02-08 20:00 GMT

Overview

Apache Flink 1.17+ runtime environment with Java 11+, Hadoop 2.10+, and the Hudi Flink bundle JAR for running Hudi streaming writes, reads, compaction, and clustering.

Description

This environment defines the runtime prerequisites for operating Apache Hudi tables via the Flink SQL/DataStream API. It requires a running Flink cluster (session or application mode) with the version-matched hudi-flink-bundle JAR deployed. Hudi supports Flink 1.17 through 2.1, with 1.20 as the default. The runtime also requires Hadoop-compatible filesystem access (HDFS, S3, or local) and optionally a Hive Metastore for catalog integration.

Usage

Use this environment for any Flink-based Hudi workflow: streaming writes, batch/incremental reads, MOR compaction, table clustering, and schema evolution. It is the mandatory runtime context for all Flink Implementation pages.
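A minimal sketch of the typical entry point: declaring a Hudi table and starting a streaming write from the Flink SQL client. The table name, schema, source table, and path below are illustrative, not prescribed by this environment.

```sql
-- Hypothetical table and path; adjust the schema and storage URI to your deployment.
CREATE TABLE hudi_events (
  uuid VARCHAR(36) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(32),
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 's3a://my-bucket/warehouse/hudi_events',  -- must be reachable from all TaskManagers
  'table.type' = 'MERGE_ON_READ'                     -- MOR supports async compaction
);

-- Streaming write: rows from an upstream source table are upserted by primary key.
INSERT INTO hudi_events SELECT uuid, name, ts, `partition` FROM source_events;
```

With a streaming source, the INSERT runs as an unbounded job that commits to the Hudi timeline continuously.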

System Requirements

  • Java: 11 or 17 (must match the build-time Java version)
  • Flink: 1.17.1 through 2.1.1 (default is 1.20.1; the bundle must match the cluster version)
  • Hadoop: 2.10.2+ (required for filesystem access: HDFS, S3A)
  • Memory: configurable per operator (see write.task.max.size, default 1 GB per write task)
  • Storage: HDFS, S3, or local filesystem (must be accessible from all Flink TaskManagers)

Dependencies

Runtime JARs

  • hudi-flink{version}-bundle (version-matched to Flink cluster)
  • Hadoop client libraries (if not provided by Flink distribution)
  • Hive Metastore client (optional, for catalog integration)

Key Configuration Properties

  • write.task.max.size = 1024 MB (maximum memory per write task)
  • write.merge.max_memory = 100 MB (merge map memory)
  • write.batch.size = 256 MB (batch size for writes)
  • compaction.delta_commits = 5 (commits before compaction)
  • compaction.max_memory = 100 MB (spillable map memory)
  • metadata.enabled = true (metadata table for performance)
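These properties are set per table in the DDL WITH clause. A sketch tuning write-path memory and compaction cadence, using the default values listed above (table name and path are illustrative):

```sql
CREATE TABLE hudi_tuned (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  payload STRING
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///warehouse/hudi_tuned',  -- illustrative path
  'table.type' = 'MERGE_ON_READ',
  'write.task.max.size' = '1024',           -- MB; must exceed write.merge.max_memory + 100 MB reader memory
  'write.merge.max_memory' = '100',         -- MB
  'write.batch.size' = '256',               -- MB
  'compaction.delta_commits' = '5',         -- compact after this many delta commits
  'metadata.enabled' = 'true'
);
```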

Credentials

The following environment variables may be required depending on deployment:

  • HIVE_CONF_DIR: Path to Hive configuration directory (for Hive Metastore sync)
  • Hadoop credentials (for S3A/HDFS access, configured via core-site.xml or Flink config)
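For Hive Metastore sync, the relevant options can also be supplied per table. A hedged sketch (the metastore URI, database, and path are placeholders); when hive_sync.conf.dir is not set, the HIVE_CONF_DIR environment variable is used as a fallback:

```sql
CREATE TABLE hudi_synced (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  payload STRING
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///warehouse/hudi_synced',           -- illustrative path
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',                           -- sync through the metastore client
  'hive_sync.metastore.uris' = 'thrift://hms-host:9083',
  'hive_sync.db' = 'default',
  'hive_sync.table' = 'hudi_synced'
);
```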

Quick Install

# Download matching Flink distribution
wget https://archive.apache.org/dist/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz
tar xzf flink-1.20.1-bin-scala_2.12.tgz

# Copy Hudi bundle to Flink lib directory
cp hudi-flink1.20-bundle-*.jar flink-1.20.1/lib/

# Start Flink cluster
./flink-1.20.1/bin/start-cluster.sh

# Submit Flink SQL with Hudi
./flink-1.20.1/bin/sql-client.sh
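Once the SQL client is up, incremental and streaming reads are configured on the table definition. A sketch, assuming the table written earlier (the path, start commit, and poll interval are illustrative):

```sql
CREATE TABLE hudi_events_read (
  uuid VARCHAR(36),
  name VARCHAR(32),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 's3a://my-bucket/warehouse/hudi_events',  -- same path as the writer
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',                 -- continuously poll for new commits
  'read.start-commit' = 'earliest',                  -- or a commit timestamp such as '20240101000000'
  'read.streaming.check-interval' = '4'              -- seconds between polls
);

SELECT * FROM hudi_events_read;  -- runs as an unbounded streaming query
```

Omitting the read.streaming.* options yields a bounded batch snapshot read of the same table.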

Code Evidence

Memory validation from MemorySegmentPoolFactory.java:47-52:

long mergeReaderMem = 100; // constant 100MB
long mergeMapMaxMem = conf.get(FlinkOptions.WRITE_MERGE_MAX_MEMORY);
long maxBufferSize = (long) ((conf.get(FlinkOptions.WRITE_TASK_MAX_SIZE)
    - mergeReaderMem - mergeMapMaxMem) * 1024 * 1024);
final String errMsg = String.format(
    "'%s' should be at least greater than '%s' plus merge reader memory(constant 100MB now)",
    FlinkOptions.WRITE_TASK_MAX_SIZE.key(), FlinkOptions.WRITE_MERGE_MAX_MEMORY.key());
ValidationUtils.checkState(maxBufferSize > 0, errMsg);

Hive configuration fallback from HadoopConfigurations.java:65:

String explicitDir = conf.getString(
    FlinkOptions.HIVE_SYNC_CONF_DIR.key(), System.getenv("HIVE_CONF_DIR"));

Write task max size from FlinkOptions.java:703-708:

public static final ConfigOption<Double> WRITE_TASK_MAX_SIZE = ConfigOptions
    .key("write.task.max.size")
    .doubleType()
    .defaultValue(1024D) // 1GB
    .withDescription("Maximum memory in MB for a write task, when the threshold hits,\n"
        + "it flushes the max size data bucket to avoid OOM, default 1GB");

Common Errors

  • "'write.task.max.size' should be at least greater than 'write.merge.max_memory' plus merge reader memory": write.task.max.size is too small. Increase it to at least 201 MB (100 MB reader + 100 MB default merge memory + a positive buffer).
  • "Embedded metastore is not allowed": Hive catalog used without an external metastore. Set hive.metastore.uris to a running Hive Metastore.
  • "path cannot be null": missing table path in the DDL. Specify 'path' = 's3://...' in CREATE TABLE.
  • ClassNotFoundException for Hudi classes: bundle JAR not on the Flink classpath. Copy the matching hudi-flink{version}-bundle JAR to $FLINK_HOME/lib/.

Compatibility Notes

  • Flink version matching: The Hudi Flink bundle must match the Flink cluster's minor version line (1.17, 1.18, 1.20, and so on). A hudi-flink1.18-bundle will not work on a Flink 1.20 cluster.
  • Flink is Scala-free: Since Flink 1.15, Scala version does not matter for Flink bundles.
  • Hadoop compatibility: Hudi builds against Hadoop 2.10.2 by default but is compatible with Hadoop 3.x at runtime.
  • Hive Metastore: Embedded Hive metastore is not supported for catalog mode; an external Hive Metastore service is required.
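Because the embedded metastore is rejected, catalog mode must point at an external HMS. A sketch of the Hudi catalog DDL (the warehouse path and Hive conf directory are placeholders):

```sql
CREATE CATALOG hudi_catalog WITH (
  'type' = 'hudi',
  'catalog.path' = 'hdfs:///warehouse',  -- default root path for tables in this catalog
  'hive.conf.dir' = '/etc/hive/conf',    -- directory containing hive-site.xml
  'mode' = 'hms'                         -- requires hive.metastore.uris in hive-site.xml
);
USE CATALOG hudi_catalog;
```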
