Implementation:Apache Flink SingleThreadFetcherManager
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Source_Framework |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A fetcher manager implementation that uses a single I/O thread, backed by one SplitFetcher instance, to read all assigned splits.
Description
SingleThreadFetcherManager extends SplitFetcherManager with a concrete strategy in which exactly one fetcher thread handles all assigned splits. When splits are added, the manager checks whether a fetcher is already running. If not, it creates one, adds the splits to it, and starts it; otherwise, it adds the splits to the existing fetcher. When splits are removed, they are removed from the running fetcher; if no fetcher is running, removeSplits() does nothing.
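To make the add/remove logic concrete, here is a minimal, Flink-free model of it in plain Java. ModelFetcher, ModelSingleThreadManager, and the `started` flag are hypothetical stand-ins, not Flink classes: splits are modeled as string IDs, no thread is actually started, and removal is applied immediately, whereas a real fetcher may process it asynchronously (an assumption).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of the single-fetcher add/remove logic. ModelFetcher and
 * ModelSingleThreadManager are hypothetical stand-ins, not Flink classes:
 * splits are string IDs and "starting" a fetcher only sets a flag.
 */
public class SingleFetcherModel {

    /** Hypothetical stand-in for a SplitFetcher that tracks assigned split IDs. */
    static final class ModelFetcher {
        final int id;
        final List<String> assignedSplits = new ArrayList<>();
        boolean started;

        ModelFetcher(int id) {
            this.id = id;
        }

        void addSplits(List<String> splits) {
            assignedSplits.addAll(splits);
        }

        void removeSplits(List<String> splits) {
            assignedSplits.removeAll(splits);
        }
    }

    /** Hypothetical manager that mirrors the single-fetcher strategy. */
    static class ModelSingleThreadManager {
        // Fetchers by ID; under this strategy it holds at most one entry.
        protected final Map<Integer, ModelFetcher> fetchers = new LinkedHashMap<>();
        private int nextFetcherId = 0;

        void addSplits(List<String> splitsToAdd) {
            ModelFetcher fetcher = getRunningFetcher();
            if (fetcher == null) {
                // No fetcher yet: create one, give it the splits, then start it.
                fetcher = new ModelFetcher(nextFetcherId++);
                fetchers.put(fetcher.id, fetcher);
                fetcher.addSplits(splitsToAdd);
                fetcher.started = true;
            } else {
                // Reuse the fetcher that is already running.
                fetcher.addSplits(splitsToAdd);
            }
        }

        void removeSplits(List<String> splitsToRemove) {
            ModelFetcher fetcher = getRunningFetcher();
            if (fetcher != null) { // no-op when no fetcher exists
                fetcher.removeSplits(splitsToRemove);
            }
        }

        /** The single fetcher, or null if none has been created yet. */
        protected ModelFetcher getRunningFetcher() {
            return fetchers.isEmpty() ? null : fetchers.values().iterator().next();
        }
    }

    public static void main(String[] args) {
        ModelSingleThreadManager manager = new ModelSingleThreadManager();
        System.out.println(manager.getRunningFetcher() == null); // true

        manager.addSplits(List.of("partition-0", "partition-1"));
        manager.addSplits(List.of("partition-2"));
        manager.removeSplits(List.of("partition-0"));

        System.out.println(manager.fetchers.size()); // 1
        System.out.println(manager.getRunningFetcher().assignedSplits); // [partition-1, partition-2]
    }
}
```

The second addSplits() call reuses the fetcher created by the first, so the manager never holds more than one fetcher no matter how many splits arrive.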
This design is appropriate for two common source patterns: (1) sources where a single client instance manages multiple subscriptions concurrently (e.g., Apache Kafka, where one KafkaConsumer handles multiple topic partitions), and (2) sources where splits are read sequentially one after another (e.g., file sources, where files are read in order). The single-thread approach avoids the complexity of multi-threaded fetching while being sufficient for many connector use cases.
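Pattern (1) can be illustrated with a small plain-Java model in which one thread repeatedly polls a single client that multiplexes several subscriptions, loosely analogous to one SplitReader.fetch() call returning records for several splits. MultiplexClient, drain(), and the maxPerSplit batch limit are hypothetical and are not Kafka or Flink APIs; the client only imitates how one poll can return records for several partitions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Flink-free sketch of one client serving many subscriptions on one thread.
 * MultiplexClient is hypothetical; it imitates a Kafka-style consumer whose
 * single poll returns records for several partitions.
 */
public class MultiplexPollModel {

    static final class MultiplexClient {
        // Split ID -> records not yet returned; insertion order is kept.
        private final Map<String, Deque<String>> pending = new LinkedHashMap<>();
        private final int maxPerSplit;

        MultiplexClient(int maxPerSplit) {
            this.maxPerSplit = maxPerSplit;
        }

        void subscribe(String splitId, List<String> records) {
            pending.put(splitId, new ArrayDeque<>(records));
        }

        boolean hasPending() {
            return pending.values().stream().anyMatch(q -> !q.isEmpty());
        }

        /** One poll returns up to maxPerSplit records for every subscribed split. */
        Map<String, List<String>> poll() {
            Map<String, List<String>> batch = new LinkedHashMap<>();
            for (Map.Entry<String, Deque<String>> e : pending.entrySet()) {
                List<String> out = new ArrayList<>();
                while (out.size() < maxPerSplit && !e.getValue().isEmpty()) {
                    out.add(e.getValue().poll());
                }
                if (!out.isEmpty()) {
                    batch.put(e.getKey(), out);
                }
            }
            return batch;
        }
    }

    /** The single fetching loop: the calling thread serves every split. */
    static List<String> drain(MultiplexClient client) {
        List<String> emitted = new ArrayList<>();
        while (client.hasPending()) {
            client.poll().forEach((split, records) ->
                    records.forEach(r -> emitted.add(split + ":" + r)));
        }
        return emitted;
    }

    public static void main(String[] args) {
        MultiplexClient client = new MultiplexClient(1);
        client.subscribe("topic-0", List.of("a", "b"));
        client.subscribe("topic-1", List.of("x"));
        System.out.println(drain(client)); // [topic-0:a, topic-1:x, topic-0:b]
    }
}
```

Records from different splits interleave, but each split's own order is preserved, and no second thread is needed because the client already multiplexes the subscriptions.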
The class exposes a protected getRunningFetcher() method that returns the single active fetcher, or null if none is running. Both addSplits() and removeSplits() look up the fetcher through this method, so subclasses can override it to customize fetcher selection.
Usage
Connector developers use SingleThreadFetcherManager when building source readers that read all splits with a single I/O thread. SingleThreadMultiplexSourceReaderBase creates one automatically when its constructor is given a Supplier<SplitReader>, but the manager can also be instantiated directly and passed to the source reader constructor for more control. Subclasses can override getRunningFetcher(), and the three-argument constructor accepts a splitFinishedHook callback for reacting to finished splits.
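The contract of splitFinishedHook (a Consumer<Collection<String>> that receives finished split IDs) can be sketched without Flink. HookedFetcher below is hypothetical: it reads its splits one after another, as in the file-source pattern, and passes each exhausted split's ID to the hook. This model calls the hook on the same thread that reads the splits; if the real hook runs on the fetcher's I/O thread, it should stay cheap and non-blocking (an assumption worth checking against the Flink source).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Flink-free sketch of a split-finished hook. HookedFetcher is hypothetical:
 * it reads its splits sequentially and reports each exhausted split's ID
 * through a Consumer<Collection<String>>, the type of splitFinishedHook.
 */
public class SplitFinishedHookModel {

    static final class HookedFetcher {
        // Split ID -> remaining records; LinkedHashMap keeps assignment order.
        private final Map<String, Deque<String>> assigned = new LinkedHashMap<>();
        private final Consumer<Collection<String>> splitFinishedHook;

        HookedFetcher(Consumer<Collection<String>> splitFinishedHook) {
            this.splitFinishedHook = splitFinishedHook;
        }

        void addSplit(String splitId, List<String> records) {
            assigned.put(splitId, new ArrayDeque<>(records));
        }

        /** Reads each split to the end, then reports it as finished. */
        List<String> runToCompletion() {
            List<String> emitted = new ArrayList<>();
            Iterator<Map.Entry<String, Deque<String>>> it = assigned.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Deque<String>> split = it.next();
                String splitId = split.getKey();
                emitted.addAll(split.getValue());
                it.remove(); // drop the finished split from the assignment
                splitFinishedHook.accept(List.of(splitId));
            }
            return emitted;
        }
    }

    public static void main(String[] args) {
        List<String> finished = new ArrayList<>();
        HookedFetcher fetcher = new HookedFetcher(ids -> finished.addAll(ids));
        fetcher.addSplit("file-1", List.of("line-1", "line-2"));
        fetcher.addSplit("file-2", List.of("line-3"));

        System.out.println(fetcher.runToCompletion()); // [line-1, line-2, line-3]
        System.out.println(finished);                  // [file-1, file-2]
    }
}
```

Because the hook only sees split IDs, it suits bookkeeping such as logging or releasing per-split resources rather than handling records.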
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
- Lines: 1-106
Signature
```java
@PublicEvolving
public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {

    // Constructor with default configuration
    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier);

    // Constructor with explicit configuration
    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration);

    // Constructor with configuration and split-finished hook
    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook);

    @Override
    public void addSplits(List<SplitT> splitsToAdd);

    @Override
    public void removeSplits(List<SplitT> splitsToRemove);

    protected SplitFetcher<E, SplitT> getRunningFetcher();
}
```
Import
```java
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| splitReaderSupplier | Supplier<SplitReader<E, SplitT>> | Yes | A factory that creates SplitReader instances to connect to the external source system. |
| configuration | Configuration | No | Flink configuration for the fetcher manager. Defaults to an empty Configuration if not provided. |
| splitFinishedHook | Consumer<Collection<String>> | No | An optional callback invoked with the IDs of the splits that the fetcher has finished reading. |
| splitsToAdd | List<SplitT> | Yes (addSplits) | The list of splits to assign to the single fetcher thread. |
| splitsToRemove | List<SplitT> | Yes (removeSplits) | The list of splits to remove from the single fetcher thread. |
Outputs
| Name | Type | Description |
|---|---|---|
| (side effect) | void | Splits are added to or removed from the internal SplitFetcher. addSplits starts a new fetcher thread if none is running; removeSplits does nothing if none is running. |
| getRunningFetcher() | SplitFetcher<E, SplitT> (nullable) | Returns the single running fetcher, or null if no fetcher is active. |
Usage Examples
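```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;

// MySourceSplit, MySplitReader, MyRecordEmitter, and MySourceReader (assumed to extend
// SingleThreadMultiplexSourceReaderBase) are hypothetical connector classes.
// config (a Configuration), context (a SourceReaderContext), and LOG are assumed in scope.

// Example: create a SingleThreadFetcherManager and pass it to a source reader
SingleThreadFetcherManager<byte[], MySourceSplit> fetcherManager =
        new SingleThreadFetcherManager<>(
                () -> new MySplitReader(), // SplitReader supplier
                config);                   // Flink Configuration

// Hand the fetcher manager to the source reader constructor
MySourceReader reader = new MySourceReader(
        fetcherManager,
        new MyRecordEmitter(),
        config,
        context);

// Example: the variant with a split-finished hook
SingleThreadFetcherManager<byte[], MySourceSplit> fetcherManagerWithHook =
        new SingleThreadFetcherManager<>(
                () -> new MySplitReader(),
                config,
                finishedSplitIds -> LOG.info("Splits finished: {}", finishedSplitIds));
```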