Principle: Apache Hudi Split Enumeration and Assignment
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Distributing discovered file splits across parallel reader tasks in a coordinated, fault-tolerant manner.
Description
Once the relevant files have been discovered and pruned, the system must decide which reader task reads which file. This is the split enumeration and assignment problem. It is handled by a dedicated coordinator component (the split enumerator) that runs as a single instance and communicates with multiple reader subtasks.
The split enumeration and assignment principle addresses several concerns:
- Centralized coordination: A single enumerator holds the global view of all pending splits and all registered readers. This avoids the coordination overhead of distributed split negotiation and ensures that each split is assigned exactly once.
- Demand-driven assignment: Readers request splits when they are ready (pull model), rather than having splits pushed to them. This naturally balances load: fast readers consume more splits, slow readers consume fewer.
- Bounded vs. unbounded behavior: For batch reads, the enumerator discovers all splits upfront and signals "no more splits" once all have been assigned. For streaming reads, the enumerator periodically discovers new splits from the timeline and never signals completion.
- Fault tolerance: When a reader fails, its assigned splits are returned to the enumerator for reassignment. The enumerator's state (including the last enumerated commit position) is checkpointed so that recovery resumes from the correct position.
- Split affinity: The assignment strategy may use hashing (e.g., by file ID) to consistently assign related splits to the same reader, improving cache locality and reducing redundant I/O.
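The fault-tolerance point above implies that the enumerator's state must be snapshottable at checkpoint time. A minimal sketch of what gets saved and restored, in plain Java; the class and field names are illustrative, not the actual Flink/Hudi checkpoint API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative enumerator state: the splits not yet assigned plus the
// last enumerated commit position. Both must survive a restart so that
// recovery resumes from the correct place without re-reading data.
class CheckpointableState {
    final List<String> pendingSplits = new ArrayList<>();
    String lastCommit = "00000000000000"; // Hudi-style instant time, illustrative

    // Snapshot an immutable copy of the state at checkpoint time.
    EnumeratorCheckpoint snapshot() {
        return new EnumeratorCheckpoint(List.copyOf(pendingSplits), lastCommit);
    }

    // Rebuild the enumerator state from the last successful checkpoint.
    static CheckpointableState restore(EnumeratorCheckpoint cp) {
        CheckpointableState s = new CheckpointableState();
        s.pendingSplits.addAll(cp.pendingSplits());
        s.lastCommit = cp.lastCommitInstant();
        return s;
    }
}

record EnumeratorCheckpoint(List<String> pendingSplits, String lastCommitInstant) {}
```

Splits already handed to readers are checkpointed on the reader side; on failure they flow back to the enumerator via the add-back path described below.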
Usage
Use split enumeration and assignment whenever a read pipeline has multiple parallel reader tasks that must coordinate over a shared set of input files. It is the standard pattern for Flink FLIP-27 sources and applies to:
- Batch incremental reads where the split set is known at startup
- Streaming reads where new splits are discovered periodically
- Any scenario requiring exactly-once split processing with checkpoint-based recovery
Theoretical Basis
Split enumeration and assignment follows the coordinator-worker pattern. The enumerator is the coordinator; the reader subtasks are the workers. Communication is asynchronous and event-driven:
// Enumerator (single instance, runs in the JobManager)
state:
    pendingSplits: Queue<Split>          // splits waiting to be assigned
    awaitingReaders: Map<TaskId, Host>   // readers waiting for a split
    position: CommitOffset               // last enumerated commit (for streaming)

function start():
    if mode == CONTINUOUS:
        schedulePeriodicDiscovery(interval)

function handleSplitRequest(taskId, hostname):
    awaitingReaders.put(taskId, hostname)
    assignSplits()

function assignSplits():
    for each (taskId, hostname) in awaitingReaders:
        split = pendingSplits.poll(hostname) // hostname may inform locality-aware choice
        if split is available:
            sendSplit(taskId, split)
            awaitingReaders.remove(taskId)
        else if mode == STATIC:
            signalNoMoreSplits(taskId)
            awaitingReaders.remove(taskId)
        else: // CONTINUOUS
            registerCallbackForNewSplits()
            break // wait for new splits

function onPeriodicDiscovery():
    if pendingSplits.size > threshold:
        return // throttle to avoid memory pressure
    newSplits = discoverSplits(position)
    if newSplits is not empty:
        pendingSplits.addAll(newSplits)
        position = newSplits.lastCommit
    assignSplits() // try to satisfy waiting readers

function addSplitsBack(splits, failedTaskId):
    // splits were assigned to failedTaskId but not fully consumed
    pendingSplits.addAll(splits)
    assignSplits() // reassign to other readers
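The enumerator loop above can be exercised as a small, framework-free simulation. The sketch below uses illustrative names rather than Flink's actual `SplitEnumerator` API, and shows the three behaviors the pseudocode encodes: pull-based assignment, the STATIC-mode completion signal, and returning a failed reader's splits:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Minimal sketch of a pull-based split enumerator; names are illustrative.
class SimpleEnumerator {
    final Queue<String> pendingSplits = new ArrayDeque<>();
    final Map<Integer, String> awaitingReaders = new HashMap<>(); // taskId -> host
    final Map<Integer, List<String>> assigned = new HashMap<>();  // for inspection
    final boolean continuous;

    SimpleEnumerator(boolean continuous, List<String> initialSplits) {
        this.continuous = continuous;
        pendingSplits.addAll(initialSplits);
    }

    // Reader pulls: record the request, then try to satisfy all waiters.
    void handleSplitRequest(int taskId, String host) {
        awaitingReaders.put(taskId, host);
        assignSplits();
    }

    void assignSplits() {
        var it = awaitingReaders.entrySet().iterator();
        while (it.hasNext()) {
            int taskId = it.next().getKey();
            String split = pendingSplits.poll();
            if (split != null) {
                assigned.computeIfAbsent(taskId, k -> new ArrayList<>()).add(split);
                it.remove();
            } else if (!continuous) {
                // STATIC mode: no more splits will ever arrive.
                signalNoMoreSplits(taskId);
                it.remove();
            } else {
                break; // CONTINUOUS: wait for the next discovery cycle
            }
        }
    }

    // On reader failure, its unfinished splits are returned for reassignment.
    void addSplitsBack(List<String> splits, int failedTaskId) {
        assigned.remove(failedTaskId);
        pendingSplits.addAll(splits);
        assignSplits();
    }

    void signalNoMoreSplits(int taskId) { /* notify reader taskId */ }
}
```

Note that the fast-reader/slow-reader balancing falls out for free: whichever reader requests again first receives the next pending split.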
The key theoretical properties are:
- Exactly-once assignment: A split is held by at most one reader at any given time; because failed splits are returned to the enumerator and reassigned, every split is eventually processed exactly once.
- Progress guarantee: In continuous mode, the enumerator periodically discovers new splits, ensuring that readers do not starve.
- Backpressure: The enumerator throttles discovery when the pending split queue exceeds a configurable threshold, preventing out-of-memory conditions.
- Consistent hashing (optional): The split assigner may use key-group-based hashing to deterministically map file IDs to task IDs, providing stable assignment across checkpoints and restarts.
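The key-group idea in the last bullet can be sketched as follows. This is a simplified stand-in for Flink's actual `KeyGroupRangeAssignment` computation (which applies a murmur hash before the key-group mapping), assuming `maxParallelism` and `parallelism` are fixed:

```java
// Simplified key-group-style assignment; illustrative, not Flink's exact code.
class SplitHashing {
    // Map a file ID to a key group in [0, maxParallelism).
    static int keyGroupFor(String fileId, int maxParallelism) {
        return Math.floorMod(fileId.hashCode(), maxParallelism);
    }

    // Map the key group to a reader subtask in [0, parallelism).
    // The same file ID always lands on the same subtask, so related splits
    // (e.g. the base file and log files of one file group) share a reader.
    static int readerFor(String fileId, int maxParallelism, int parallelism) {
        return keyGroupFor(fileId, maxParallelism) * parallelism / maxParallelism;
    }
}
```

Because the mapping depends only on the file ID and the two parallelism settings, it is stable across checkpoints and restarts, which is what gives the cache-locality benefit noted in the Description section.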