Implementation:Apache Flink SplitFetcher
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Source_Framework |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
SplitFetcher is the runnable behind the source reader's internal I/O thread. It polls records from an external system through a SplitReader and places them in the elements queue, where the source reader's main thread consumes them.
Description
SplitFetcher is a core threading component in the Flink source reader framework. It implements Runnable and operates as an independent I/O thread that manages the lifecycle of reading from assigned splits. Each SplitFetcher instance is identified by a unique integer ID and runs in its own thread managed by the SplitFetcherManager.
The main loop in run() repeatedly calls runOnce(). Each iteration acquires a ReentrantLock and selects the next task. Queued tasks (add splits, remove splits, pause/resume splits) are taken in FIFO order from a Deque<SplitFetcherTask>; if none are queued but splits are assigned, the default FetchTask is used instead. If the fetcher is idle (no splits and no tasks), it blocks on a Condition variable (nonEmpty) until work arrives. The selected task is executed outside the lock, so other threads can enqueue tasks or wake the fetcher while it runs.
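The task-selection loop described above can be sketched with plain JDK primitives. This is a simplified model, not Flink's implementation: the class MiniFetcher, its string split IDs, and the log list are hypothetical stand-ins, and tasks are plain Runnable objects rather than SplitFetcherTask instances.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Toy model of the fetcher loop described above; not Flink's actual code. */
class MiniFetcher implements Runnable {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition nonEmpty = lock.newCondition();
    private final Deque<Runnable> taskQueue = new ArrayDeque<>();
    // Only touched by the thread that calls runOnce().
    private final List<String> assignedSplits = new ArrayList<>();
    private final List<String> log = new ArrayList<>();
    private boolean closed;

    // Default task: stands in for FetchTask when no explicit task is queued.
    private final Runnable fetchTask = () -> log.add("fetch:" + assignedSplits.size());

    /** Queues an explicit task and wakes the loop if it is idle. */
    void enqueueTask(Runnable task) {
        lock.lock();
        try {
            taskQueue.addLast(task);
            nonEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Split assignment is itself a queued task, applied on the fetcher thread. */
    void addSplit(String splitId) {
        enqueueTask(() -> {
            assignedSplits.add(splitId);
            log.add("add:" + splitId);
        });
    }

    void shutdown() {
        lock.lock();
        try {
            closed = true;
            nonEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void run() {
        while (runOnce()) {
            // keep looping until shutdown
        }
    }

    /** One iteration; returns false once the fetcher is shut down. */
    boolean runOnce() {
        Runnable task;
        lock.lock();
        try {
            // Idle: no queued task and no assigned split, so block until signalled.
            while (!closed && taskQueue.isEmpty() && assignedSplits.isEmpty()) {
                nonEmpty.awaitUninterruptibly();
            }
            if (closed) {
                return false;
            }
            // Queued tasks (FIFO) take priority over the default fetch task.
            task = taskQueue.isEmpty() ? fetchTask : taskQueue.pollFirst();
        } finally {
            lock.unlock();
        }
        task.run(); // executed outside the lock
        return true;
    }

    /** Drives the loop by hand on one thread so the order is deterministic. */
    static List<String> demo() {
        MiniFetcher fetcher = new MiniFetcher();
        fetcher.addSplit("s1");
        fetcher.runOnce(); // runs the queued add task
        fetcher.runOnce(); // queue empty, split assigned: default fetch
        fetcher.addSplit("s2");
        fetcher.runOnce(); // queued add task wins over the fetch task
        fetcher.runOnce(); // default fetch again
        fetcher.shutdown();
        fetcher.runOnce(); // returns false, nothing runs
        return fetcher.log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [add:s1, fetch:1, add:s2, fetch:2]
    }
}
```

In this sketch the lock only guards task selection and the closed flag. Running the task after unlock() is what lets enqueueTask() and shutdown() proceed from another thread while a fetch is in progress.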
Key features include:
- Task queue model: Explicit tasks (AddSplitsTask, RemoveSplitsTask, PauseOrResumeSplitsTask) are enqueued and take priority over the default fetch task, ensuring split management operations are processed promptly.
- Cooperative wake-up: The wakeUp() method signals the condition variable that idle fetchers wait on and also calls the running task's wakeUp() method, enabling cooperative interruption of long-running fetch operations.
- Pause/resume for watermark alignment: Supports pausing and resuming the entire fetcher when splits progress too far beyond the global watermark.
- Graceful shutdown: Uses a CountDownLatch (recordsProcessedLatch) to ensure the SplitReader is only closed after all previously emitted records are processed by the main reader thread. An empty synchronization batch is enqueued during shutdown that counts down the latch upon recycling.
- Split finish hook: Calls a splitFinishedHook consumer when splits are completed, allowing the SplitFetcherManager to track finished splits.
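The graceful-shutdown handshake from the list above can be illustrated with a small two-thread sketch. It assumes a hypothetical Batch class with a recycle() callback and uses a LinkedBlockingQueue in place of Flink's FutureCompletingBlockingQueue; only the latch idea is taken from the description, the rest is illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of the shutdown handshake; not Flink's actual code. */
class GracefulShutdownSketch {

    /** Hypothetical batch: records plus a callback run once the consumer is done. */
    static final class Batch {
        final List<String> records;
        final Runnable onRecycle;

        Batch(List<String> records, Runnable onRecycle) {
            this.records = records;
            this.onRecycle = onRecycle;
        }

        void recycle() {
            onRecycle.run();
        }
    }

    static List<String> demo() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        BlockingQueue<Batch> elementsQueue = new LinkedBlockingQueue<>();
        CountDownLatch recordsProcessedLatch = new CountDownLatch(1);

        Thread fetcher = new Thread(() -> {
            // Normal operation: hand one batch of records to the main thread.
            elementsQueue.add(new Batch(List.of("a", "b"), () -> { }));
            // Shutdown: enqueue an empty synchronization batch behind it.
            elementsQueue.add(new Batch(List.of(), recordsProcessedLatch::countDown));
            try {
                // Wait until the main thread has recycled the sync batch.
                recordsProcessedLatch.await();
                log.add("reader closed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();

        try {
            // Main reader thread: process batches in queue order, then recycle.
            for (int i = 0; i < 2; i++) {
                Batch batch = elementsQueue.take();
                for (String record : batch.records) {
                    log.add("processed " + record);
                }
                batch.recycle();
            }
            fetcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [processed a, processed b, reader closed]
    }
}
```

Because the queue is FIFO, the synchronization batch is recycled only after every earlier batch has been handed to the consumer, so the "reader closed" step cannot overtake record processing.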
Usage
Connector developers do not normally instantiate SplitFetcher directly. It is created internally by SplitFetcherManager.createSplitFetcher() and started via SplitFetcherManager.startFetcher(). Connector developers interact with split fetchers indirectly through the SplitFetcherManager, which manages their lifecycle. Knowing how SplitFetcher works is useful when reasoning about the threading model, debugging I/O issues, or implementing custom fetcher managers.
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
- Lines: 1-451
Signature
@PublicEvolving
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {

    // Constructor (package-private)
    SplitFetcher(
            int id,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitReader<E, SplitT> splitReader,
            Consumer<Throwable> errorHandler,
            Runnable shutdownHook,
            Consumer<Collection<String>> splitFinishedHook,
            boolean allowUnalignedSourceSplits);

    // Runnable interface
    public void run();

    // Split management
    public void addSplits(List<SplitT> splitsToAdd);
    public void removeSplits(List<SplitT> splitsToRemove);
    public void pauseOrResumeSplits(
            Collection<SplitT> splitsToPause, Collection<SplitT> splitsToResume);

    // Task management
    public void enqueueTask(SplitFetcherTask task);

    // Lifecycle
    public void shutdown();
    public void shutdown(boolean waitingForRecordsProcessed);
    public void pause();
    public void resume();

    // Accessors
    public SplitReader<E, SplitT> getSplitReader();
    public int fetcherId();
}
Import
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| id | int | Yes | Unique identifier for this fetcher, assigned by the SplitFetcherManager |
| elementsQueue | FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> | Yes | Shared bounded blocking queue where fetched record batches are placed for the main reader thread |
| splitReader | SplitReader<E, SplitT> | Yes | The actual I/O component that reads records from the external system |
| errorHandler | Consumer<Throwable> | Yes | Callback to propagate uncaught exceptions to the SplitFetcherManager |
| shutdownHook | Runnable | Yes | Callback executed when the fetcher thread exits, used to deregister the fetcher from the manager |
| splitFinishedHook | Consumer<Collection<String>> | Yes | Callback invoked when splits finish reading, used to notify the manager of completed splits |
| allowUnalignedSourceSplits | boolean | Yes | Flag from the constructor signature that relates to watermark alignment; as far as can be inferred, it controls whether a SplitReader that does not support split-level pause/resume is tolerated |
| splitsToAdd (via addSplits) | List<SplitT> | No | Splits to be assigned to this fetcher for reading |
Outputs
| Name | Type | Description |
|---|---|---|
| elementsQueue entries | RecordsWithSplitIds<E> | Record batches placed into the shared elements queue by the FetchTask, containing records grouped by split ID |
| errorHandler invocation | Consumer<Throwable> | Uncaught exceptions from the fetcher thread are forwarded to the error handler for propagation |
| shutdownHook invocation | Runnable | Executed in the finally block when the fetcher thread exits, regardless of success or failure |
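The error and shutdown outputs in the table above follow a try/catch/finally shape, which can be sketched as follows. FetcherExitSketch and its loop parameter are hypothetical names used for illustration; the real run() method also closes the SplitReader, which is omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of the fetcher thread's exit path; not Flink's actual code. */
class FetcherExitSketch implements Runnable {
    private final Runnable loop;
    private final Consumer<Throwable> errorHandler;
    private final Runnable shutdownHook;

    FetcherExitSketch(
            Runnable loop, Consumer<Throwable> errorHandler, Runnable shutdownHook) {
        this.loop = loop;
        this.errorHandler = errorHandler;
        this.shutdownHook = shutdownHook;
    }

    @Override
    public void run() {
        try {
            loop.run(); // stands in for the repeated runOnce() calls
        } catch (Throwable t) {
            errorHandler.accept(t); // forward the failure instead of losing it
        } finally {
            shutdownHook.run(); // always runs, on success and on failure
        }
    }

    /** Runs the sketch once and returns the observed callbacks in order. */
    static List<String> demo(boolean fail) {
        List<String> events = new ArrayList<>();
        Runnable loop = () -> {
            if (fail) {
                throw new IllegalStateException("boom");
            }
            events.add("loop finished");
        };
        new FetcherExitSketch(
                        loop,
                        t -> events.add("error: " + t.getMessage()),
                        () -> events.add("shutdown hook"))
                .run();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // [loop finished, shutdown hook]
        System.out.println(demo(true));  // [error: boom, shutdown hook]
    }
}
```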
Usage Examples
Basic Usage
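import java.util.List;
import java.util.function.Supplier;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;

// SplitFetcher is typically created via SplitFetcherManager, not directly.
// Inside a custom SplitFetcherManager subclass:
public class MySplitFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {

    public MySplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        super(splitReaderFactory, configuration);
    }

    @Override
    public void addSplits(List<SplitT> splitsToAdd) {
        // Create a new fetcher for this group of splits, assign them, then start its thread.
        SplitFetcher<E, SplitT> fetcher = createSplitFetcher();
        fetcher.addSplits(splitsToAdd);
        startFetcher(fetcher);
    }

    @Override
    public void removeSplits(List<SplitT> splitsToRemove) {
        // Simplification: forward the removal to every running fetcher
        // instead of looking up which fetcher owns each split.
        for (SplitFetcher<E, SplitT> fetcher : fetchers.values()) {
            fetcher.removeSplits(splitsToRemove);
        }
    }
}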