Implementation:Apache Flink SplitFetcherManager
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Source_Framework |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Abstract base class that manages the lifecycle of SplitFetcher threads, providing the threading infrastructure for the source reader's I/O layer.
Description
SplitFetcherManager is a central infrastructure class that decouples the source reader's main thread from I/O operations. It creates and manages SplitFetcher instances, each running in its own thread, and provides a shared FutureCompletingBlockingQueue for data hand-off between fetcher threads and the SourceReaderBase main thread.
The class creates SplitFetcher instances via the synchronized createSplitFetcher() method, which uses the provided splitReaderFactory supplier to instantiate a SplitReader for each new fetcher. Fetchers are tracked in a ConcurrentHashMap keyed by auto-incrementing integer IDs, and submitted to a cached ExecutorService thread pool whose threads are named with the prefix "Source Data Fetcher for " followed by the task thread name.
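The bookkeeping described above can be modelled with plain JDK classes. The following is a minimal sketch, not Flink's code: FetcherRegistrySketch and its methods are hypothetical names, fetchers are reduced to Runnable, and marking the pool threads as daemon is an assumption of this sketch. It illustrates only the auto-incrementing IDs, the concurrent map, and the cached pool with the thread-name prefix.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the fetcher bookkeeping; not Flink code. */
class FetcherRegistrySketch {
    private final AtomicInteger nextId = new AtomicInteger(0);
    private final Map<Integer, Runnable> fetchers = new ConcurrentHashMap<>();
    private final ExecutorService executor;

    FetcherRegistrySketch(String taskThreadName) {
        // Cached pool: threads are created on demand and named after the task thread.
        this.executor = Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r, "Source Data Fetcher for " + taskThreadName);
            t.setDaemon(true); // assumption of this sketch
            return t;
        });
    }

    /** Registers the work under a fresh ID; synchronized so creations never interleave. */
    synchronized int createFetcher(Runnable work) {
        int id = nextId.getAndIncrement();
        fetchers.put(id, work);
        return id;
    }

    /** Hands the registered work to the thread pool. */
    void startFetcher(int id) {
        executor.submit(fetchers.get(id));
    }

    int size() {
        return fetchers.size();
    }

    /** Stops accepting work and waits for running fetchers; true if all finished in time. */
    boolean shutdownAndAwait(long timeoutMs) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In this model a fetcher that is created but never started stays registered without occupying a thread, which is why creation and start are separate steps.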
Key features include:
- Extensible threading model: Subclasses implement addSplits() and removeSplits() to define how splits are distributed across fetchers. This allows different strategies such as single-thread (all splits in one fetcher), per-split (one fetcher per split), or custom grouping.
- Error propagation: Uncaught exceptions from fetcher threads are captured in an AtomicReference<Throwable> and surfaced to the main reader thread via checkErrors(), which throws a RuntimeException wrapping the original cause.
- Idle fetcher shutdown: maybeShutdownFinishedFetchers() iterates over active fetchers and shuts down any that are idle (no assigned splits, no queued tasks, no running task), ensuring resources are freed when work completes.
- Graceful close: The close(long timeoutMs) method shuts down all fetchers, actively drains the elements queue to unblock any fetcher threads that may be blocked on put(), shuts down the executor, and waits up to the configured timeout for termination.
- Pause/resume support: pauseOrResumeSplits() distributes pause and resume commands across all fetchers that own the specified splits, supporting watermark alignment.
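Why close() drains the queue is easiest to see in a small model. The sketch below is an illustration under stated assumptions, not the Flink implementation: DrainOnCloseSketch is a hypothetical class, an ArrayBlockingQueue of capacity 1 stands in for the elements queue, and a volatile flag stands in for shutting down each fetcher. A producer blocked in put() can only notice the flag after the queue has room again, so close() keeps discarding batches until the pool terminates or the timeout expires.

```java
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Simplified model of close-with-drain; names are hypothetical, not Flink's. */
class DrainOnCloseSketch {
    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private volatile boolean closed;

    /** Starts a producer that keeps putting batches until the manager is closed. */
    void startProducer() {
        executor.submit(() -> {
            try {
                int batch = 0;
                while (!closed) {
                    queue.put(batch++); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    /** Returns true if every producer thread terminated within the timeout. */
    boolean close(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        closed = true;
        executor.shutdown(); // accept no new producers; running ones may finish
        try {
            do {
                // Discard pending batches so a producer stuck in put() can
                // proceed, observe the closed flag, and exit its loop.
                queue.drainTo(new ArrayList<>());
            } while (!executor.awaitTermination(10, TimeUnit.MILLISECONDS)
                    && System.nanoTime() < deadline);
            return executor.isTerminated();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Without the repeated drain, the producers in this model would stay blocked in put() and close() would only return after the full timeout.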
Usage
Extend SplitFetcherManager when building a custom source connector that needs a specific threading model for split reading. The most common subclass is SingleThreadFetcherManager, which assigns all splits to a single fetcher. Implement addSplits() to define how incoming splits are assigned to fetchers (by creating new fetchers or reusing existing ones) and removeSplits() to handle split removal. Pass the manager instance to SourceReaderBase during construction.
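The single-thread and per-split strategies differ only in what addSplits() does with an incoming batch. The toy model below sketches that difference without any Flink dependency; every class name in it is hypothetical, fetchers are represented by integer IDs, and splits by their string IDs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of split-to-fetcher assignment; not a Flink class. */
abstract class AssignmentSketch {
    protected final Map<Integer, List<String>> fetchers = new LinkedHashMap<>();
    private int nextId = 0;

    /** Registers an empty fetcher under the next free ID. */
    protected int createFetcher() {
        int id = nextId++;
        fetchers.put(id, new ArrayList<>());
        return id;
    }

    /** Subclasses decide which fetcher receives each split. */
    abstract void addSplits(List<String> splitIds);

    Map<Integer, List<String>> assignment() {
        return fetchers;
    }
}

/** All splits share one fetcher, created lazily on the first assignment. */
class SingleFetcherSketch extends AssignmentSketch {
    @Override
    void addSplits(List<String> splitIds) {
        int id = fetchers.isEmpty() ? createFetcher() : fetchers.keySet().iterator().next();
        fetchers.get(id).addAll(splitIds);
    }
}

/** Every split gets a dedicated fetcher. */
class PerSplitFetcherSketch extends AssignmentSketch {
    @Override
    void addSplits(List<String> splitIds) {
        for (String splitId : splitIds) {
            fetchers.get(createFetcher()).add(splitId);
        }
    }
}
```

The per-split variant trades more threads for isolation between splits; the single-fetcher variant keeps one thread and one reader regardless of how many splits arrive.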
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
- Lines: 1-317
Signature
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {

    // Constructors
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration);

    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook);

    // Abstract methods for threading model
    public abstract void addSplits(List<SplitT> splitsToAdd);
    public abstract void removeSplits(List<SplitT> splitsToRemove);

    // Split management
    public void pauseOrResumeSplits(
            Collection<String> splitIdsToPause, Collection<String> splitIdsToResume);

    // Fetcher lifecycle
    protected synchronized SplitFetcher<E, SplitT> createSplitFetcher();
    protected void startFetcher(SplitFetcher<E, SplitT> fetcher);
    public boolean maybeShutdownFinishedFetchers();

    // Queue access
    public FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> getQueue();

    // Lifecycle and error handling
    public synchronized void close(long timeoutMs) throws Exception;
    public void checkErrors();

    // Testing
    public int getNumAliveFetchers();
}
Import
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| splitReaderFactory | Supplier<SplitReader<E, SplitT>> | Yes | Factory that creates new SplitReader instances, one per SplitFetcher thread |
| configuration | Configuration | Yes | Flink configuration containing the element queue capacity (SourceReaderOptions.ELEMENT_QUEUE_CAPACITY) and the pipeline option PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS |
| splitFinishedHook | Consumer<Collection<String>> | No | Optional callback invoked when splits finish in a fetcher, used for testing or custom split lifecycle tracking |
| splitsToAdd (via addSplits) | List<SplitT> | Yes | Splits to distribute across fetcher threads according to the subclass threading model |
| splitsToRemove (via removeSplits) | List<SplitT> | No | Splits to remove from their assigned fetchers |
Outputs
| Name | Type | Description |
|---|---|---|
| elementsQueue | FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> | Shared bounded queue containing record batches produced by fetcher threads, consumed by the SourceReaderBase main thread |
| uncaught exceptions | RuntimeException (via checkErrors) | Propagated exceptions from fetcher threads, thrown as RuntimeException wrapping the original cause |
| maybeShutdownFinishedFetchers return | boolean | Returns true when all fetchers have completed and been shut down, indicating no more data will be produced |
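The two non-queue outputs in the table can be modelled in a few lines. This is a sketch under assumptions rather than the real class: ManagerOutputsSketch, reportError(), and the exception message are hypothetical, and each fetcher is reduced to a count of assigned splits, so "idle" here means only "no splits assigned" (the real check also considers queued and running tasks).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/** Toy model of error hand-off and idle-fetcher cleanup; all names are hypothetical. */
class ManagerOutputsSketch {
    /** Fetcher ID mapped to the number of splits still assigned to it. */
    final Map<Integer, Integer> fetchers = new ConcurrentHashMap<>();
    private final AtomicReference<Throwable> uncaughtException = new AtomicReference<>();

    /** Called from a fetcher thread when it fails; the first error is kept as the cause. */
    void reportError(Throwable error) {
        if (!uncaughtException.compareAndSet(null, error)) {
            // A cause is already recorded: attach later failures as suppressed.
            uncaughtException.get().addSuppressed(error);
        }
    }

    /** Called from the reader thread; rethrows a recorded failure, if any. */
    void checkErrors() {
        Throwable cause = uncaughtException.get();
        if (cause != null) {
            throw new RuntimeException("A fetcher thread failed", cause);
        }
    }

    /** Removes idle fetchers and reports whether none are left. */
    boolean maybeShutdownFinishedFetchers() {
        fetchers.values().removeIf(assignedSplits -> assignedSplits == 0);
        return fetchers.isEmpty();
    }
}
```

A reader loop built on this model would call checkErrors() before each poll of the queue and treat a true result from maybeShutdownFinishedFetchers() as the signal that no further batches will arrive.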
Usage Examples
Basic Usage
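// Implementing a single-thread fetcher manager that assigns all splits to one fetcher
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;

import java.util.List;
import java.util.function.Supplier;

public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {

    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        super(splitReaderFactory, configuration);
    }

    @Override
    public void addSplits(List<SplitT> splitsToAdd) {
        SplitFetcher<E, SplitT> fetcher;
        if (fetchers.isEmpty()) {
            // No fetcher yet: create one and submit it to the thread pool.
            fetcher = createSplitFetcher();
            startFetcher(fetcher);
        } else {
            // Reuse the single existing fetcher.
            fetcher = fetchers.values().iterator().next();
        }
        // Enqueue the new splits on the fetcher.
        fetcher.addSplits(splitsToAdd);
    }

    @Override
    public void removeSplits(List<SplitT> splitsToRemove) {
        // At most one fetcher exists, so the loop body runs zero or one time.
        for (SplitFetcher<E, SplitT> fetcher : fetchers.values()) {
            fetcher.removeSplits(splitsToRemove);
        }
    }
}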