Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Apache Flink FutureCompletingBlockingQueue

From Leeroopedia


Knowledge Sources
Domains Connectors, Source_Framework
Last Updated 2026-02-09 00:00 GMT

Overview

A custom bounded blocking queue that integrates with CompletableFuture for asynchronous consumer notifications, used as the hand-off mechanism between fetcher threads (producers) and the source reader main thread (consumer).

Description

FutureCompletingBlockingQueue is a critical synchronization primitive in the Flink source reader framework. It bridges the gap between the async mailbox-based main thread model and the blocking I/O fetcher threads, enabling efficient backpressure and availability notification without busy-waiting.

The queue wraps an ArrayDeque with a ReentrantLock for thread safety and maintains a CompletableFuture (currentFuture) that tracks consumer availability notifications. The future completes when the queue transitions from empty to non-empty, or when notifyAvailable() is explicitly called. The future resets to an incomplete state when poll() empties the queue or finds it already empty.

Key features include:

  • Asynchronous consumer notifications: Instead of blocking consumers via take(), the queue exposes getAvailabilityFuture() which returns a CompletableFuture that completes when data is available. Consumers can subscribe to completion handlers for non-blocking notification. The model tolerates false positives (spurious completions) similar to Java monitor spurious wakeups.
  • Producer backpressure: The put(int, T) method blocks the calling thread when the queue reaches capacity. Each producer thread has its own Condition variable stored in a dynamically-sized ConditionAndFlag array, indexed by thread/fetcher ID. This avoids unnecessary wake-ups across producer threads.
  • Graceful producer wake-up: wakeUpPuttingThread(int) sets a per-thread wake-up flag and signals the corresponding condition, causing the blocked put() to return false without adding the element. This enables cooperative shutdown without thread interruption.
  • Availability constant: A static AVAILABLE future constant (obtained reflectively from AvailabilityProvider.AVAILABLE in the runtime module) provides efficient short-circuiting when data is known to be available, avoiding volatile memory operations.
  • Capacity management: The queue is bounded with a configurable capacity (default from SourceReaderOptions.ELEMENT_QUEUE_CAPACITY). The remainingCapacity() method reports available space.

Usage

FutureCompletingBlockingQueue is typically not instantiated directly by connector developers. It is created internally by SplitFetcherManager and shared between the SplitFetcher threads (which call put()) and the SourceReaderBase main thread (which calls poll() and getAvailabilityFuture()). Understanding this class is important when debugging threading issues, backpressure behavior, or availability notification problems in the source reader framework.

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
  • Lines: 1-438

Signature

@Internal
public class FutureCompletingBlockingQueue<T> {

    // Static constant for availability
    public static final CompletableFuture<Void> AVAILABLE;

    // Constructors
    public FutureCompletingBlockingQueue();
    public FutureCompletingBlockingQueue(int capacity);

    // Future / Notification logic
    public CompletableFuture<Void> getAvailabilityFuture();
    public void notifyAvailable();

    // Queue operations
    public boolean put(int threadIndex, T element) throws InterruptedException;
    public T take() throws InterruptedException;
    public T poll();
    public T peek();

    // Capacity and status
    public int size();
    public boolean isEmpty();
    public int remainingCapacity();

    // Producer thread management
    public void wakeUpPuttingThread(int threadIndex);
}

Import

import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;

I/O Contract

Inputs

Name Type Required Description
capacity int No Maximum number of elements the queue can hold; defaults to SourceReaderOptions.ELEMENT_QUEUE_CAPACITY default value (2). Must be greater than 0.
element (via put) T Yes Element to add to the queue; must not be null. The calling thread blocks if the queue is full.
threadIndex (via put) int Yes Identifier of the producing thread, used to index into the per-thread condition variable array for blocking and wake-up.

Outputs

Name Type Description
poll return T or null Returns and removes the first element, or null if the queue is empty. Resets availability future when queue becomes empty.
take return T Blocks until an element is available, then returns and removes it. Intended primarily for testing.
peek return T or null Returns the first element without removing it, or null if the queue is empty.
getAvailabilityFuture return CompletableFuture<Void> A future that completes when the queue becomes non-empty or notifyAvailable() is called. Returns the AVAILABLE constant when data is already present.
put return boolean Returns true if the element was successfully enqueued, false if the thread was woken up via wakeUpPuttingThread before the element could be added.

Usage Examples

Basic Usage

// Creating and using FutureCompletingBlockingQueue for producer-consumer coordination
FutureCompletingBlockingQueue<RecordsWithSplitIds<String>> queue =
        new FutureCompletingBlockingQueue<>(10);

// Producer thread (fetcher) - blocks if queue is full
int fetcherId = 0;
RecordsWithSplitIds<String> batch = fetchRecordsFromExternalSystem();
boolean accepted = queue.put(fetcherId, batch);
if (!accepted) {
    // Thread was woken up for shutdown, element not added
    return;
}

// Consumer thread (main reader) - non-blocking poll with future-based notification
RecordsWithSplitIds<String> records = queue.poll();
if (records == null) {
    // Queue is empty, wait for availability asynchronously
    CompletableFuture<Void> available = queue.getAvailabilityFuture();
    // Register handler or return NOTHING_AVAILABLE to the mailbox
}

// Graceful shutdown - wake up a blocked producer without interruption
queue.wakeUpPuttingThread(fetcherId);

// Explicit availability notification (e.g., when no-more-splits signal arrives)
queue.notifyAvailable();

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment