
Implementation:Apache Flink SplitFetcher

From Leeroopedia


Knowledge Sources
Domains Connectors, Source_Framework
Last Updated 2026-02-09 00:00 GMT

Overview

Internal Runnable, executed on a dedicated I/O thread, that polls records from external systems via a SplitReader and places them into the elements queue, where the source reader's main thread consumes them.
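
The hand-off described above is a producer/consumer pattern over a bounded queue. The sketch below models it with only the JDK. `ArrayBlockingQueue` stands in for Flink's `FutureCompletingBlockingQueue`, and the hypothetical `Batch` class stands in for `RecordsWithSplitIds` by grouping records by split ID. It illustrates the pattern under these assumptions and is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Simplified model of the fetcher-to-main-thread hand-off; names are hypothetical, not Flink's API. */
final class ElementsQueueSketch {
    /** Hypothetical stand-in for RecordsWithSplitIds: records grouped by split ID. */
    static final class Batch {
        final Map<String, List<String>> recordsBySplit;
        final boolean last;

        Batch(Map<String, List<String>> recordsBySplit, boolean last) {
            this.recordsBySplit = recordsBySplit;
            this.last = last;
        }
    }

    /** Produces batches on an I/O thread and drains them on the calling ("main") thread. */
    static List<String> handOff(int queueCapacity) {
        BlockingQueue<Batch> elementsQueue = new ArrayBlockingQueue<>(queueCapacity);
        Thread fetcherThread = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    Map<String, List<String>> records = new LinkedHashMap<>();
                    records.put("split-0", List.of("a" + i));
                    records.put("split-1", List.of("b" + i));
                    // put() blocks while the queue is full, throttling the I/O thread
                    elementsQueue.put(new Batch(records, i == 2));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "split-fetcher-0");
        fetcherThread.start();

        List<String> consumed = new ArrayList<>();
        try {
            Batch batch;
            do {
                batch = elementsQueue.take(); // main thread waits for the next batch
                batch.recordsBySplit.forEach((splitId, recs) ->
                        recs.forEach(r -> consumed.add(splitId + ":" + r)));
            } while (!batch.last);
            fetcherThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        // [split-0:a0, split-1:b0, split-0:a1, split-1:b1, split-0:a2, split-1:b2]
        System.out.println(handOff(1));
    }
}
```

With a capacity of 1 the I/O thread can run at most one batch ahead of the consumer, which is the backpressure a bounded elements queue provides.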

Description

SplitFetcher is a core threading component in the Flink source reader framework. It implements Runnable and is executed on a dedicated I/O thread, where it manages reading from the splits assigned to it. Each SplitFetcher instance is identified by a unique integer ID, and the thread it runs on is provided and managed by the SplitFetcherManager.
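
To illustrate how a manager can give each fetcher a unique ID and run it on its own thread, here is a minimal JDK-only sketch. The class `FetcherManagerSketch`, its methods, and the use of a cached thread pool are assumptions made for illustration; the real SplitFetcherManager has a different, richer API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical manager model: unique fetcher IDs, each running fetcher on a pooled thread. */
final class FetcherManagerSketch {
    private final AtomicInteger fetcherIdGenerator = new AtomicInteger();
    private final Map<Integer, Runnable> fetchers = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newCachedThreadPool();

    /** Registers a fetcher under the next unique ID and returns that ID. */
    int createFetcher(Runnable work) {
        int id = fetcherIdGenerator.getAndIncrement();
        fetchers.put(id, () -> {
            try {
                work.run();
            } finally {
                fetchers.remove(id); // plays the role of the shutdown hook
            }
        });
        return id;
    }

    /** Submits a registered fetcher; a cached pool gives each concurrent fetcher its own thread. */
    void startFetcher(int id) {
        executor.execute(fetchers.get(id));
    }

    int registeredFetchers() {
        return fetchers.size();
    }

    /** Stops accepting new fetchers and waits for running ones to exit. */
    boolean close(long timeoutMillis) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FetcherManagerSketch manager = new FetcherManagerSketch();
        for (int i = 0; i < 3; i++) {
            int id = manager.createFetcher(
                    () -> System.out.println("fetcher on " + Thread.currentThread().getName()));
            manager.startFetcher(id);
        }
        System.out.println(manager.close(5_000) + ", still registered: " + manager.registeredFetchers());
    }
}
```

Deregistering in a `finally` block means a fetcher leaves the manager's map whether its work completes or fails.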

The main loop in run() repeatedly calls runOnce(). Each iteration acquires a ReentrantLock and selects the next task. Queued tasks (add splits, remove splits, pause/resume splits) are taken in FIFO order from a Deque<SplitFetcherTask>; if none are queued but splits are assigned, the default FetchTask runs instead. If the fetcher is idle (no splits and no tasks), it blocks on a Condition variable (nonEmpty) until work arrives. The selected task is executed outside the lock, so other threads can enqueue tasks or wake the fetcher while it runs.
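
The loop described above can be modeled with a `ReentrantLock`, a `Condition`, and an `ArrayDeque`. In this hypothetical sketch, `FetcherLoopSketch`, `Task`, and `addSplit()` are invented names, not Flink's API. Queued tasks win over the default fetch, an idle loop blocks on `nonEmpty`, and each task runs after the lock is released. Cooperative wake-up of a running task and pausing are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Simplified model of the fetcher's task loop; names are hypothetical, not Flink's API. */
final class FetcherLoopSketch {
    interface Task { void run(); } // hypothetical unit of work run by the fetcher thread

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition nonEmpty = lock.newCondition();
    private final Deque<Task> taskQueue = new ArrayDeque<>();
    private final List<String> assignedSplits = new ArrayList<>(); // only touched by the loop thread
    final List<String> log = new ArrayList<>();                     // only touched by the loop thread
    private final Task fetchTask = () -> log.add("fetch " + assignedSplits);
    private boolean closed;

    void enqueueTask(Task task) {
        lock.lock();
        try {
            taskQueue.addLast(task);
            nonEmpty.signal(); // wake the loop if it is blocked waiting for work
        } finally {
            lock.unlock();
        }
    }

    void addSplit(String splitId) {
        enqueueTask(() -> {
            assignedSplits.add(splitId);
            log.add("add " + splitId);
        });
    }

    void shutdown() {
        lock.lock();
        try {
            closed = true;
            nonEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /** One loop iteration; returns false once the fetcher is closed. */
    boolean runOnce() {
        Task task;
        lock.lock();
        try {
            if (closed) {
                return false;
            }
            if (!taskQueue.isEmpty()) {
                task = taskQueue.pollFirst();    // explicit tasks first, in FIFO order
            } else if (!assignedSplits.isEmpty()) {
                task = fetchTask;                // fall back to fetching when splits are assigned
            } else {
                nonEmpty.awaitUninterruptibly(); // idle: block until a task or shutdown arrives
                task = taskQueue.pollFirst();    // null after shutdown or a spurious wake-up
            }
        } finally {
            lock.unlock();
        }
        if (task != null) {
            task.run(); // runs without the lock, so other threads can enqueue meanwhile
        }
        return true;
    }

    /** Runs an idle loop on its own thread and checks that shutdown() ends it. */
    static boolean idleLoopStopsOnShutdown() {
        FetcherLoopSketch fetcher = new FetcherLoopSketch();
        Thread thread = new Thread(() -> {
            while (fetcher.runOnce()) { } // loop until closed
        });
        thread.start();
        fetcher.shutdown();
        try {
            thread.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        FetcherLoopSketch fetcher = new FetcherLoopSketch();
        fetcher.addSplit("split-0");
        fetcher.addSplit("split-1");
        fetcher.runOnce(); // add split-0
        fetcher.runOnce(); // add split-1
        fetcher.runOnce(); // nothing queued: default fetch
        fetcher.addSplit("split-2");
        fetcher.runOnce(); // the queued task takes priority over fetching
        System.out.println(fetcher.log);
    }
}
```

Because `closed` is checked under the lock before waiting, and `shutdown()` sets it and signals under the same lock, a shutdown request cannot be lost; a spurious wake-up simply yields a `null` task and another iteration.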

Key features include:

  • Task queue model: Explicit tasks (AddSplitsTask, RemoveSplitsTask, PauseOrResumeSplitsTask) are enqueued and take priority over the default fetch task, ensuring split management operations are processed promptly.
  • Cooperative wake-up: The wakeUp() method calls the running task's wakeUp() method if a task is currently executing; otherwise it signals the nonEmpty condition variable so that an idle fetcher blocked on the task queue is released. This enables cooperative interruption of long-running fetch operations.
  • Pause/resume for watermark alignment: Supports pausing and resuming individual splits (pauseOrResumeSplits()) as well as the fetcher as a whole (pause() and resume()), so that reading can be held back when splits progress too far beyond the global watermark.
  • Graceful shutdown: Uses a CountDownLatch (recordsProcessedLatch) to ensure the SplitReader is only closed after all previously emitted records have been processed by the main reader thread. During shutdown, an empty synchronization batch is enqueued; when the main thread recycles it, the latch is counted down.
  • Split finish hook: Calls a splitFinishedHook consumer when splits are completed, allowing the SplitFetcherManager to track finished splits.
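
The shutdown handshake in the list above can be sketched with a `CountDownLatch`: the fetcher enqueues an empty synchronization batch whose `recycle()` counts the latch down, then waits on the latch before it would close the reader. Everything here, including the `Batch` interface and `processedWhenReaderClosed()`, is a hypothetical JDK-only model; only the latch name `recordsProcessedLatch` is taken from the text.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified model of "close the reader only after emitted records are processed". */
final class GracefulShutdownSketch {
    /** Hypothetical batch type; the consumer calls recycle() once it has processed the batch. */
    interface Batch {
        List<String> records();

        default void recycle() {}
    }

    /** Returns how many records the main thread had processed when the reader was "closed". */
    static int processedWhenReaderClosed() {
        BlockingQueue<Batch> elementsQueue = new ArrayBlockingQueue<>(4);
        CountDownLatch recordsProcessedLatch = new CountDownLatch(1);
        AtomicInteger processed = new AtomicInteger();
        AtomicInteger seenAtClose = new AtomicInteger(-1);

        Thread fetcherThread = new Thread(() -> {
            try {
                elementsQueue.put(() -> List.of("r1", "r2"));
                elementsQueue.put(() -> List.of("r3"));
                // Shutdown: enqueue an empty synchronization batch that counts down on recycle.
                elementsQueue.put(new Batch() {
                    @Override public List<String> records() { return List.of(); }
                    @Override public void recycle() { recordsProcessedLatch.countDown(); }
                });
                recordsProcessedLatch.await();    // all earlier batches are processed by now
                seenAtClose.set(processed.get()); // stand-in for closing the SplitReader
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcherThread.start();

        try {
            for (int i = 0; i < 3; i++) { // main reader thread: process, then recycle
                Batch batch = elementsQueue.take();
                processed.addAndGet(batch.records().size());
                batch.recycle();
            }
            fetcherThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seenAtClose.get();
    }

    public static void main(String[] args) {
        System.out.println(processedWhenReaderClosed()); // 3
    }
}
```

Because the queue is FIFO and the synchronization batch is enqueued last, the latch can only reach zero after every earlier batch has been taken and processed, so the simulated close always observes all three records.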

Usage

SplitFetcher is not typically instantiated directly by connector developers. It is created internally by SplitFetcherManager.createSplitFetcher() and started via SplitFetcherManager.startFetcher(). Connector developers interact with split fetchers indirectly through the SplitFetcherManager, which manages their lifecycle. SplitFetcher is relevant when understanding the threading model, debugging I/O issues, or implementing custom fetcher managers.

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
  • Lines: 1-451

Signature

@PublicEvolving
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {

    // Constructor (package-private)
    SplitFetcher(
            int id,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitReader<E, SplitT> splitReader,
            Consumer<Throwable> errorHandler,
            Runnable shutdownHook,
            Consumer<Collection<String>> splitFinishedHook,
            boolean allowUnalignedSourceSplits);

    // Runnable interface
    public void run();

    // Split management
    public void addSplits(List<SplitT> splitsToAdd);
    public void removeSplits(List<SplitT> splitsToRemove);
    public void pauseOrResumeSplits(
            Collection<SplitT> splitsToPause, Collection<SplitT> splitsToResume);

    // Task management
    public void enqueueTask(SplitFetcherTask task);

    // Lifecycle
    public void shutdown();
    public void shutdown(boolean waitingForRecordsProcessed);
    public void pause();
    public void resume();

    // Accessors
    public SplitReader<E, SplitT> getSplitReader();
    public int fetcherId();
}

Import

import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;

I/O Contract

Inputs

  • id (int, required): Unique identifier for this fetcher, assigned by the SplitFetcherManager.
  • elementsQueue (FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>, required): Shared bounded blocking queue into which fetched record batches are placed for the main reader thread.
  • splitReader (SplitReader<E, SplitT>, required): The I/O component that reads records from the external system.
  • errorHandler (Consumer<Throwable>, required): Callback that propagates uncaught exceptions to the SplitFetcherManager.
  • shutdownHook (Runnable, required): Callback executed when the fetcher thread exits; used to deregister the fetcher from the manager.
  • splitFinishedHook (Consumer<Collection<String>>, required): Callback invoked with the IDs of splits that have finished reading; used to notify the manager of completed splits.
  • allowUnalignedSourceSplits (boolean, required): Whether a SplitReader that does not support pausing and resuming individual splits is tolerated when watermark alignment is used; if false, such a pause/resume attempt fails with an UnsupportedOperationException.
  • splitsToAdd (List<SplitT>, optional, passed via addSplits()): Splits to assign to this fetcher for reading.

Outputs

  • elementsQueue entries (RecordsWithSplitIds<E>): Record batches, grouped by split ID, that the FetchTask places into the shared elements queue.
  • errorHandler invocation (Consumer<Throwable>): Uncaught exceptions from the fetcher thread are forwarded to the error handler for propagation.
  • shutdownHook invocation (Runnable): Executed in the finally block when the fetcher thread exits, regardless of success or failure.
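
The exit behavior listed above (uncaught errors routed to the errorHandler, shutdownHook always run) follows the usual try/catch/finally shape. This sketch wraps a loop body that way using only the JDK. `fetcherRunnable()` and `simulate()` are hypothetical helpers, and closing the reader before running the hook is an assumption of the sketch rather than something stated on this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/** Sketch of the described exit behavior of a fetcher's run(); not Flink's source. */
final class RunWrapperSketch {
    /** Wraps a loop body: errors go to errorHandler, the shutdown hook always runs. */
    static Runnable fetcherRunnable(BooleanSupplier runOnce, AutoCloseable splitReader,
                                    Consumer<Throwable> errorHandler, Runnable shutdownHook) {
        return () -> {
            try {
                while (runOnce.getAsBoolean()) {
                    // all work happens inside runOnce
                }
            } catch (Throwable t) {
                errorHandler.accept(t);  // forward uncaught failures to the manager
            } finally {
                try {
                    splitReader.close(); // assumption: the reader is closed before the hook
                } catch (Exception e) {
                    errorHandler.accept(e);
                } finally {
                    shutdownHook.run();  // runs on success and on failure
                }
            }
        };
    }

    /** Runs the wrapper once, either to normal completion or with a failing iteration. */
    static List<String> simulate(boolean fail) {
        List<String> events = new ArrayList<>();
        int[] iterations = {0};
        BooleanSupplier runOnce = () -> {
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return ++iterations[0] < 3; // two "true" iterations, then stop
        };
        fetcherRunnable(runOnce,
                () -> events.add("reader closed"),
                t -> events.add("error: " + t.getMessage()),
                () -> events.add("shutdown hook")).run();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false)); // [reader closed, shutdown hook]
        System.out.println(simulate(true));  // [error: boom, reader closed, shutdown hook]
    }
}
```

Nesting the hook in an inner `finally` keeps it running even if closing the reader itself throws.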

Usage Examples

Basic Usage

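// SplitFetcher is typically created via SplitFetcherManager, not directly.
// A custom SplitFetcherManager subclass that starts a new fetcher for each
// addSplits() call. Assumes a recent Flink version in which SplitFetcherManager
// has a (Supplier, Configuration) constructor and declares removeSplits().
import java.util.List;
import java.util.function.Supplier;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;

public class MySplitFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {

    public MySplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        super(splitReaderFactory, configuration);
    }

    @Override
    public void addSplits(List<SplitT> splitsToAdd) {
        // createSplitFetcher() builds and registers a new fetcher;
        // startFetcher() runs it on the manager's executor.
        SplitFetcher<E, SplitT> fetcher = createSplitFetcher();
        fetcher.addSplits(splitsToAdd);
        startFetcher(fetcher);
    }

    @Override
    public void removeSplits(List<SplitT> splitsToRemove) {
        // Simplification: this manager does not track which fetcher owns which split,
        // so the removal request is forwarded to every registered fetcher.
        for (SplitFetcher<E, SplitT> fetcher : fetchers.values()) {
            fetcher.removeSplits(splitsToRemove);
        }
    }
}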
