Principle:Apache Hudi Clustering Plan Generation
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Data_Layout_Optimization |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Generating a concrete clustering execution plan by reading pending clustering instants from the timeline and decomposing them into per-group execution events for downstream operators.
Description
Once clustering has been validated and a clustering plan has been scheduled onto the Hudi timeline as a REQUESTED instant, the next step is to read that plan and convert it into actionable work units. This is the plan generation phase, which bridges the gap between a timeline-level scheduling decision and the actual parallel execution of clustering tasks.
A clustering plan (represented by HoodieClusteringPlan) contains one or more input groups (HoodieClusteringGroup), each describing a set of file slices that should be rewritten together. The plan generation phase performs the following steps:
- Discover pending plans: Query the Hudi active timeline for REQUESTED clustering instants. The first pending instant (FIFO order) is selected for execution.
- Read the plan: Deserialize the HoodieClusteringPlan from the timeline instant metadata.
- Transition to inflight: Mark the instant as INFLIGHT on the timeline to prevent concurrent execution.
- Decompose into events: For each HoodieClusteringGroup in the plan, create a ClusteringPlanEvent containing the group info, strategy parameters, and instant time.
- Emit events downstream: Send each ClusteringPlanEvent to the next operator in the Flink pipeline for parallel execution.
This decomposition enables Flink's parallelism model: each clustering group can be processed independently by a different task slot.
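The independence of groups can be illustrated with a small Python model. This is only a sketch under stated assumptions: the `ClusteringPlanEvent` dataclass, `decompose`, and `rewrite_group` below are hypothetical stand-ins written for illustration, not Hudi classes, and a thread pool stands in for Flink task slots.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


@dataclass(frozen=True)
class ClusteringPlanEvent:
    """Hypothetical stand-in for one unit of clustering work."""
    instant_time: str
    file_ids: tuple          # file slices that belong to this group
    strategy_params: tuple   # (key, value) pairs shared by the whole plan


def decompose(instant_time, input_groups, strategy_params):
    """Turn one plan into one event per input group."""
    params = tuple(sorted(strategy_params.items()))
    return [ClusteringPlanEvent(instant_time, tuple(group), params)
            for group in input_groups]


def rewrite_group(event):
    """Placeholder for the real rewrite: report how many files would be clustered."""
    return (event.instant_time, len(event.file_ids))


events = decompose(
    "20240101120000",
    [["f1", "f2"], ["f3"], ["f4", "f5", "f6"]],
    {"sort.columns": "ts"},
)

# Events share no mutable state, so each one can go to an independent worker.
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(rewrite_group, events))
```

Because every event carries the instant time and the strategy parameters itself, a worker in this model needs nothing beyond the event it receives.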
Usage
This principle applies in two distinct execution modes:
- Online (streaming) mode: The ClusteringPlanOperator triggers plan generation on every completed Flink checkpoint. It acts as a singleton operator that checks the timeline for pending plans and emits events.
- Batch (offline) mode: The ClusteringPlanSourceFunction is used as a Flink source that reads a pre-scheduled clustering plan and emits events. This is used by HoodieFlinkClusteringJob.
In both modes, the operator must be a single-parallelism task to avoid race conditions on timeline transitions.
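The difference between the two modes is mainly in what triggers emission. The following Python sketch contrasts them; `PlanOperator`, its `notify_checkpoint_complete` method, and `plan_source` are hypothetical names chosen for illustration, and the list of pending plans is a simplification of the timeline lookup.

```python
class PlanOperator:
    """Hypothetical online-mode operator: looks for work after every checkpoint."""

    def __init__(self, pending_plans, emit):
        # pending_plans: list of (instant_time, groups), oldest first
        self.pending_plans = pending_plans
        self.emit = emit

    def notify_checkpoint_complete(self, checkpoint_id):
        if not self.pending_plans:
            return  # nothing scheduled; wait for the next checkpoint
        instant_time, groups = self.pending_plans.pop(0)
        for group in groups:
            self.emit((instant_time, group))


def plan_source(instant_time, groups):
    """Hypothetical batch-mode source: emit one pre-scheduled plan, then finish."""
    for group in groups:
        yield (instant_time, group)


# Online mode: one plan is picked up per completed checkpoint.
online_out = []
operator = PlanOperator([("001", ["g1", "g2"]), ("002", ["g3"])], online_out.append)
for checkpoint_id in (1, 2, 3):
    operator.notify_checkpoint_complete(checkpoint_id)

# Batch mode: the source runs once over a known plan.
batch_out = list(plan_source("001", ["g1", "g2"]))
```

In this model a single `PlanOperator` instance owns the pending list, which mirrors the single-parallelism requirement: two instances popping from the same list could both claim the same plan.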
Theoretical Basis
The plan generation step implements a work decomposition pattern common in distributed data processing systems. A centralized coordinator (the plan operator) reads a global state (the timeline) and partitions work into independent units that can be executed in parallel.
FUNCTION generateClusteringEvents(timeline):
    pendingInstants = timeline.getRequestedClusteringInstants()
    IF pendingInstants.isEmpty():
        RETURN // nothing to do
    instant = pendingInstants.first() // FIFO ordering
    plan = timeline.getClusteringPlan(instant)
    IF plan IS NULL OR plan.inputGroups IS EMPTY:
        RETURN // empty plan
    timeline.transitionToInflight(instant)
    FOR EACH group IN plan.inputGroups:
        groupInfo = ClusteringGroupInfo.create(group)
        event = new ClusteringPlanEvent(instant.time, groupInfo, plan.strategyParams)
        EMIT event downstream
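The pseudocode above can be exercised with a runnable Python model. This is a minimal sketch, not the Hudi implementation: `InMemoryTimeline` and its methods are hypothetical, plans are plain dictionaries, and the toy assumes instant times are fixed-width strings so that sorting them gives FIFO order.

```python
from dataclasses import dataclass, field

REQUESTED, INFLIGHT = "REQUESTED", "INFLIGHT"


@dataclass
class InMemoryTimeline:
    """Toy timeline: instant time -> state, plus the plan stored per instant."""
    states: dict = field(default_factory=dict)
    plans: dict = field(default_factory=dict)

    def requested_clustering_instants(self):
        # Assumption of this toy: fixed-width instant times sort oldest first.
        return sorted(t for t, s in self.states.items() if s == REQUESTED)

    def transition_to_inflight(self, instant_time):
        if self.states.get(instant_time) != REQUESTED:
            raise ValueError(f"instant {instant_time} is not REQUESTED")
        self.states[instant_time] = INFLIGHT


def generate_clustering_events(timeline):
    pending = timeline.requested_clustering_instants()
    if not pending:
        return []                      # nothing to do
    instant_time = pending[0]          # FIFO: oldest pending instant
    plan = timeline.plans.get(instant_time)
    if not plan or not plan["input_groups"]:
        return []                      # empty plan
    timeline.transition_to_inflight(instant_time)
    return [
        {"instant_time": instant_time,
         "group": group,
         "strategy_params": plan["strategy_params"]}
        for group in plan["input_groups"]
    ]


timeline = InMemoryTimeline(
    states={"002": REQUESTED, "001": REQUESTED},
    plans={
        "001": {"input_groups": [["f1", "f2"], ["f3"]], "strategy_params": {"k": "v"}},
        "002": {"input_groups": [["f4"]], "strategy_params": {"k": "v"}},
    },
)
events = generate_clustering_events(timeline)
```

After the call, only instant `001` has moved to INFLIGHT; `002` stays REQUESTED until a later invocation picks it up.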
Transitioning the instant to INFLIGHT before emitting any events provides exactly-once scheduling semantics: if the operator fails after the transition but before all events have been emitted, the inflight instant is rolled back on restart (via ClusteringUtil.rollbackClustering), so the plan can be re-read and re-emitted in full.
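The failure path can be traced with a small Python simulation. It is a sketch under stated assumptions: `emit_plan` and `rollback_inflight` are hypothetical helpers, the rollback is modeled only as a state change back to REQUESTED, and output from the failed attempt is assumed to be discarded on restart.

```python
REQUESTED, INFLIGHT = "REQUESTED", "INFLIGHT"


def emit_plan(states, plans, emitted, fail_after=None):
    """Move the oldest REQUESTED instant to INFLIGHT, then emit its groups."""
    pending = sorted(t for t, s in states.items() if s == REQUESTED)
    if not pending:
        return
    instant = pending[0]
    states[instant] = INFLIGHT
    for index, group in enumerate(plans[instant]):
        if fail_after is not None and index == fail_after:
            raise RuntimeError("simulated operator failure")
        emitted.append((instant, group))


def rollback_inflight(states):
    """Restart path of this model: return every INFLIGHT instant to REQUESTED."""
    for instant, state in states.items():
        if state == INFLIGHT:
            states[instant] = REQUESTED


states = {"001": REQUESTED}
plans = {"001": ["g1", "g2", "g3"]}

# First attempt crashes after emitting only one of three groups.
first_attempt = []
try:
    emit_plan(states, plans, first_attempt, fail_after=1)
except RuntimeError:
    pass
state_after_crash = states["001"]

# On restart the inflight instant is rolled back, then the plan is emitted again.
rollback_inflight(states)
second_attempt = []
emit_plan(states, plans, second_attempt)
```

The second attempt emits all three groups, so no group is lost even though the first attempt stopped partway through.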
The FIFO ordering of pending instants ensures that older plans are executed before newer ones, which is important for maintaining timeline consistency.