Implementation:Apache Hudi OptionsResolver Write Configuration
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool, provided by Apache Hudi, for resolving the write pipeline topology and inferring operator parallelism.
Description
OptionsResolver and OptionsInference work together to analyze a Flink Configuration and produce the boolean flags and inferred settings that determine the shape of the write pipeline.
OptionsResolver provides resolution methods:
- isAppendMode() determines whether the pipeline should use the simplified append path (no index lookup, no compaction). It returns true when the operation is INSERT and either the table is MERGE_ON_READ or it is COPY_ON_WRITE with inline insert clustering (INSERT_CLUSTER) disabled.
- needsAsyncCompaction(), needsAsyncClustering(), and related methods determine which downstream table service operators are needed.
- isBucketIndexType() and isRecordLevelIndex() determine index routing.
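The isAppendMode() decision rule can be modeled without Flink or Hudi on the classpath. The sketch below is a simplified, hypothetical model, not Hudi's source. The enums, the `WriteSettings` record, and the `insertCluster` flag (standing in for FlinkOptions.INSERT_CLUSTER) are assumptions made for illustration. The rule encodes the behavior stated in the Outputs table: INSERT on MOR, or INSERT on COW without inline clustering. Records require Java 16 or later.

```java
import java.util.Locale;

public class AppendModeSketch {

    // Hypothetical stand-ins for FlinkOptions.TABLE_TYPE and FlinkOptions.OPERATION values.
    enum TableType { COPY_ON_WRITE, MERGE_ON_READ }
    enum Operation { INSERT, UPSERT, BULK_INSERT }

    // Hypothetical settings holder; insertCluster mirrors FlinkOptions.INSERT_CLUSTER.
    record WriteSettings(TableType tableType, Operation operation, boolean insertCluster) {}

    // Append path: the operation is INSERT and either the table is MOR,
    // or it is COW with inline insert clustering disabled.
    static boolean isAppendMode(WriteSettings s) {
        boolean isInsert = s.operation() == Operation.INSERT;
        boolean isCow = s.tableType() == TableType.COPY_ON_WRITE;
        boolean isMor = s.tableType() == TableType.MERGE_ON_READ;
        return isInsert && (isMor || (isCow && !s.insertCluster()));
    }

    // Parses string values like those in the usage example ("MERGE_ON_READ", "insert").
    static WriteSettings parse(String tableType, String operation, boolean insertCluster) {
        return new WriteSettings(
            TableType.valueOf(tableType.toUpperCase(Locale.ROOT)),
            Operation.valueOf(operation.toUpperCase(Locale.ROOT)),
            insertCluster);
    }

    public static void main(String[] args) {
        System.out.println(isAppendMode(parse("MERGE_ON_READ", "insert", false))); // true
        System.out.println(isAppendMode(parse("COPY_ON_WRITE", "insert", true)));  // false
        System.out.println(isAppendMode(parse("MERGE_ON_READ", "upsert", false))); // false
    }
}
```

Modeling the flags as a typed record keeps the rule a single boolean expression. The real method reads the same information from a Flink Configuration instead.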
OptionsInference provides parallelism inference:
- setupSinkTasks() cascades parallelism defaults: write tasks default to the environment parallelism, and bucket assign, compaction, clustering, and index write tasks default to the write task count.
- setupClientId() assigns a unique client ID for multi-writer optimistic concurrency control.
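The cascading defaults of setupSinkTasks() can be sketched with a plain map in place of a Flink Configuration. This is a hypothetical model, not Hudi's implementation. The string keys reuse the FlinkOptions constant names only as labels and are not Hudi's actual option keys. The "keep explicit values" behavior follows the Outputs table ("if not already specified").

```java
import java.util.HashMap;
import java.util.Map;

public class SinkTasksSketch {

    // Hypothetical keys named after the FlinkOptions constants; not the real option keys.
    static final String WRITE_TASKS = "WRITE_TASKS";
    static final String BUCKET_ASSIGN_TASKS = "BUCKET_ASSIGN_TASKS";
    static final String COMPACTION_TASKS = "COMPACTION_TASKS";
    static final String CLUSTERING_TASKS = "CLUSTERING_TASKS";
    static final String INDEX_WRITE_TASKS = "INDEX_WRITE_TASKS";

    // Write tasks fall back to the environment parallelism; every downstream
    // operator falls back to the (possibly explicit) write task count.
    // Values that are already present are never overwritten.
    static void setupSinkTasks(Map<String, Integer> conf, int envTasks) {
        conf.putIfAbsent(WRITE_TASKS, envTasks);
        int writeTasks = conf.get(WRITE_TASKS);
        String[] downstream = {BUCKET_ASSIGN_TASKS, COMPACTION_TASKS, CLUSTERING_TASKS, INDEX_WRITE_TASKS};
        for (String key : downstream) {
            conf.putIfAbsent(key, writeTasks);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> conf = new HashMap<>();
        conf.put(COMPACTION_TASKS, 1); // explicit value, survives inference
        setupSinkTasks(conf, 4);
        System.out.println(conf.get(WRITE_TASKS));         // 4
        System.out.println(conf.get(BUCKET_ASSIGN_TASKS)); // 4
        System.out.println(conf.get(COMPACTION_TASKS));    // 1
    }
}
```

Resolving write tasks first and then deriving everything else from it is what makes an explicit WRITE_TASKS value propagate to the downstream operators.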
Usage
These methods are called during pipeline construction, after environment setup but before operator instantiation. They are used by both HoodieFlinkStreamer and HoodieTableSink.
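The ordering described above can be illustrated with a small library-free planner: parallelism is inferred and flags are resolved before any operator is instantiated. This is a hypothetical sketch, not Hudi code. The `ResolvedFlags` record and the stage names (`append_write`, `bootstrap`, `bucket_assign`, `stream_write`, `compact`, `cluster`) are invented labels. The branching only mirrors the append-versus-bucket-assignment split described on this page, whereas the real sinks wire Flink operators rather than returning strings. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;

public class PipelinePlanSketch {

    // Hypothetical flags, as an OptionsResolver-style step might produce them.
    record ResolvedFlags(boolean appendMode, boolean asyncCompaction, boolean asyncClustering) {}

    // Turns already-resolved flags into an ordered list of hypothetical stage names.
    // Parallelism inference is assumed to have run earlier; this step only decides
    // which operators exist.
    static List<String> plan(ResolvedFlags flags) {
        List<String> stages = new ArrayList<>();
        if (flags.appendMode()) {
            stages.add("append_write"); // no index lookup, no bucket assignment
        } else {
            stages.add("bootstrap");
            stages.add("bucket_assign");
            stages.add("stream_write");
        }
        if (flags.asyncCompaction()) {
            stages.add("compact");
        }
        if (flags.asyncClustering()) {
            stages.add("cluster");
        }
        return stages;
    }

    public static void main(String[] args) {
        System.out.println(plan(new ResolvedFlags(true, false, false)));
        // [append_write]
        System.out.println(plan(new ResolvedFlags(false, true, false)));
        // [bootstrap, bucket_assign, stream_write, compact]
    }
}
```

Separating "decide the topology" from "build the operators" is what lets both HoodieFlinkStreamer and HoodieTableSink share the same resolution logic.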
Code Reference
Source Location
- Repository: Apache Hudi
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
- Lines: 83-87
Also:
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
- Lines: 62-84
Signature
// OptionsResolver - pipeline mode detection
public static boolean isAppendMode(Configuration conf)
// OptionsInference - parallelism inference
public static void setupSinkTasks(Configuration conf, int envTasks)
Import
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.configuration.OptionsInference;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| conf | Configuration | Yes | Flink Configuration containing FlinkOptions such as TABLE_TYPE, OPERATION, INDEX_TYPE, INSERT_CLUSTER, COMPACTION_ASYNC_ENABLED, etc. |
| envTasks | int | Yes (for setupSinkTasks) | The default parallelism of the Flink execution environment |
Outputs
| Name | Type | Description |
|---|---|---|
| return (isAppendMode) | boolean | True if the pipeline should use the append path (INSERT on MOR, or INSERT on COW without inline clustering) |
| (side effect of setupSinkTasks) | Configuration (mutated) | The input Configuration with WRITE_TASKS, BUCKET_ASSIGN_TASKS, COMPACTION_TASKS, CLUSTERING_TASKS, and INDEX_WRITE_TASKS set to inferred values if not already specified |
Usage Examples
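import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.utils.Pipelines;
// Assumes env (StreamExecutionEnvironment), rowType (RowType) and
// dataStream (DataStream<RowData>) are already defined by the caller.
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, "/tmp/hudi_table");
conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
conf.set(FlinkOptions.OPERATION, "insert");
// Infer parallelism from the environment; explicitly set task counts are kept
int envParallelism = env.getParallelism(); // e.g., 4
OptionsInference.setupSinkTasks(conf, envParallelism);
// conf now has WRITE_TASKS=4, BUCKET_ASSIGN_TASKS=4, COMPACTION_TASKS=4, etc.
// Resolve the pipeline mode
DataStream<Object> pipeline;
if (OptionsResolver.isAppendMode(conf)) {
    // INSERT on MOR -> append mode, no bucket assignment needed
    pipeline = Pipelines.append(conf, rowType, dataStream);
} else {
    // Non-append (e.g. upsert) mode with full bucket assignment
    DataStream<HoodieFlinkInternalRow> hoodieStream =
        Pipelines.bootstrap(conf, rowType, dataStream);
    pipeline = Pipelines.hoodieStreamWrite(conf, rowType, hoodieStream);
}
// Attach async compaction operators if the configuration requires them
if (OptionsResolver.needsAsyncCompaction(conf)) {
    Pipelines.compact(conf, pipeline);
}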
Related Pages
Implements Principle