Implementation:Apache Flink SplitFetcherManager

From Leeroopedia


Knowledge Sources
Domains: Connectors, Source_Framework
Last Updated: 2026-02-09 00:00 GMT

Overview

Abstract base class that manages the lifecycle of SplitFetcher threads, providing the threading infrastructure for the source reader's I/O layer.

Description

SplitFetcherManager is a central infrastructure class that decouples the source reader's main thread from I/O operations. It creates and manages SplitFetcher instances, each running in its own thread, and provides a shared FutureCompletingBlockingQueue for data hand-off between fetcher threads and the SourceReaderBase main thread.

The class creates SplitFetcher instances via the synchronized createSplitFetcher() method, which uses the provided splitReaderFactory supplier to instantiate a SplitReader for each new fetcher. Fetchers are tracked in a ConcurrentHashMap keyed by auto-incrementing integer IDs, and submitted to a cached ExecutorService thread pool whose threads are named with the prefix "Source Data Fetcher for " followed by the task thread name.
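
The bookkeeping described above can be illustrated with JDK classes alone. The following is a minimal sketch, not Flink's implementation: the class FetcherRegistrySketch and its methods are hypothetical, plain Runnable objects stand in for SplitFetcher instances, and the thread name consists of the prefix only, whereas a real thread factory may append a counter.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the fetcher bookkeeping; not Flink code.
class FetcherRegistrySketch {
    private final AtomicInteger idGenerator = new AtomicInteger(0);
    private final Map<Integer, Runnable> fetchers = new ConcurrentHashMap<>();
    private final ExecutorService executor;

    FetcherRegistrySketch(String taskThreadName) {
        // Cached pool: threads are created on demand and named after the owning task.
        this.executor = Executors.newCachedThreadPool(
                r -> new Thread(r, "Source Data Fetcher for " + taskThreadName));
    }

    // synchronized so that ID allocation and registration happen as one step.
    synchronized int register(Runnable fetcher) {
        int id = idGenerator.getAndIncrement();
        fetchers.put(id, fetcher);
        return id;
    }

    // Hands a registered fetcher to the thread pool.
    void start(int id) {
        executor.execute(fetchers.get(id));
    }

    int size() {
        return fetchers.size();
    }

    // Returns true if all started fetchers finished within the timeout.
    boolean shutdownAndWait(long timeoutMs) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Registering two fetchers in this sketch yields the IDs 0 and 1, and a started fetcher observes the prefixed name through Thread.currentThread().getName().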

Key features include:

  • Extensible threading model: Subclasses implement addSplits() and removeSplits() to define how splits are distributed across fetchers. This allows different strategies such as single-thread (all splits in one fetcher), per-split (one fetcher per split), or custom grouping.
  • Error propagation: Uncaught exceptions from fetcher threads are captured in an AtomicReference<Throwable> and surfaced to the main reader thread via checkErrors(), which throws a RuntimeException wrapping the original cause.
  • Idle fetcher shutdown: maybeShutdownFinishedFetchers() iterates over active fetchers and shuts down any that are idle (no assigned splits, no queued tasks, no running task), ensuring resources are freed when work completes.
  • Graceful close: The close(long timeoutMs) method shuts down all fetchers, actively drains the elements queue to unblock any fetcher threads that may be blocked on put(), shuts down the executor, and waits up to the configured timeout for termination.
  • Pause/resume support: pauseOrResumeSplits() distributes pause and resume commands across all fetchers that own the specified splits, supporting watermark alignment.
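
The error-propagation and graceful-close items in the list above can be sketched without any Flink dependency. This is an illustration under stated assumptions, not the actual implementation: FetcherLifecycleSketch, its fields, and the exception message are hypothetical, an ArrayBlockingQueue of strings stands in for the elements queue, and the 10 ms polling interval is arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical JDK-only sketch of error capture and draining close; not Flink code.
class FetcherLifecycleSketch {
    // Capacity 1 makes it easy to get a producer blocked in put().
    private final BlockingQueue<String> elementsQueue = new ArrayBlockingQueue<>(1);
    private final AtomicReference<Throwable> uncaughtException = new AtomicReference<>();
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private volatile boolean closed = false;

    // Starts a producer thread that emits records and optionally fails afterwards.
    void startFetcher(int recordsToProduce, boolean failAtEnd) {
        executor.execute(() -> {
            try {
                for (int i = 0; i < recordsToProduce && !closed; i++) {
                    elementsQueue.put("record-" + i); // blocks while the queue is full
                }
                if (failAtEnd) {
                    throw new IllegalStateException("simulated read failure");
                }
            } catch (Throwable t) {
                // Remember only the first failure; the main thread picks it up later.
                uncaughtException.compareAndSet(null, t);
            }
        });
    }

    // Called from the main thread: rethrows a captured failure, wrapped.
    void checkErrors() {
        Throwable t = uncaughtException.get();
        if (t != null) {
            throw new RuntimeException("A fetcher thread failed", t);
        }
    }

    // Returns true if all producer threads terminated within the timeout.
    boolean close(long timeoutMs) {
        closed = true;
        executor.shutdown();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            while (!executor.awaitTermination(10, TimeUnit.MILLISECONDS)) {
                // Drain so that a producer blocked in put() can proceed and see 'closed'.
                elementsQueue.clear();
                if (System.nanoTime() - deadline > 0) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Without the draining step in close(), a producer that is parked in put() on a full queue would never get the chance to observe the closed flag, and the wait for termination would simply run into the timeout.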

Usage

Extend SplitFetcherManager when building a custom source connector that needs a specific threading model for split reading. The most common subclass is SingleThreadFetcherManager, which assigns all splits to a single fetcher. Implement addSplits() to define how incoming splits are assigned to fetchers (by creating new fetchers or reusing existing ones) and removeSplits() to handle split removal. Pass the manager instance to SourceReaderBase during construction.
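
To make the choice of threading model more concrete, the sketch below computes which fetcher ID each split would land on under three strategies: one shared fetcher, one fetcher per split, and a hash-based grouping. It is an illustration only. SplitAssignmentSketch and its methods are hypothetical, splits are represented by their string IDs, and an addSplits() implementation would still have to create and start the fetchers for the resulting groups.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper showing three ways to map split IDs to fetcher IDs; not Flink code.
class SplitAssignmentSketch {

    // Single-thread model: every split goes to fetcher 0.
    static Map<Integer, List<String>> singleFetcher(List<String> splitIds) {
        Map<Integer, List<String>> assignment = new TreeMap<>();
        assignment.put(0, new ArrayList<>(splitIds));
        return assignment;
    }

    // Per-split model: the i-th split gets its own fetcher i.
    static Map<Integer, List<String>> fetcherPerSplit(List<String> splitIds) {
        Map<Integer, List<String>> assignment = new TreeMap<>();
        for (int i = 0; i < splitIds.size(); i++) {
            List<String> single = new ArrayList<>();
            single.add(splitIds.get(i));
            assignment.put(i, single);
        }
        return assignment;
    }

    // Custom grouping: hash each split ID into one of numFetchers groups.
    static Map<Integer, List<String>> hashed(List<String> splitIds, int numFetchers) {
        Map<Integer, List<String>> assignment = new TreeMap<>();
        for (String id : splitIds) {
            int fetcherId = Math.floorMod(id.hashCode(), numFetchers);
            assignment.computeIfAbsent(fetcherId, k -> new ArrayList<>()).add(id);
        }
        return assignment;
    }
}
```

For the split IDs "a", "b", "c" and two fetchers, the hashed variant places "b" on fetcher 0 and "a" and "c" on fetcher 1, because the hash codes of these single-character strings are 97, 98, and 99.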

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
  • Lines: 1-317

Signature

@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {

    // Constructors
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration);

    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook);

    // Abstract methods for threading model
    public abstract void addSplits(List<SplitT> splitsToAdd);
    public abstract void removeSplits(List<SplitT> splitsToRemove);

    // Split management
    public void pauseOrResumeSplits(
            Collection<String> splitIdsToPause, Collection<String> splitIdsToResume);

    // Fetcher lifecycle
    protected synchronized SplitFetcher<E, SplitT> createSplitFetcher();
    protected void startFetcher(SplitFetcher<E, SplitT> fetcher);
    public boolean maybeShutdownFinishedFetchers();

    // Queue access
    public FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> getQueue();

    // Lifecycle and error handling
    public synchronized void close(long timeoutMs) throws Exception;
    public void checkErrors();

    // Testing
    public int getNumAliveFetchers();
}

Import

import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;

I/O Contract

Inputs

Name | Type | Required | Description
splitReaderFactory | Supplier<SplitReader<E, SplitT>> | Yes | Factory that creates new SplitReader instances, one per SplitFetcher thread
configuration | Configuration | Yes | Flink configuration containing element queue capacity (SourceReaderOptions.ELEMENT_QUEUE_CAPACITY) and pipeline options (ALLOW_UNALIGNED_SOURCE_SPLITS)
splitFinishedHook | Consumer<Collection<String>> | No | Optional callback invoked when splits finish in a fetcher, used for testing or custom split lifecycle tracking
splitsToAdd (via addSplits) | List<SplitT> | Yes | Splits to distribute across fetcher threads according to the subclass threading model
splitsToRemove (via removeSplits) | List<SplitT> | No | Splits to remove from their assigned fetchers

Outputs

Name | Type | Description
elementsQueue | FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> | Shared bounded queue containing record batches produced by fetcher threads, consumed by the SourceReaderBase main thread
uncaught exceptions | RuntimeException (via checkErrors) | Propagated exceptions from fetcher threads, thrown as RuntimeException wrapping the original cause
maybeShutdownFinishedFetchers return | boolean | Returns true when no fetchers remain after idle ones have been shut down, so no further batches will be produced; batches already in the queue may still need to be consumed
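
The three outputs above are typically consulted together when a reader decides whether it has reached the end of its input. The following sketch shows one plausible way to combine them. It is an assumption-laden illustration rather than the logic of any Flink class: ReaderStatusSketch, its Status enum, and the noMoreSplits flag are hypothetical names, and the queue, fetcher, and error state are passed in as plain values.

```java
// Hypothetical decision logic combining the manager's outputs; not Flink code.
class ReaderStatusSketch {
    enum Status { NOTHING_AVAILABLE, MORE_AVAILABLE, END_OF_INPUT }

    static Status status(
            boolean noMoreSplits,        // assumed flag: no further splits will be assigned
            boolean allFetchersShutDown, // result of a maybeShutdownFinishedFetchers-style call
            boolean queueEmpty,          // whether the hand-off queue holds no more batches
            Throwable fetcherError) {    // captured fetcher failure, or null
        if (!(noMoreSplits && allFetchersShutDown)) {
            // Fetchers may still produce data, or new splits may still arrive.
            return Status.NOTHING_AVAILABLE;
        }
        if (!queueEmpty) {
            // All fetchers are gone, but buffered batches must still be consumed.
            return Status.MORE_AVAILABLE;
        }
        if (fetcherError != null) {
            // Fetchers may have stopped because of a failure, so surface it before finishing.
            throw new RuntimeException("A fetcher thread failed", fetcherError);
        }
        return Status.END_OF_INPUT;
    }
}
```

The ordering matters: an empty fetcher map alone does not mean the input is exhausted, since the queue can still hold batches and a failed fetcher also leaves the map empty.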

Usage Examples

Basic Usage

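// Implementing a single-thread fetcher manager that assigns all splits to one fetcher.
// Simplified illustration; Flink ships its own SingleThreadFetcherManager.
import java.util.List;
import java.util.function.Supplier;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;

public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {

    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        super(splitReaderFactory, configuration);
    }

    @Override
    public void addSplits(List<SplitT> splitsToAdd) {
        SplitFetcher<E, SplitT> fetcher;
        if (fetchers.isEmpty()) {
            // No fetcher yet: create the single fetcher and submit it to the thread pool.
            fetcher = createSplitFetcher();
            startFetcher(fetcher);
        } else {
            // Reuse the only fetcher that exists.
            fetcher = fetchers.values().iterator().next();
        }
        fetcher.addSplits(splitsToAdd);
    }

    @Override
    public void removeSplits(List<SplitT> splitsToRemove) {
        // At most one fetcher exists in this model, so the loop body runs zero or one time.
        for (SplitFetcher<E, SplitT> fetcher : fetchers.values()) {
            fetcher.removeSplits(splitsToRemove);
        }
    }
}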
