

Implementation:Apache Hudi HoodieContinuousSplitEnumerator HandleSplitRequest

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Stream_Processing
Last Updated 2026-02-08 00:00 GMT

Overview

Concrete tool for coordinating split discovery and assignment to Flink reader subtasks in both bounded and unbounded read modes, provided by Apache Hudi.

Description

The Hudi Flink source uses a hierarchy of split enumerators to manage how file splits are distributed to reader tasks:

AbstractHoodieSplitEnumerator is the base class that implements the core handleSplitRequest logic. When a reader subtask requests a split, the enumerator adds it to a LinkedHashMap of awaiting readers (preserving request order), then invokes assignSplits(). The assignment loop iterates over the waiting readers and queries the HoodieSplitProvider for the next available split: if one exists, it is assigned via enumeratorContext.assignSplit(); if not, the enumerator either leaves the reader waiting (continuous mode) or signals noMoreSplits (static mode). The method also handles SplitRequestEvent source events, which carry the IDs of splits the readers have completed.
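The request-then-assign flow above can be sketched as a small stand-alone class. This is a minimal illustration using plain Java collections; the names and the assignment map are hypothetical stand-ins, since the real enumerator delegates assignment to Flink's SplitEnumeratorContext rather than recording it locally.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Illustrative sketch: readers queue up in request order, and splits are
// handed out first-come-first-served as they become available.
class SplitAssignmentSketch {
    // Awaiting readers in request order: subtaskId -> hostname (may be null).
    private final LinkedHashMap<Integer, String> readersAwaitingSplit = new LinkedHashMap<>();
    // Stand-in for the HoodieSplitProvider's pending splits.
    private final Queue<String> pendingSplits = new ArrayDeque<>();
    // Stand-in for enumeratorContext.assignSplit(): subtaskId -> assigned split.
    final Map<Integer, String> assignments = new LinkedHashMap<>();

    void addSplit(String split) {
        pendingSplits.add(split);
        assignSplits();
    }

    void handleSplitRequest(int subtaskId, String requesterHostname) {
        readersAwaitingSplit.put(subtaskId, requesterHostname);
        assignSplits();
    }

    private void assignSplits() {
        Iterator<Map.Entry<Integer, String>> it = readersAwaitingSplit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, String> reader = it.next();
            String split = pendingSplits.poll();
            if (split == null) {
                // Continuous mode: leave the reader waiting for the next discovery cycle.
                return;
            }
            assignments.put(reader.getKey(), split);
            it.remove();
        }
    }
}
```

Note how a reader that requests a split before any are available simply stays in the map until a later discovery cycle calls back into assignSplits().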

HoodieContinuousSplitEnumerator extends the abstract enumerator for streaming/incremental reads. On start(), it schedules periodic split discovery via enumeratorContext.callAsync() at the configured scan interval. The discoverSplits() method delegates to a HoodieContinuousSplitDiscover to find new splits since the last enumerated position. Discovery is throttled when the number of pending splits exceeds maxPendingSplits. After discovery, the enumerator updates its position (commit instant and offset) and feeds new splits to the provider. The enumerator's state is checkpointable via snapshotState(), which captures the provider state and the current position.
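The throttling and position-tracking behavior can be illustrated with a simplified model. All names below are hypothetical; this only shows the shape of the logic (skip a cycle while the backlog exceeds the threshold, otherwise accept the new splits and advance the position), not Hudi's actual API.

```java
import java.util.Collections;
import java.util.List;

// Illustrative model of throttled periodic discovery: each cycle either
// accepts newly discovered splits and advances the enumerated position,
// or skips entirely when too many splits are still pending.
class DiscoveryThrottleSketch {
    private final int maxPendingSplits;
    private int pendingCount;
    String position; // stand-in for the last enumerated commit instant

    DiscoveryThrottleSketch(int maxPendingSplits, String startPosition) {
        this.maxPendingSplits = maxPendingSplits;
        this.position = startPosition;
    }

    // One periodic cycle: returns the splits accepted this round.
    List<String> discoverCycle(List<String> newSplits, String newPosition) {
        if (pendingCount > maxPendingSplits) {
            return Collections.emptyList(); // throttled: retry next interval
        }
        pendingCount += newSplits.size();
        position = newPosition; // advance only when discovery actually ran
        return newSplits;
    }

    void onSplitsAssigned(int n) {
        pendingCount -= n;
    }
}
```

Because the position only advances on an accepted cycle, a throttled enumerator re-discovers from the same instant once the backlog drains, so no commits are skipped.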

HoodieStaticSplitEnumerator extends the abstract enumerator for bounded batch reads. It overrides shouldWaitForMoreSplits() to return false, causing the enumerator to signal noMoreSplits to readers once all discovered splits have been assigned.

DefaultHoodieSplitAssigner uses Flink's KeyGroupRangeAssignment to deterministically map file IDs to task IDs based on consistent hashing, ensuring stable split-to-task affinity across checkpoints.
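The key-group scheme can be sketched as follows. This is a simplified illustration of why the mapping is stable: the same file ID always lands in the same key group, and each key group maps to a fixed operator index for a given parallelism. Flink's real KeyGroupRangeAssignment applies a murmur hash on top of key.hashCode(); a plain floorMod is used here only for illustration.

```java
// Simplified model of Flink-style key-group assignment.
class KeyGroupSketch {
    // Hash the key into one of maxParallelism key groups.
    static int assignToKeyGroup(String key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key groups are partitioned contiguously among the parallel operators,
    // so the result depends only on the key, maxParallelism, and parallelism.
    static int assign(String fileId, int maxParallelism, int parallelism) {
        int keyGroup = assignToKeyGroup(fileId, maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }
}
```

Because the mapping is a pure function of the file ID, a split for the same file group is routed to the same subtask after restore, which is what gives the stable split-to-task affinity across checkpoints.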

Usage

These components are used internally by the Hudi FLIP-27 source. The HoodieContinuousSplitEnumerator is selected for streaming reads (when read.streaming.enabled = true); the HoodieStaticSplitEnumerator is selected for batch reads. Users influence behavior through connector options:

  • read.streaming.check-interval controls the periodic discovery interval
  • read.streaming.skip.compaction / read.streaming.skip.clustering filter out specific commit types
  • The source parallelism (read.tasks) determines the number of reader subtasks requesting splits
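For reference, the options above might be collected like this for a Flink table definition. The option names are those documented above; the values (and the surrounding DDL/descriptor plumbing, which is omitted) are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative: streaming-read connector options as key/value pairs.
class HudiReadOptionsSketch {
    static Map<String, String> streamingReadOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "hudi");
        options.put("read.streaming.enabled", "true");          // selects the continuous enumerator
        options.put("read.streaming.check-interval", "30");     // periodic discovery interval
        options.put("read.streaming.skip.compaction", "true");  // filter out compaction commits
        options.put("read.streaming.skip.clustering", "true");  // filter out clustering commits
        options.put("read.tasks", "4");                         // number of reader subtasks
        return options;
    }
}
```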

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
  • Lines: 50-126
  • Also: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java (Lines 47-187)
  • Also: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieStaticSplitEnumerator.java (Lines 30-40)
  • Also: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/assign/DefaultHoodieSplitAssigner.java (Lines 28-50)

Signature

// AbstractHoodieSplitEnumerator - core split request handling
@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    readersAwaitingSplit.put(subtaskId, requesterHostname);
    assignSplits();
}

// HoodieContinuousSplitEnumerator - periodic split discovery
@Override
public void start() {
    super.start();
    enumeratorContext.callAsync(
        this::discoverSplits,
        this::processDiscoveredSplits,
        0L,
        scanContext.getScanInterval().toMillis());
}

// HoodieStaticSplitEnumerator - bounded behavior
@Override
protected boolean shouldWaitForMoreSplits() {
    return false;
}

// DefaultHoodieSplitAssigner - consistent hash assignment
@Override
public int assign(HoodieSourceSplit split) {
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(
        split.getFileId(), maxParallelism, parallelism);
}

Import

import org.apache.hudi.source.enumerator.HoodieContinuousSplitEnumerator;
import org.apache.hudi.source.enumerator.HoodieStaticSplitEnumerator;
import org.apache.hudi.source.enumerator.AbstractHoodieSplitEnumerator;
import org.apache.hudi.source.assign.DefaultHoodieSplitAssigner;
import org.apache.hudi.source.split.HoodieSourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

I/O Contract

Inputs

  • subtaskId (int, required) — The ID of the reader subtask requesting a split.
  • requesterHostname (String, optional) — The hostname of the requesting subtask, used for locality-aware assignment. May be null.
  • enumeratorContext (SplitEnumeratorContext<HoodieSourceSplit>, required, constructor) — Flink framework context providing access to registered readers, split assignment, and async invocation.
  • splitProvider (HoodieSplitProvider, required, constructor) — Provider that holds pending splits and serves them to the enumerator on demand.
  • splitDiscover (HoodieContinuousSplitDiscover, required, constructor, continuous only) — Discovery component that finds new splits since the last enumerated commit position.
  • scanContext (HoodieScanContext, required, constructor, continuous only) — Read context with the scan interval, max pending splits threshold, and skip flags.

Outputs

  • Split assignment (HoodieSourceSplit, sent via enumeratorContext.assignSplit()) — Individual splits assigned to reader subtasks. Each split identifies a file group (partition path, file ID, base file, log files).
  • No-more-splits signal (sent via enumeratorContext.signalNoMoreSplits()) — Sent to a reader in static (batch) mode once all splits have been assigned.
  • Checkpointed state (HoodieSplitEnumeratorState) — Captures the provider state, last enumerated instant, and offset for fault-tolerant recovery.

Usage Examples

// The enumerator is instantiated internally by HoodieSource.
// The following shows the conceptual lifecycle:

// 1. Source creates the enumerator
HoodieContinuousSplitEnumerator enumerator = new HoodieContinuousSplitEnumerator(
    enumeratorContext,
    splitProvider,
    splitDiscover,
    scanContext,
    Option.empty()  // no restored state
);

// 2. Flink calls start() -- schedules periodic discovery
enumerator.start();

// 3. Reader subtask 0 requests a split
enumerator.handleSplitRequest(0, "worker-host-1");
// If a split is available, it is assigned immediately via enumeratorContext.assignSplit()
// If no split is available, the reader waits until the next discovery cycle

// 4. Periodic discovery finds new splits
// discoverSplits() -> processDiscoveredSplits() -> assignSplits()

// 5. On checkpoint, state is saved
HoodieSplitEnumeratorState state = enumerator.snapshotState(checkpointId);
// state contains the last enumerated instant and pending split info

Related Pages

Implements Principle
