Implementation:Apache Hudi ClusteringPlanOperator NotifyCheckpointComplete

From Leeroopedia


Knowledge Sources
Domains: Data_Lake, Data_Layout_Optimization
Last Updated: 2026-02-08 00:00 GMT

Overview

Concrete tool for generating clustering plan events from the Hudi timeline on Flink checkpoint completion, provided by Apache Hudi.

Description

The ClusteringPlanOperator is a Flink OneInputStreamOperator that extends AbstractStreamOperator. It acts as the clustering plan generator in the online (streaming) Flink pipeline. Its primary entry point is the notifyCheckpointComplete(long checkpointId) method, which is called by Flink after each successful checkpoint.

On each checkpoint completion, the operator:

  1. Reloads the table's active timeline from storage (the Hudi metadata under the table's .hoodie directory).
  2. Queries for pending (REQUESTED) clustering instants.
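  3. Selects the first (earliest) REQUESTED instant, so plans are executed in FIFO order. If there is none, the operator does nothing for this checkpoint.
  4. Reads the HoodieClusteringPlan from the instant metadata. A plan with no input groups produces no events.
  5. Transitions the instant from REQUESTED to INFLIGHT on the timeline.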
  6. Creates a ClusteringPlanEvent for each HoodieClusteringGroup in the plan.
  7. Emits each event as a StreamRecord to the downstream operator.
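The per-checkpoint steps above can be sketched with a small in-memory model. This is a hypothetical illustration, not Hudi code: InstantState, ClusteringInstant, PlanEventSketch and PlanSchedulerSketch are invented names, the timeline is a plain list ordered earliest first, and clustering groups are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified timeline model for illustration; these are not Hudi classes.
enum InstantState { REQUESTED, INFLIGHT, COMPLETED }

final class ClusteringInstant {
  final String time;
  InstantState state;
  final List<String> inputGroups; // stand-in for the clustering plan's input groups

  ClusteringInstant(String time, InstantState state, List<String> inputGroups) {
    this.time = time;
    this.state = state;
    this.inputGroups = inputGroups;
  }
}

// Stand-in for ClusteringPlanEvent: instant time, one clustering group, and its index.
final class PlanEventSketch {
  final String instantTime;
  final String group;
  final int index;

  PlanEventSketch(String instantTime, String group, int index) {
    this.instantTime = instantTime;
    this.group = group;
    this.index = index;
  }
}

final class PlanSchedulerSketch {
  // Instants ordered by instant time, earliest first; models the active timeline.
  private final List<ClusteringInstant> timeline;

  PlanSchedulerSketch(List<ClusteringInstant> timeline) {
    this.timeline = timeline;
  }

  // Mirrors the per-checkpoint steps; checkpointId is not used by this model.
  List<PlanEventSketch> onCheckpointComplete(long checkpointId) {
    List<PlanEventSketch> emitted = new ArrayList<>();
    // Step 1 (reload) is a no-op here because the timeline lives in memory.
    // Steps 2-3: earliest REQUESTED instant, i.e. FIFO order.
    Optional<ClusteringInstant> first = timeline.stream()
        .filter(inst -> inst.state == InstantState.REQUESTED)
        .findFirst();
    if (!first.isPresent()) {
      return emitted; // nothing pending for this checkpoint
    }
    ClusteringInstant instant = first.get();
    // Step 4: an empty plan produces no events and is not transitioned.
    if (instant.inputGroups.isEmpty()) {
      return emitted;
    }
    // Step 5: REQUESTED -> INFLIGHT, so later checkpoints do not pick it again.
    instant.state = InstantState.INFLIGHT;
    // Steps 6-7: one event per clustering group.
    for (int i = 0; i < instant.inputGroups.size(); i++) {
      emitted.add(new PlanEventSketch(instant.time, instant.inputGroups.get(i), i));
    }
    return emitted;
  }
}
```

With two REQUESTED instants on the timeline, the first call emits the earlier instant's groups and marks it INFLIGHT; the next call emits the later instant's groups, which is the FIFO behavior described in the steps.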

The companion ClusteringPlanSourceFunction serves the same purpose in the batch (offline) clustering job. It takes a pre-resolved clustering plan and instant time in its constructor and emits events in its run(SourceContext) method. It verifies the instant is still pending before emitting.
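The pending-instant guard in the batch source can be sketched the same way. BatchPlanSourceSketch is a hypothetical stand-in for ClusteringPlanSourceFunction, not its real code: a set of pending instant times replaces the Hudi timeline lookup, a Consumer replaces Flink's SourceContext, and groups are plain strings.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for ClusteringPlanSourceFunction; not the Hudi API.
final class BatchPlanSourceSketch {
  private final String clusteringInstantTime;
  private final List<String> inputGroups; // groups of the pre-resolved plan

  BatchPlanSourceSketch(String clusteringInstantTime, List<String> inputGroups) {
    this.clusteringInstantTime = clusteringInstantTime;
    this.inputGroups = inputGroups;
  }

  // pendingInstants models the pending clustering instants on the timeline;
  // emit models SourceContext#collect.
  void run(Set<String> pendingInstants, Consumer<String> emit) {
    if (!pendingInstants.contains(clusteringInstantTime)) {
      // The instant is no longer pending (for example, already completed): emit nothing.
      return;
    }
    for (String group : inputGroups) {
      emit.accept(clusteringInstantTime + "/" + group);
    }
  }
}
```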

On startup, the operator rolls back any inflight clustering instants from previous failed runs via ClusteringUtil.rollbackClustering.
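A hypothetical sketch of how the startup rollback interacts with FIFO scheduling. It assumes, for illustration, that rolling back an INFLIGHT clustering instant returns it to REQUESTED; PendingState, PendingClustering and StartupRollbackSketch are invented names, and the model ignores the removal of data written by the failed attempt.

```java
import java.util.List;

// Hypothetical timeline model for illustration; not the Hudi API.
enum PendingState { REQUESTED, INFLIGHT }

final class PendingClustering {
  final String time;
  PendingState state;

  PendingClustering(String time, PendingState state) {
    this.time = time;
    this.state = state;
  }
}

final class StartupRollbackSketch {
  // Models the startup rollback: every INFLIGHT instant left by a failed run is
  // reverted. Assumption: rollback returns the instant to REQUESTED so it is
  // rescheduled; a real rollback also removes partially written data.
  static int rollbackInflight(List<PendingClustering> timeline) {
    int rolledBack = 0;
    for (PendingClustering instant : timeline) {
      if (instant.state == PendingState.INFLIGHT) {
        instant.state = PendingState.REQUESTED;
        rolledBack++;
      }
    }
    return rolledBack;
  }

  // Earliest REQUESTED instant: the one the next checkpoint would schedule.
  static String nextToSchedule(List<PendingClustering> timeline) {
    for (PendingClustering instant : timeline) {
      if (instant.state == PendingState.REQUESTED) {
        return instant.time;
      }
    }
    return null;
  }
}
```

With an INFLIGHT instant 001 left by a crashed run and a REQUESTED instant 002, the rollback makes 001 the next instant to schedule, ahead of 002.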

Usage

Use ClusteringPlanOperator in streaming Flink pipelines where clustering runs continuously alongside the write pipeline. Use ClusteringPlanSourceFunction in standalone batch clustering jobs executed via HoodieFlinkClusteringJob.

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java
  • Lines: 59-186
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
  • Lines: 49-98

Signature

// ClusteringPlanOperator
public class ClusteringPlanOperator extends AbstractStreamOperator<ClusteringPlanEvent>
    implements OneInputStreamOperator<RowData, ClusteringPlanEvent> {

    public ClusteringPlanOperator(Configuration conf)

    @Override
    public void notifyCheckpointComplete(long checkpointId)

    @Override
    public void processElement(StreamRecord<RowData> streamRecord) {
        // no operation
    }
}

// ClusteringPlanSourceFunction
public class ClusteringPlanSourceFunction extends AbstractRichFunctionAdapter
    implements SourceFunctionAdapter<ClusteringPlanEvent> {

    public ClusteringPlanSourceFunction(
        String clusteringInstantTime,
        HoodieClusteringPlan clusteringPlan,
        Configuration conf)

    @Override
    public void run(SourceContext<ClusteringPlanEvent> sourceContext) throws Exception
}

Import

import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.flink.configuration.Configuration;

I/O Contract

Inputs

Name | Type | Required | Description
checkpointId | long | Yes (ClusteringPlanOperator) | The Flink checkpoint ID that triggered plan generation
clusteringInstantTime | String | Yes (ClusteringPlanSourceFunction) | The pre-resolved clustering instant time for batch execution
clusteringPlan | HoodieClusteringPlan | Yes (ClusteringPlanSourceFunction) | The pre-resolved clustering plan containing input groups and strategy parameters
conf | Configuration | Yes | Flink configuration for creating the Hudi table and meta client

Outputs

Name | Type | Description
ClusteringPlanEvent | org.apache.hudi.sink.clustering.ClusteringPlanEvent | One event per HoodieClusteringGroup in the plan, containing the instant time, ClusteringGroupInfo, strategy params, and a group index for keyed distribution
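The group index in each event lets the downstream clustering operator spread groups across its parallel subtasks. The sketch below is hypothetical: IndexedGroupEvent and GroupRoutingSketch are invented names, and index modulo parallelism stands in for Flink's keyBy, which hashes keys into key groups rather than using a plain modulo.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ClusteringPlanEvent; only the fields needed here.
final class IndexedGroupEvent {
  final String instantTime;
  final int groupIndex; // position of the clustering group in the plan

  IndexedGroupEvent(String instantTime, int groupIndex) {
    this.instantTime = instantTime;
    this.groupIndex = groupIndex;
  }
}

final class GroupRoutingSketch {
  // One event per input group, indexed 0..groupCount-1.
  static List<IndexedGroupEvent> eventsFor(String instantTime, int groupCount) {
    List<IndexedGroupEvent> events = new ArrayList<>();
    for (int i = 0; i < groupCount; i++) {
      events.add(new IndexedGroupEvent(instantTime, i));
    }
    return events;
  }

  // Simplified routing: index modulo parallelism. Flink's keyBy actually hashes
  // the key into key groups; modulo is used here only to show that distinct
  // indices spread the groups across the clustering subtasks.
  static int subtaskFor(IndexedGroupEvent event, int parallelism) {
    return event.groupIndex % parallelism;
  }
}
```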

Usage Examples

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;

public class ClusteringPlanWiring {

    // Online (streaming) mode: ClusteringPlanOperator emits plan events on checkpoint completion.
    // writeStream is the output of the Hudi write pipeline; conf holds the Hudi table options.
    static DataStream<ClusteringPlanEvent> onlinePlanStream(
            DataStream<RowData> writeStream, Configuration conf) {
        return writeStream
            .transform("clustering_plan",
                TypeInformation.of(ClusteringPlanEvent.class),
                new ClusteringPlanOperator(conf))
            .setParallelism(1);  // plan generation must be a singleton
    }

    // Batch (offline) mode: ClusteringPlanSourceFunction emits the events of a pre-resolved plan.
    static DataStream<ClusteringPlanEvent> batchPlanStream(
            StreamExecutionEnvironment env,
            String clusteringInstantTime,
            HoodieClusteringPlan clusteringPlan,
            Configuration conf) {
        return env
            .addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan, conf))
            .name("clustering_source")
            .uid("uid_clustering_source");
    }
}

Related Pages

Implements Principle
