Implementation: Apache Hudi HoodieContinuousSplitEnumerator handleSplitRequest
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool for coordinating split discovery and assignment to Flink reader subtasks in both bounded and unbounded read modes, provided by Apache Hudi.
Description
The Hudi Flink source uses a hierarchy of split enumerators to manage how file splits are distributed to reader tasks:
AbstractHoodieSplitEnumerator is the base class that implements the core handleSplitRequest logic. When a reader subtask requests a split, the enumerator records it in a LinkedHashMap of awaiting readers (preserving request order), then invokes assignSplits(). The assignment loop iterates over the waiting readers and queries the HoodieSplitProvider for the next available split: if one is available, it is assigned via enumeratorContext.assignSplit(); if not, the enumerator either keeps the reader waiting (continuous mode) or signals noMoreSplits (static mode). The method also handles SplitRequestEvent source events, which carry completed split IDs from readers.
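The request/assign flow described above can be pictured with a minimal, self-contained sketch. This is not the Hudi code: the class and field names below are hypothetical stand-ins, and plain collections replace the HoodieSplitProvider and Flink's SplitEnumeratorContext.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the handleSplitRequest / assignSplits flow; the real
// enumerator delegates assignment to Flink's SplitEnumeratorContext.
class AssignLoopSketch {
    // Awaiting readers in request order, mirroring the LinkedHashMap above.
    final Map<Integer, String> readersAwaitingSplit = new LinkedHashMap<>();
    // Stand-in for the HoodieSplitProvider's pending splits.
    final Deque<String> pendingSplits = new ArrayDeque<>();
    final boolean continuousMode;
    // Records of what a real enumerator would send through the context.
    final Map<Integer, String> assigned = new LinkedHashMap<>();
    final Deque<Integer> noMoreSplitsSignals = new ArrayDeque<>();

    AssignLoopSketch(boolean continuousMode) {
        this.continuousMode = continuousMode;
    }

    void handleSplitRequest(int subtaskId, String requesterHostname) {
        readersAwaitingSplit.put(subtaskId, requesterHostname); // preserves request order
        assignSplits();
    }

    void assignSplits() {
        Iterator<Map.Entry<Integer, String>> it = readersAwaitingSplit.entrySet().iterator();
        while (it.hasNext()) {
            int subtask = it.next().getKey();
            String split = pendingSplits.poll();
            if (split != null) {
                assigned.put(subtask, split);    // ~ enumeratorContext.assignSplit(...)
                it.remove();
            } else if (!continuousMode) {
                noMoreSplitsSignals.add(subtask); // ~ signalNoMoreSplits() in static mode
                it.remove();
            }
            // continuous mode: leave the reader waiting for the next discovery cycle
        }
    }
}
```

The only behavioral difference between the two concrete enumerators shows up in the no-split branch: continuous mode parks the reader, static mode signals completion.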
HoodieContinuousSplitEnumerator extends the abstract enumerator for streaming/incremental reads. On start(), it schedules periodic split discovery via enumeratorContext.callAsync() at the configured scan interval. The discoverSplits() method delegates to a HoodieContinuousSplitDiscover to find new splits since the last enumerated position. Discovery is throttled when the number of pending splits exceeds maxPendingSplits. After discovery, the enumerator updates its position (commit instant and offset) and feeds new splits to the provider. The enumerator's state is checkpointable via snapshotState(), which captures the provider state and the current position.
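The throttled discovery cycle can be sketched in isolation. This is a hypothetical illustration, not Hudi's implementation: the real enumerator runs discovery asynchronously via enumeratorContext.callAsync(), whereas here a plain method stands in for one timer tick.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of one throttled discovery cycle; names are illustrative.
class DiscoveryCycleSketch {
    final Deque<String> pendingSplits = new ArrayDeque<>(); // provider stand-in
    final int maxPendingSplits;
    String lastInstant = "00000000000000";                  // enumerated position

    DiscoveryCycleSketch(int maxPendingSplits) {
        this.maxPendingSplits = maxPendingSplits;
    }

    // One discovery tick. Returns true when discovery ran, false when throttled.
    boolean discoverOnce(String newInstant, List<String> splitsSinceLastInstant) {
        if (pendingSplits.size() >= maxPendingSplits) {
            return false; // throttle: too many splits not yet assigned
        }
        pendingSplits.addAll(splitsSinceLastInstant); // feed new splits to the provider
        lastInstant = newInstant;                     // advance the enumerated position
        return true;
    }
}
```

Note that a throttled tick does not advance the position, so the skipped range is rediscovered on a later cycle once readers drain the backlog.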
HoodieStaticSplitEnumerator extends the abstract enumerator for bounded batch reads. It overrides shouldWaitForMoreSplits() to return false, causing the enumerator to signal noMoreSplits to readers once all discovered splits have been assigned.
DefaultHoodieSplitAssigner uses Flink's KeyGroupRangeAssignment to deterministically map file IDs to task IDs based on consistent hashing, ensuring stable split-to-task affinity across checkpoints.
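For intuition, the key-group routing can be approximated with a simplified reimplementation. This is a sketch of the structure only: Flink's actual KeyGroupRangeAssignment.assignKeyToParallelOperator additionally murmur-hashes the key's hashCode before taking the modulo.

```java
// Simplified stand-in for Flink's key-group routing, to illustrate the stable
// fileId -> subtask mapping. Not the real API: Flink murmur-hashes the key
// hash first, which this sketch omits.
class KeyGroupSketch {
    static int assign(String fileId, int maxParallelism, int parallelism) {
        // Map the key into one of maxParallelism key groups...
        int keyGroup = Math.floorMod(fileId.hashCode(), maxParallelism);
        // ...then carve key groups into contiguous ranges, one range per subtask.
        return keyGroup * parallelism / maxParallelism;
    }
}
```

Because the mapping depends only on the file ID and the (fixed) max parallelism, the same file group lands on the same subtask across checkpoints and restarts, which is the affinity property the assigner relies on.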
Usage
These components are used internally by the Hudi FLIP-27 source. The HoodieContinuousSplitEnumerator is selected for streaming reads (when read.streaming.enabled = true); the HoodieStaticSplitEnumerator is selected for batch reads. Users influence behavior through connector options:
- read.streaming.check-interval controls the periodic discovery interval
- read.streaming.skip.compaction / read.streaming.skip.clustering filter out specific commit types
- The source parallelism (read.tasks) determines the number of reader subtasks requesting splits
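To make the options concrete, a streaming read might be configured with an option map like the one below. This is a hedged illustration: the keys mirror the options listed above, and the values are examples, not defaults; in a real job these pairs would appear in the WITH clause of a Flink SQL CREATE TABLE.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Example connector options for a Hudi streaming read (illustrative values).
class HudiReadOptionsExample {
    static Map<String, String> streamingReadOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "hudi");
        options.put("read.streaming.enabled", "true");      // selects the continuous enumerator
        options.put("read.streaming.check-interval", "60"); // periodic discovery interval
        options.put("read.tasks", "4");                     // number of reader subtasks
        return options;
    }
}
```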
Code Reference
Source Location
- Repository: Apache Hudi
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java (Lines 50-126)
- Also: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java (Lines 47-187)
- Also: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieStaticSplitEnumerator.java (Lines 30-40)
- Also: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/assign/DefaultHoodieSplitAssigner.java (Lines 28-50)
Signature
```java
// AbstractHoodieSplitEnumerator - core split request handling
@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
  readersAwaitingSplit.put(subtaskId, requesterHostname);
  assignSplits();
}

// HoodieContinuousSplitEnumerator - periodic split discovery
@Override
public void start() {
  super.start();
  enumeratorContext.callAsync(
      this::discoverSplits,
      this::processDiscoveredSplits,
      0L,
      scanContext.getScanInterval().toMillis());
}

// HoodieStaticSplitEnumerator - bounded behavior
@Override
protected boolean shouldWaitForMoreSplits() {
  return false;
}

// DefaultHoodieSplitAssigner - consistent hash assignment
@Override
public int assign(HoodieSourceSplit split) {
  return KeyGroupRangeAssignment.assignKeyToParallelOperator(
      split.getFileId(), maxParallelism, parallelism);
}
```
Import
```java
import org.apache.hudi.source.enumerator.HoodieContinuousSplitEnumerator;
import org.apache.hudi.source.enumerator.HoodieStaticSplitEnumerator;
import org.apache.hudi.source.enumerator.AbstractHoodieSplitEnumerator;
import org.apache.hudi.source.assign.DefaultHoodieSplitAssigner;
import org.apache.hudi.source.split.HoodieSourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| subtaskId | int | Yes | The ID of the reader subtask requesting a split. |
| requesterHostname | String | No | The hostname of the requesting subtask, used for locality-aware assignment. May be null. |
| enumeratorContext | SplitEnumeratorContext&lt;HoodieSourceSplit&gt; | Yes (constructor) | Flink framework context providing access to registered readers, split assignment, and async invocation. |
| splitProvider | HoodieSplitProvider | Yes (constructor) | Provider that holds pending splits and serves them to the enumerator on demand. |
| splitDiscover | HoodieContinuousSplitDiscover | Yes (constructor, continuous only) | Discovery component that finds new splits since the last enumerated commit position. |
| scanContext | HoodieScanContext | Yes (constructor, continuous only) | Read context with scan interval, max pending splits threshold, and skip flags. |
Outputs
| Name | Type | Description |
|---|---|---|
| Split assignment | HoodieSourceSplit (sent via enumeratorContext.assignSplit()) | Individual splits assigned to reader subtasks. Each split identifies a file group (partition path, file ID, base file, log files). |
| No-more-splits signal | void (sent via enumeratorContext.signalNoMoreSplits()) | Sent to a reader in static (batch) mode when all splits have been assigned. |
| Checkpointed state | HoodieSplitEnumeratorState | Captures the provider state, last enumerated instant, and offset for fault-tolerant recovery. |
Usage Examples
```java
// The enumerator is instantiated internally by HoodieSource.
// The following shows the conceptual lifecycle:

// 1. Source creates the enumerator
HoodieContinuousSplitEnumerator enumerator = new HoodieContinuousSplitEnumerator(
    enumeratorContext,
    splitProvider,
    splitDiscover,
    scanContext,
    Option.empty() // no restored state
);

// 2. Flink calls start() -- schedules periodic discovery
enumerator.start();

// 3. Reader subtask 0 requests a split
enumerator.handleSplitRequest(0, "worker-host-1");
// If a split is available, it is assigned immediately via enumeratorContext.assignSplit()
// If no split is available, the reader waits until the next discovery cycle

// 4. Periodic discovery finds new splits
// discoverSplits() -> processDiscoveredSplits() -> assignSplits()

// 5. On checkpoint, state is saved
HoodieSplitEnumeratorState state = enumerator.snapshotState(checkpointId);
// state contains the last enumerated instant and pending split info
```
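The state object captured by snapshotState() can be pictured as a simple value object. The field names below are illustrative assumptions, not the actual HoodieSplitEnumeratorState definition.

```java
import java.util.List;

// Hypothetical shape of the checkpointed enumerator state described above;
// field names are illustrative, not the real HoodieSplitEnumeratorState.
class EnumeratorStateSketch {
    final String lastEnumeratedInstant; // commit instant position
    final long offset;                  // offset within that instant
    final List<String> pendingSplitIds; // provider state: splits not yet assigned

    EnumeratorStateSketch(String lastEnumeratedInstant, long offset, List<String> pendingSplitIds) {
        this.lastEnumeratedInstant = lastEnumeratedInstant;
        this.offset = offset;
        this.pendingSplitIds = pendingSplitIds;
    }
}
```

On recovery, a state object like this lets the enumerator resume discovery from the last enumerated instant and re-feed unassigned splits to the provider instead of re-reading already-consumed data.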