Implementation:Apache Hudi ClusteringPlanOperator NotifyCheckpointComplete
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Data_Layout_Optimization |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool for generating clustering plan events from the Hudi timeline on Flink checkpoint completion, provided by Apache Hudi.
Description
The ClusteringPlanOperator is a Flink OneInputStreamOperator that extends AbstractStreamOperator. It acts as the clustering plan generator in the online (streaming) Flink pipeline. Its primary entry point is the notifyCheckpointComplete(long checkpointId) method, which is called by Flink after each successful checkpoint.
On each checkpoint completion, the operator:
- Reloads the table's active timeline through the Hudi meta client.
- Queries for pending (REQUESTED) clustering instants.
- Selects the first pending instant in FIFO order.
- Reads the HoodieClusteringPlan from the instant metadata.
- Transitions the instant from REQUESTED to INFLIGHT on the timeline.
- Creates a ClusteringPlanEvent for each HoodieClusteringGroup in the plan.
- Emits each event as a StreamRecord to the downstream operator.
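The steps above can be modeled in a few lines of plain Java. This is a sketch under stated assumptions, not Hudi's implementation: Instant, State, PlanEvent, and scheduleOnCheckpoint are hypothetical stand-ins for HoodieInstant, the instant states, ClusteringPlanEvent, and the body of notifyCheckpointComplete. The timeline list is assumed to be ordered oldest-first so that the first REQUESTED entry is the FIFO choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// In-memory model of one notifyCheckpointComplete() pass.
// All types here are hypothetical stand-ins, not Hudi or Flink classes.
final class ClusteringScheduleSketch {

  enum State { REQUESTED, INFLIGHT, COMPLETED }

  // Stand-in for a clustering instant plus the plan stored in its metadata.
  static final class Instant {
    final String time;
    State state;
    final List<List<String>> inputGroups; // each inner list = file IDs of one clustering group

    Instant(String time, State state, List<List<String>> inputGroups) {
      this.time = time;
      this.state = state;
      this.inputGroups = inputGroups;
    }
  }

  // Stand-in for ClusteringPlanEvent: one event per clustering group.
  record PlanEvent(String instantTime, List<String> fileIds, int index) {}

  // `timeline` is assumed to be ordered oldest-first, so findFirst() yields FIFO order.
  static List<PlanEvent> scheduleOnCheckpoint(List<Instant> timeline) {
    List<PlanEvent> emitted = new ArrayList<>();
    Optional<Instant> firstRequested = timeline.stream()
        .filter(inst -> inst.state == State.REQUESTED)
        .findFirst();
    if (firstRequested.isEmpty()) {
      return emitted; // no pending plan: nothing to do on this checkpoint
    }
    Instant instant = firstRequested.get();
    instant.state = State.INFLIGHT; // mark REQUESTED -> INFLIGHT before emitting
    for (int i = 0; i < instant.inputGroups.size(); i++) {
      emitted.add(new PlanEvent(instant.time, instant.inputGroups.get(i), i));
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<Instant> timeline = List.of(
        new Instant("001", State.COMPLETED, List.of()),
        new Instant("002", State.REQUESTED, List.of(List.of("f1", "f2"), List.of("f3"))),
        new Instant("003", State.REQUESTED, List.of(List.of("f4"))));
    System.out.println(scheduleOnCheckpoint(timeline)); // two events, both for instant 002
  }
}
```

In this model each completed checkpoint schedules at most one instant, so a backlog of REQUESTED plans drains one instant per checkpoint, oldest first.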
The companion ClusteringPlanSourceFunction serves the same purpose in the batch (offline) clustering job. It takes a pre-resolved clustering plan and instant time in its constructor and emits events in its run(SourceContext) method. It verifies the instant is still pending before emitting.
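The batch source can be modeled the same way for contrast: the instant time and plan are fixed at construction, and run() checks the pending state once before emitting. PlanSourceSketch, its PlanEvent record, the Set of pending instant times, and the Consumer standing in for SourceContext.collect are all hypothetical. Throwing IllegalStateException when the instant is no longer pending is an assumed failure mode; the description only says that the instant is verified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical model of ClusteringPlanSourceFunction.run(): the plan and instant time
// are fixed at construction; run() checks the instant is still pending, then emits.
final class PlanSourceSketch {

  // Stand-in for ClusteringPlanEvent.
  record PlanEvent(String instantTime, List<String> fileIds, int index) {}

  private final String clusteringInstantTime;
  private final List<List<String>> inputGroups; // stand-in for the plan's input groups

  PlanSourceSketch(String clusteringInstantTime, List<List<String>> inputGroups) {
    this.clusteringInstantTime = clusteringInstantTime;
    this.inputGroups = inputGroups;
  }

  // `pendingInstants` stands in for the pending clustering instants on the timeline;
  // `collector` stands in for SourceContext.collect().
  void run(Set<String> pendingInstants, Consumer<PlanEvent> collector) {
    if (!pendingInstants.contains(clusteringInstantTime)) {
      // Assumed failure mode: refuse to emit events for a plan that is no longer pending.
      throw new IllegalStateException("Instant " + clusteringInstantTime + " is not pending");
    }
    for (int i = 0; i < inputGroups.size(); i++) {
      collector.accept(new PlanEvent(clusteringInstantTime, inputGroups.get(i), i));
    }
  }

  public static void main(String[] args) {
    List<PlanEvent> out = new ArrayList<>();
    new PlanSourceSketch("002", List.of(List.of("f1"), List.of("f2", "f3")))
        .run(Set.of("002"), out::add);
    System.out.println(out.size()); // 2
  }
}
```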
On startup, ClusteringPlanOperator rolls back any inflight clustering instants left over from previous failed runs via ClusteringUtil.rollbackClustering.
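The startup rollback interacts with FIFO selection: because only REQUESTED instants are picked, an instant stranded in INFLIGHT by a failed run would otherwise never be scheduled again. The sketch below is a simplification under a labeled assumption: rolling back is modeled as returning the instant to REQUESTED, so it becomes the oldest candidate again. StartupRollbackSketch and rollbackInflight are hypothetical names, and a real rollback must also undo the failed attempt's partial writes, which this model ignores.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the startup rollback. Assumption: a rolled-back INFLIGHT
// clustering instant returns to REQUESTED so FIFO scheduling retries it first.
final class StartupRollbackSketch {

  enum State { REQUESTED, INFLIGHT }

  static final class Instant {
    final String time;
    State state;

    Instant(String time, State state) {
      this.time = time;
      this.state = state;
    }
  }

  // Reverts every INFLIGHT instant and returns the rolled-back instant times in list order.
  static List<String> rollbackInflight(List<Instant> pendingClustering) {
    List<String> rolledBack = new ArrayList<>();
    for (Instant instant : pendingClustering) {
      if (instant.state == State.INFLIGHT) {
        // A real rollback also cleans up partial writes; this model only flips the state.
        instant.state = State.REQUESTED;
        rolledBack.add(instant.time);
      }
    }
    return rolledBack;
  }

  public static void main(String[] args) {
    List<Instant> pending = List.of(
        new Instant("001", State.INFLIGHT),
        new Instant("002", State.REQUESTED));
    System.out.println(rollbackInflight(pending)); // [001]
  }
}
```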
Usage
Use ClusteringPlanOperator in streaming Flink pipelines where clustering runs continuously alongside the write pipeline. Use ClusteringPlanSourceFunction in standalone batch clustering jobs executed via HoodieFlinkClusteringJob.
Code Reference
Source Location
- Repository: Apache Hudi
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java
- Lines: 59-186
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
- Lines: 49-98
Signature
```java
// Signatures only; method bodies are omitted.

// ClusteringPlanOperator
public class ClusteringPlanOperator extends AbstractStreamOperator<ClusteringPlanEvent>
    implements OneInputStreamOperator<RowData, ClusteringPlanEvent> {

  public ClusteringPlanOperator(Configuration conf)

  @Override
  public void notifyCheckpointComplete(long checkpointId)

  @Override
  public void processElement(StreamRecord<RowData> streamRecord)
  // no operation: input records are ignored; plans are emitted from notifyCheckpointComplete
}

// ClusteringPlanSourceFunction
public class ClusteringPlanSourceFunction extends AbstractRichFunctionAdapter
    implements SourceFunctionAdapter<ClusteringPlanEvent> {

  public ClusteringPlanSourceFunction(
      String clusteringInstantTime,
      HoodieClusteringPlan clusteringPlan,
      Configuration conf)

  @Override
  public void run(SourceContext<ClusteringPlanEvent> sourceContext) throws Exception
}
```
Import
```java
import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.flink.configuration.Configuration;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| checkpointId | long | Yes (ClusteringPlanOperator) | The Flink checkpoint ID that triggered plan generation |
| clusteringInstantTime | String | Yes (ClusteringPlanSourceFunction) | The pre-resolved clustering instant time for batch execution |
| clusteringPlan | HoodieClusteringPlan | Yes (ClusteringPlanSourceFunction) | The pre-resolved clustering plan containing input groups and strategy parameters |
| conf | Configuration | Yes | Flink configuration for creating the Hudi table and meta client |
Outputs
| Name | Type | Description |
|---|---|---|
| ClusteringPlanEvent | org.apache.hudi.sink.clustering.ClusteringPlanEvent | One event per HoodieClusteringGroup in the plan, containing the instant time, ClusteringGroupInfo, strategy params, and a group index for keyed distribution |
Usage Examples
```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;

public final class ClusteringPlanWiring { // illustrative wrapper; the class name is arbitrary

  // Online (streaming) mode: ClusteringPlanOperator emits plan events on checkpoint completion.
  // `writeStream` is the output of the Hudi write pipeline, built elsewhere.
  static DataStream<ClusteringPlanEvent> onlinePlanStream(
      DataStream<RowData> writeStream, Configuration conf) {
    return writeStream
        .transform("clustering_plan",
            TypeInformation.of(ClusteringPlanEvent.class),
            new ClusteringPlanOperator(conf))
        .setParallelism(1); // must be a singleton
  }

  // Batch (offline) mode: ClusteringPlanSourceFunction emits events for a pre-resolved plan.
  // `clusteringInstantTime` and `clusteringPlan` are resolved from the timeline beforehand.
  static DataStream<ClusteringPlanEvent> batchPlanStream(
      StreamExecutionEnvironment env,
      String clusteringInstantTime,
      HoodieClusteringPlan clusteringPlan,
      Configuration conf) {
    return env
        .addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan, conf))
        .name("clustering_source")
        .uid("uid_clustering_source");
  }
}
```