
Implementation:Apache Hudi ClusteringUtil ValidateClusteringScheduling

From Leeroopedia


Knowledge Sources
Domains: Data_Lake, Data_Layout_Optimization
Last Updated: 2026-02-08 00:00 GMT

Overview

A utility, provided by Apache Hudi's Flink integration, that checks whether a Hudi table's configuration is compatible with clustering and decides whether the clustering pipeline should be scheduled.

Description

The ClusteringUtil.validateClusteringScheduling method is the primary entry point for pre-flight configuration validation in the Flink clustering workflow. It inspects the table's index type (for bucket indexes, the bucket engine type) together with the write mode (append or non-append) to ensure that clustering can proceed without violating structural invariants.

The validation logic handles three cases:

  1. Non-append mode with simple bucket index: Throws HoodieNotSupportedException because simple bucket indexing relies on a fixed hash-to-file mapping that clustering would break.
  2. Non-append mode with consistent hashing bucket index: Requires the clustering plan strategy class to be set to FlinkConsistentBucketClusteringPlanStrategy. Throws HoodieNotSupportedException if a different strategy class is configured.
  3. All other cases: Validation passes (no exception thrown).
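The three cases form a small decision function. The sketch below is a minimal, self-contained approximation of that logic, not Hudi's code: IndexKind, NotSupportedException and the boolean appendMode parameter are hypothetical stand-ins for the Flink Configuration lookups and HoodieNotSupportedException, and the strategy check compares a simple class name for brevity.

```java
/**
 * Minimal sketch of the three-way check described above.
 * IndexKind and NotSupportedException are hypothetical stand-ins, not Hudi types.
 */
final class ClusteringValidationSketch {

  /** Hypothetical summary of the table's index configuration. */
  enum IndexKind { NON_BUCKET, SIMPLE_BUCKET, CONSISTENT_HASHING_BUCKET }

  /** Stand-in for HoodieNotSupportedException. */
  static final class NotSupportedException extends RuntimeException {
    NotSupportedException(String message) {
      super(message);
    }
  }

  /** Simple name used for brevity (assumption); the real check concerns the configured strategy class. */
  static final String CONSISTENT_STRATEGY = "FlinkConsistentBucketClusteringPlanStrategy";

  static void validate(boolean appendMode, IndexKind index, String planStrategyClass) {
    if (appendMode) {
      return; // Case 3: append mode is never rejected.
    }
    switch (index) {
      case SIMPLE_BUCKET:
        // Case 1: clustering would break the fixed hash-to-file mapping.
        throw new NotSupportedException("Clustering is not supported for simple bucket index");
      case CONSISTENT_HASHING_BUCKET:
        // Case 2: only the consistent-bucket plan strategy is accepted.
        if (!CONSISTENT_STRATEGY.equals(planStrategyClass)) {
          throw new NotSupportedException("Plan strategy must be " + CONSISTENT_STRATEGY);
        }
        return;
      default:
        return; // Case 3: non-bucket indexes pass.
    }
  }

  public static void main(String[] args) {
    // Passes: consistent hashing with the expected strategy.
    validate(false, IndexKind.CONSISTENT_HASHING_BUCKET, CONSISTENT_STRATEGY);
    try {
      // Rejected: simple bucket index in non-append mode.
      validate(false, IndexKind.SIMPLE_BUCKET, null);
    } catch (NotSupportedException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

Modeling the check as a pure function of three inputs makes it easy to see that only non-append bucket-index configurations can ever fail.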

The companion methods OptionsResolver.needsScheduleClustering and OptionsResolver.needsAsyncClustering return boolean decisions on whether clustering should be wired into the Flink job graph. needsScheduleClustering returns true only when scheduling is enabled and the operation type is compatible with the index: UPSERT for tables using the consistent hashing bucket index, INSERT for all other tables. needsAsyncClustering returns true when the operation is INSERT and async clustering is enabled; it does not consider the index type.
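The two decisions can be sketched as plain boolean functions. This is a hypothetical, dependency-free approximation of the rules stated above, not the OptionsResolver source: the Operation enum and the boolean parameters stand in for values that the real methods read from the Flink Configuration.

```java
/**
 * Hypothetical sketch of the scheduling decisions described above.
 * Operation and the boolean flags stand in for Configuration lookups.
 */
final class ClusteringDecisionSketch {

  enum Operation { INSERT, UPSERT, BULK_INSERT }

  static boolean needsScheduleClustering(boolean scheduleEnabled, Operation op,
                                         boolean consistentHashingBucket) {
    if (!scheduleEnabled) {
      return false; // Scheduling switched off entirely.
    }
    // Consistent hashing bucket tables accept UPSERT; every other table accepts INSERT.
    return consistentHashingBucket ? op == Operation.UPSERT : op == Operation.INSERT;
  }

  static boolean needsAsyncClustering(boolean asyncEnabled, Operation op) {
    // Only the operation type and the async flag matter here.
    return op == Operation.INSERT && asyncEnabled;
  }

  public static void main(String[] args) {
    // Print the decision matrix with both flags enabled.
    for (Operation op : Operation.values()) {
      for (boolean consistent : new boolean[] {false, true}) {
        System.out.printf("%-11s consistentHashing=%-5b schedule=%-5b async=%b%n",
            op, consistent,
            needsScheduleClustering(true, op, consistent),
            needsAsyncClustering(true, op));
      }
    }
  }
}
```

Under these rules, an UPSERT write to a consistent hashing bucket table gets a schedule decision of true but an async decision of false, because needsAsyncClustering only accepts INSERT.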

Usage

Call validateClusteringScheduling before scheduling any clustering plan or constructing a clustering pipeline. Call the OptionsResolver methods during pipeline construction to decide whether to include clustering operators in the job graph.

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
  • Lines: 45-62
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
  • Lines: 303-326

Signature

// ClusteringUtil
public static void validateClusteringScheduling(Configuration conf)

// OptionsResolver
public static boolean needsScheduleClustering(Configuration conf)

public static boolean needsAsyncClustering(Configuration conf)

Import

import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.flink.configuration.Configuration;

I/O Contract

Inputs

Name | Type | Required | Description
conf | org.apache.flink.configuration.Configuration | Yes | Flink configuration containing the table type, operation type, index type, clustering flags, and plan strategy class settings

Outputs

Method | Return type | Description
validateClusteringScheduling | void | Returns normally on success; throws HoodieNotSupportedException if the configuration is incompatible with clustering
needsScheduleClustering | boolean | true if clustering.schedule.enabled is true and the operation type is compatible with the index type (UPSERT for the consistent hashing bucket index, INSERT otherwise)
needsAsyncClustering | boolean | true if the operation is INSERT and clustering.async.enabled is true

Usage Examples

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.util.ClusteringUtil;

public class ClusteringSchedulingExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Set options through the FlinkOptions constants so the keys match
    // what OptionsResolver actually reads.
    conf.set(FlinkOptions.OPERATION, "insert");
    conf.set(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
    conf.set(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);

    // Fail fast: throws HoodieNotSupportedException for incompatible bucket-index setups
    ClusteringUtil.validateClusteringScheduling(conf);

    // Check if clustering should be scheduled in the pipeline
    if (OptionsResolver.needsScheduleClustering(conf)) {
      // Wire the clustering plan operator into the Flink job graph
    }

    // Check if async clustering should be enabled
    if (OptionsResolver.needsAsyncClustering(conf)) {
      // Wire the async clustering operators into the Flink job graph
    }
  }
}

Related Pages

Implements Principle
