Implementation:Apache Hudi OptionsResolver Write Configuration

From Leeroopedia


Knowledge Sources
Domains: Data_Lake, Stream_Processing
Last Updated: 2026-02-08 00:00 GMT

Overview

Concrete tool provided by Apache Hudi for resolving the write pipeline topology and inferring operator parallelism.

Description

OptionsResolver and OptionsInference work together to analyze a Flink Configuration and produce the boolean flags and inferred settings that determine the shape of the write pipeline.

OptionsResolver provides resolution methods:

  • isAppendMode() determines whether the pipeline should use the simplified append path (no index lookup, no compaction). It returns true when the operation is INSERT and the table is either MERGE_ON_READ, or COPY_ON_WRITE with inline clustering (INSERT_CLUSTER) disabled.
  • needsAsyncCompaction(), needsAsyncClustering(), and related methods determine which downstream table service operators are needed.
  • isBucketIndexType() and isRecordLevelIndex() report which index type is configured, which determines how incoming records are routed to file groups.
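The resolution rules above can be illustrated without Flink or Hudi on the classpath. What follows is a minimal, hypothetical Java sketch, not Hudi's implementation. A Map<String, String> keyed by FlinkOptions constant names stands in for the Flink Configuration, and the defaults (OPERATION=upsert, TABLE_TYPE=COPY_ON_WRITE, INSERT_CLUSTER=false, COMPACTION_ASYNC_ENABLED=true) are assumptions. isAppendMode follows the rule stated in the Outputs table. needsAsyncCompaction is assumed to require a MERGE_ON_READ table with async compaction enabled.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of two OptionsResolver checks; not Hudi's implementation. */
public class ResolverSketch {

    // Read an option, falling back to an assumed default when it is unset.
    static String get(Map<String, String> conf, String key, String defaultValue) {
        return conf.getOrDefault(key, defaultValue);
    }

    static boolean isInsert(Map<String, String> conf) {
        return "insert".equalsIgnoreCase(get(conf, "OPERATION", "upsert"));
    }

    static boolean isCow(Map<String, String> conf) {
        return "COPY_ON_WRITE".equalsIgnoreCase(get(conf, "TABLE_TYPE", "COPY_ON_WRITE"));
    }

    static boolean isMor(Map<String, String> conf) {
        return "MERGE_ON_READ".equalsIgnoreCase(get(conf, "TABLE_TYPE", "COPY_ON_WRITE"));
    }

    // Append path: INSERT on MOR, or INSERT on COW without inline clustering.
    static boolean isAppendMode(Map<String, String> conf) {
        boolean insertCluster = Boolean.parseBoolean(get(conf, "INSERT_CLUSTER", "false"));
        return isInsert(conf) && ((isCow(conf) && !insertCluster) || isMor(conf));
    }

    // Assumed rule: async compaction operators are needed only for MOR tables with async compaction on.
    static boolean needsAsyncCompaction(Map<String, String> conf) {
        boolean asyncEnabled = Boolean.parseBoolean(get(conf, "COMPACTION_ASYNC_ENABLED", "true"));
        return isMor(conf) && asyncEnabled;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("TABLE_TYPE", "MERGE_ON_READ");
        conf.put("OPERATION", "insert");
        // Prints "append=true, asyncCompaction=true"
        System.out.println("append=" + isAppendMode(conf) + ", asyncCompaction=" + needsAsyncCompaction(conf));
    }
}
```

Under these assumptions both checks are true for INSERT on MOR. A caller would therefore consult isAppendMode first and add compaction operators only on the non-append path, matching the description of the append path as having no compaction.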

OptionsInference provides parallelism inference:

  • setupSinkTasks() cascades parallelism defaults: write tasks default to the environment parallelism, and bucket assign, compaction, clustering, and index write tasks default to write tasks.
  • setupClientId() assigns a unique client ID (WRITE_CLIENT_ID) when multi-writer optimistic concurrency control is enabled and no client ID has been set explicitly.
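The parallelism cascade can be sketched the same way. This hypothetical model is not Hudi's code. It again uses a map keyed by FlinkOptions constant names, and putIfAbsent plays the role of "set only if not already specified". Explicit user settings therefore survive inference, while unset dependents inherit the (possibly inferred) write parallelism.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the setupSinkTasks cascade; not Hudi's implementation. */
public class SinkTasksSketch {

    // Task options that fall back to the write parallelism when unset.
    static final String[] DERIVED = {
        "BUCKET_ASSIGN_TASKS", "COMPACTION_TASKS", "CLUSTERING_TASKS", "INDEX_WRITE_TASKS"
    };

    // Fill in unset parallelism options in place; explicitly set values are never overwritten.
    static void setupSinkTasks(Map<String, Integer> conf, int envTasks) {
        // Write tasks default to the environment parallelism.
        conf.putIfAbsent("WRITE_TASKS", envTasks);
        int writeTasks = conf.get("WRITE_TASKS");
        // Every dependent option defaults to the write tasks, not to envTasks.
        for (String key : DERIVED) {
            conf.putIfAbsent(key, writeTasks);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> conf = new HashMap<>();
        conf.put("COMPACTION_TASKS", 2); // user override survives inference
        setupSinkTasks(conf, 4);
        // Prints "4 4 2"
        System.out.println(conf.get("WRITE_TASKS") + " " + conf.get("BUCKET_ASSIGN_TASKS") + " "
                + conf.get("COMPACTION_TASKS"));
    }
}
```

Because the dependents read WRITE_TASKS after it has been resolved, setting only WRITE_TASKS=8 on a 4-slot environment yields 8 for bucket assignment, compaction, clustering, and index writing.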

Usage

These methods are called during pipeline construction, after environment setup but before operator instantiation. They are used by both HoodieFlinkStreamer and HoodieTableSink.
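The ordering matters because each operator reads both its task count and the pipeline shape from the configuration when it is created. The hypothetical sketch below runs inference first, then resolves the flags, then lists the stages it would instantiate. Its stage labels (bucket_assign, stream_write, append_write, compact) are illustrative, not Hudi operator names. Following the Description, compaction is added only on the non-append path.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the construction order; stage labels are illustrative only. */
public class PipelinePlanSketch {

    static List<String> plan(Map<String, String> conf, int envTasks) {
        // 1. Parallelism inference runs first, so every stage below sees a task count.
        conf.putIfAbsent("WRITE_TASKS", Integer.toString(envTasks));
        String writeTasks = conf.get("WRITE_TASKS");
        conf.putIfAbsent("BUCKET_ASSIGN_TASKS", writeTasks);
        conf.putIfAbsent("COMPACTION_TASKS", writeTasks);

        // 2. Resolve the pipeline shape: INSERT on MOR, or INSERT on COW without inline clustering.
        boolean insert = "insert".equalsIgnoreCase(conf.getOrDefault("OPERATION", "upsert"));
        boolean mor = "MERGE_ON_READ".equalsIgnoreCase(conf.getOrDefault("TABLE_TYPE", "COPY_ON_WRITE"));
        boolean insertCluster = Boolean.parseBoolean(conf.getOrDefault("INSERT_CLUSTER", "false"));
        boolean appendMode = insert && (mor || !insertCluster);

        // 3. "Instantiate" stages from the now-complete configuration.
        List<String> stages = new ArrayList<>();
        if (appendMode) {
            stages.add("append_write x" + conf.get("WRITE_TASKS"));
        } else {
            stages.add("bucket_assign x" + conf.get("BUCKET_ASSIGN_TASKS"));
            stages.add("stream_write x" + conf.get("WRITE_TASKS"));
            boolean asyncCompaction =
                    mor && Boolean.parseBoolean(conf.getOrDefault("COMPACTION_ASYNC_ENABLED", "true"));
            if (asyncCompaction) {
                stages.add("compact x" + conf.get("COMPACTION_TASKS"));
            }
        }
        return stages;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("TABLE_TYPE", "MERGE_ON_READ"); // operation defaults to upsert
        // Prints "[bucket_assign x4, stream_write x4, compact x4]"
        System.out.println(plan(conf, 4));
    }
}
```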

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
  • Lines: 83-87

Also:

  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
  • Lines: 62-84

Signature

// OptionsResolver - pipeline mode detection
public static boolean isAppendMode(Configuration conf)

// OptionsInference - parallelism inference
public static void setupSinkTasks(Configuration conf, int envTasks)

Import

import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.configuration.OptionsInference;

I/O Contract

Inputs

Name | Type | Required | Description
conf | Configuration | Yes | Flink Configuration containing FlinkOptions such as TABLE_TYPE, OPERATION, INDEX_TYPE, INSERT_CLUSTER, COMPACTION_ASYNC_ENABLED, etc.
envTasks | int | Yes (setupSinkTasks only) | Default parallelism of the Flink execution environment

Outputs

Name | Type | Description
return value of isAppendMode | boolean | true if the pipeline should use the append path (INSERT on MOR, or INSERT on COW without inline clustering)
side effect of setupSinkTasks | Configuration (mutated in place) | The input Configuration with WRITE_TASKS, BUCKET_ASSIGN_TASKS, COMPACTION_TASKS, CLUSTERING_TASKS, and INDEX_WRITE_TASKS set to inferred values where not already specified

Usage Examples

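// Imports (HoodieFlinkInternalRow also needs the import matching your Hudi version)
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.utils.Pipelines;

// Assumes env (StreamExecutionEnvironment), rowType (RowType) and
// dataStream (DataStream<RowData>) are already defined
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, "/tmp/hudi_table");
conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
conf.set(FlinkOptions.OPERATION, "insert");

// Infer parallelism from the environment; options already set are left unchanged
int envParallelism = env.getParallelism(); // e.g., 4
OptionsInference.setupSinkTasks(conf, envParallelism);
// With no explicit overrides, conf now has WRITE_TASKS=4, BUCKET_ASSIGN_TASKS=4, COMPACTION_TASKS=4, etc.

// Check pipeline mode
DataStream<Object> pipeline;
if (OptionsResolver.isAppendMode(conf)) {
    // INSERT on MOR -> append mode: no bucket assignment, no compaction
    pipeline = Pipelines.append(conf, rowType, dataStream);
} else {
    // Upsert mode: bootstrap, then write with full bucket assignment
    DataStream<HoodieFlinkInternalRow> hoodieStream =
        Pipelines.bootstrap(conf, rowType, dataStream);
    pipeline = Pipelines.hoodieStreamWrite(conf, rowType, hoodieStream);

    // Compaction is wired only on the non-append path, since the append path has no compaction
    if (OptionsResolver.needsAsyncCompaction(conf)) {
        Pipelines.compact(conf, pipeline);
    }
}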

Related Pages

Implements Principle
