Environment:Apache Hudi Flink Runtime Environment
| Knowledge Sources | |
|---|---|
| Domains | Infrastructure, Stream_Processing |
| Last Updated | 2026-02-08 20:00 GMT |
Overview
Apache Flink 1.17+ runtime environment with Java 11+, Hadoop 2.10+, and the Hudi Flink bundle JAR for running Hudi streaming writes, reads, compaction, and clustering.
Description
This environment defines the runtime prerequisites for operating Apache Hudi tables via the Flink SQL/DataStream API. It requires a running Flink cluster (session or application mode) with the version-matched hudi-flink-bundle JAR deployed. Hudi supports Flink 1.17 through 2.1, with 1.20 as the default. The runtime also requires Hadoop-compatible filesystem access (HDFS, S3, or local) and optionally a Hive Metastore for catalog integration.
Usage
Use this environment for any Flink-based Hudi workflow: streaming writes, batch/incremental reads, MOR compaction, table clustering, and schema evolution. It is the mandatory runtime context for all Flink Implementation pages.
System Requirements
| Category | Requirement | Notes |
|---|---|---|
| Java | Java 11 or 17 | Must match build-time Java version |
| Flink | 1.17.1 through 2.1.1 | Default is 1.20.1; bundle must match cluster version |
| Hadoop | 2.10.2+ | Required for filesystem access (HDFS, S3A) |
| Memory | Configurable per operator | See write.task.max.size (default 1 GB per write task) |
| Storage | HDFS, S3, or local filesystem | Must be accessible from all Flink TaskManagers |
Dependencies
Runtime JARs
- hudi-flink{version}-bundle (version-matched to the Flink cluster)
- Hadoop client libraries (if not provided by the Flink distribution)
- Hive Metastore client (optional, for catalog integration)
Key Configuration Properties
- write.task.max.size = 1024 MB (maximum memory per write task)
- write.merge.max_memory = 100 MB (merge map memory)
- write.batch.size = 256 MB (batch size for writes)
- compaction.delta_commits = 5 (commits before compaction is scheduled)
- compaction.max_memory = 100 MB (spillable map memory for compaction)
- metadata.enabled = true (metadata table for performance)
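The first two memory settings are coupled: each write task reserves a constant 100 MB for the merge reader plus write.merge.max_memory for the merge map, and only the remainder of write.task.max.size is available as write buffer (this is the validation shown under Code Evidence). A minimal arithmetic sketch of that budget, using the defaults above; the class and method names here are illustrative, not Hudi APIs:

```java
// Sketch of the write-task memory budget implied by the defaults above
// (mirrors the check in MemorySegmentPoolFactory; inputs in MB, result in bytes).
public class WriteBudgetSketch {
    static long writeBufferBytes(double taskMaxSizeMb, long mergeMapMaxMemMb) {
        long mergeReaderMemMb = 100; // constant reserved for the merge reader
        return (long) ((taskMaxSizeMb - mergeReaderMemMb - mergeMapMaxMemMb) * 1024 * 1024);
    }

    public static void main(String[] args) {
        // Defaults: write.task.max.size = 1024 MB, write.merge.max_memory = 100 MB
        long budget = writeBufferBytes(1024D, 100);
        System.out.println(budget / (1024 * 1024)); // prints 824: MB left for the write buffer
        // At 200 MB total the remainder is zero and validation fails
        System.out.println(writeBufferBytes(200D, 100) > 0); // prints false
    }
}
```

With the defaults, 824 MB remains for the write buffer; at 200 MB or below the remainder is non-positive, which is why the minimum workable setting is 201 MB (see Common Errors).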
Credentials
The following environment variables may be required depending on deployment:
- HIVE_CONF_DIR: path to the Hive configuration directory (for Hive Metastore sync)
- Hadoop credentials (for S3A/HDFS access, configured via core-site.xml or the Flink configuration)
Quick Install
```shell
# Download the matching Flink distribution
wget https://archive.apache.org/dist/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz
tar xzf flink-1.20.1-bin-scala_2.12.tgz

# Copy the Hudi bundle to the Flink lib directory
cp hudi-flink1.20-bundle-*.jar flink-1.20.1/lib/

# Start the Flink cluster
./flink-1.20.1/bin/start-cluster.sh

# Launch the Flink SQL client to submit Hudi SQL
./flink-1.20.1/bin/sql-client.sh
```
Code Evidence
Memory validation from MemorySegmentPoolFactory.java:47-52:
```java
long mergeReaderMem = 100; // constant 100MB
long mergeMapMaxMem = conf.get(FlinkOptions.WRITE_MERGE_MAX_MEMORY);
long maxBufferSize = (long) ((conf.get(FlinkOptions.WRITE_TASK_MAX_SIZE)
    - mergeReaderMem - mergeMapMaxMem) * 1024 * 1024);
final String errMsg = String.format(
    "'%s' should be at least greater than '%s' plus merge reader memory(constant 100MB now)",
    FlinkOptions.WRITE_TASK_MAX_SIZE.key(), FlinkOptions.WRITE_MERGE_MAX_MEMORY.key());
ValidationUtils.checkState(maxBufferSize > 0, errMsg);
```
Hive configuration fallback from HadoopConfigurations.java:65:
```java
String explicitDir = conf.getString(
    FlinkOptions.HIVE_SYNC_CONF_DIR.key(), System.getenv("HIVE_CONF_DIR"));
```
Write task max size from FlinkOptions.java:703-708:
```java
public static final ConfigOption<Double> WRITE_TASK_MAX_SIZE = ConfigOptions
    .key("write.task.max.size")
    .doubleType()
    .defaultValue(1024D) // 1GB
    .withDescription("Maximum memory in MB for a write task, when the threshold hits,\n"
        + "it flushes the max size data bucket to avoid OOM, default 1GB");
```
Common Errors
| Error Message | Cause | Solution |
|---|---|---|
| 'write.task.max.size' should be at least greater than 'write.merge.max_memory' plus merge reader memory | write.task.max.size too small | Increase to at least 201 MB (100 reader + 100 merge + 1 buffer) |
| Embedded metastore is not allowed | Hive catalog configured without an external metastore | Set hive.metastore.uris to a running Hive Metastore |
| path cannot be null | Missing table path in DDL | Specify 'path' = 's3://...' in CREATE TABLE |
| ClassNotFoundException for Hudi classes | Bundle JAR not in the Flink lib path | Copy the correct hudi-flink{version}-bundle JAR to $FLINK_HOME/lib/ |
Compatibility Notes
- Flink version matching: the Hudi Flink bundle must match the Flink cluster's minor version line (e.g. 1.18 vs 1.20). A hudi-flink1.18-bundle will not work on a Flink 1.20 cluster.
- Flink is Scala-free: since Flink 1.15, the Scala version does not matter for Flink bundles.
- Hadoop compatibility: Hudi builds against Hadoop 2.10.2 by default but is compatible with Hadoop 3.x at runtime.
- Hive Metastore: Embedded Hive metastore is not supported for catalog mode; an external Hive Metastore service is required.
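The version-matching rule above can be checked before deployment by comparing the Flink version embedded in the bundle JAR's name against the cluster version. A hypothetical sketch; these helpers are illustrative and not part of Hudi or Flink:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative pre-deployment check: does the bundle JAR's Flink version
// line match the cluster's version?
public class BundleVersionCheck {
    // e.g. "hudi-flink1.18-bundle-0.15.0.jar" -> "1.18", or null if no match
    static String bundleFlinkVersion(String jarName) {
        Matcher m = Pattern.compile("hudi-flink(\\d+\\.\\d+)-bundle").matcher(jarName);
        return m.find() ? m.group(1) : null;
    }

    // clusterVersion like "1.20.1"; compare only the minor version line (1.20)
    static boolean matchesCluster(String jarName, String clusterVersion) {
        String bundle = bundleFlinkVersion(jarName);
        return bundle != null && clusterVersion.startsWith(bundle + ".");
    }

    public static void main(String[] args) {
        System.out.println(matchesCluster("hudi-flink1.20-bundle-0.15.0.jar", "1.20.1")); // prints true
        System.out.println(matchesCluster("hudi-flink1.18-bundle-0.15.0.jar", "1.20.1")); // prints false
    }
}
```

A mismatch typically surfaces only at job submission as a ClassNotFoundException or NoSuchMethodError, so failing fast on the JAR name is cheaper than debugging the runtime error.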
Related Pages
- Implementation:Apache_Hudi_HoodieFlinkStreamer_Main
- Implementation:Apache_Hudi_HoodieTableFactory_CreateDynamicTableSink
- Implementation:Apache_Hudi_OptionsResolver_Write_Configuration
- Implementation:Apache_Hudi_BucketAssignFunction_ProcessElement
- Implementation:Apache_Hudi_StreamWriteFunction_ProcessElement
- Implementation:Apache_Hudi_StreamerUtil_ValidateWriteStatus
- Implementation:Apache_Hudi_OptionsResolver_Query_Configuration
- Implementation:Apache_Hudi_HoodieTableSource_GetScanRuntimeProvider
- Implementation:Apache_Hudi_FileIndex_GetFilesInPartitions
- Implementation:Apache_Hudi_HoodieContinuousSplitEnumerator_HandleSplitRequest
- Implementation:Apache_Hudi_HoodieSplitReaderFunction_Read
- Implementation:Apache_Hudi_HoodieRecordEmitter_EmitRecord
- Implementation:Apache_Hudi_CompactionUtil_ScheduleCompaction
- Implementation:Apache_Hudi_CompactionPlanStrategy_Select
- Implementation:Apache_Hudi_HoodieFlinkCompactor_Main
- Implementation:Apache_Hudi_CompactOperator_ProcessElement
- Implementation:Apache_Hudi_CompactionCommitSink_Invoke
- Implementation:Apache_Hudi_ClusteringUtil_ValidateClusteringScheduling
- Implementation:Apache_Hudi_ClusteringPlanOperator_NotifyCheckpointComplete
- Implementation:Apache_Hudi_FlinkSizeBasedClusteringPlanStrategy_BuildGroups
- Implementation:Apache_Hudi_ClusteringOperator_DoClustering
- Implementation:Apache_Hudi_ClusteringCommitSink_DoCommit
- Implementation:Apache_Hudi_SchemaChangeUtils_IsTypeUpdateAllow
- Implementation:Apache_Hudi_HoodieCatalogUtil_AlterTable
- Implementation:Apache_Hudi_HoodieSchemaCompatibility_CheckSchemaCompatible
- Implementation:Apache_Hudi_SchemaEvolvingRowDataProjection_Project
- Implementation:Apache_Hudi_HoodieFileGroupReader_GetClosableIterator