Implementation:Apache Flink FutureCompletingBlockingQueue
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Source_Framework |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A custom bounded blocking queue that integrates with CompletableFuture for asynchronous consumer notifications, used as the hand-off mechanism between fetcher threads (producers) and the source reader main thread (consumer).
Description
FutureCompletingBlockingQueue is a critical synchronization primitive in the Flink source reader framework. It bridges the gap between the async mailbox-based main thread model and the blocking I/O fetcher threads, enabling efficient backpressure and availability notification without busy-waiting.
The queue wraps an ArrayDeque with a ReentrantLock for thread safety and maintains a CompletableFuture (currentFuture) that tracks consumer availability notifications. The future completes when the queue transitions from empty to non-empty, or when notifyAvailable() is explicitly called. The future resets to an incomplete state when poll() empties the queue or finds it already empty.
Key features include:
- Asynchronous consumer notifications: Instead of blocking consumers via take(), the queue exposes getAvailabilityFuture() which returns a CompletableFuture that completes when data is available. Consumers can subscribe to completion handlers for non-blocking notification. The model tolerates false positives (spurious completions) similar to Java monitor spurious wakeups.
- Producer backpressure: The put(int, T) method blocks the calling thread when the queue reaches capacity. Each producer thread has its own Condition variable stored in a dynamically-sized ConditionAndFlag array, indexed by thread/fetcher ID. This avoids unnecessary wake-ups across producer threads.
- Graceful producer wake-up: wakeUpPuttingThread(int) sets a per-thread wake-up flag and signals the corresponding condition, causing the blocked put() to return false without adding the element. This enables cooperative shutdown without thread interruption.
- Availability constant: A static AVAILABLE future constant (obtained reflectively from AvailabilityProvider.AVAILABLE in the runtime module) provides efficient short-circuiting when data is known to be available, avoiding volatile memory operations.
- Capacity management: The queue is bounded with a configurable capacity (default from SourceReaderOptions.ELEMENT_QUEUE_CAPACITY). The remainingCapacity() method reports available space.
Usage
FutureCompletingBlockingQueue is typically not instantiated directly by connector developers. It is created internally by SplitFetcherManager and shared between the SplitFetcher threads (which call put()) and the SourceReaderBase main thread (which calls poll() and getAvailabilityFuture()). Understanding this class is important when debugging threading issues, backpressure behavior, or availability notification problems in the source reader framework.
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
- Lines: 1-438
Signature
@Internal
public class FutureCompletingBlockingQueue<T> {
// Static constant for availability
public static final CompletableFuture<Void> AVAILABLE;
// Constructors
public FutureCompletingBlockingQueue();
public FutureCompletingBlockingQueue(int capacity);
// Future / Notification logic
public CompletableFuture<Void> getAvailabilityFuture();
public void notifyAvailable();
// Queue operations
public boolean put(int threadIndex, T element) throws InterruptedException;
public T take() throws InterruptedException;
public T poll();
public T peek();
// Capacity and status
public int size();
public boolean isEmpty();
public int remainingCapacity();
// Producer thread management
public void wakeUpPuttingThread(int threadIndex);
}
Import
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| capacity | int | No | Maximum number of elements the queue can hold; defaults to SourceReaderOptions.ELEMENT_QUEUE_CAPACITY default value (2). Must be greater than 0. |
| element (via put) | T | Yes | Element to add to the queue; must not be null. The calling thread blocks if the queue is full. |
| threadIndex (via put) | int | Yes | Identifier of the producing thread, used to index into the per-thread condition variable array for blocking and wake-up. |
Outputs
| Name | Type | Description |
|---|---|---|
| poll return | T or null | Returns and removes the first element, or null if the queue is empty. Resets availability future when queue becomes empty. |
| take return | T | Blocks until an element is available, then returns and removes it. Intended primarily for testing. |
| peek return | T or null | Returns the first element without removing it, or null if the queue is empty. |
| getAvailabilityFuture return | CompletableFuture<Void> | A future that completes when the queue becomes non-empty or notifyAvailable() is called. Returns the AVAILABLE constant when data is already present. |
| put return | boolean | Returns true if the element was successfully enqueued, false if the thread was woken up via wakeUpPuttingThread before the element could be added. |
Usage Examples
Basic Usage
// Creating and using FutureCompletingBlockingQueue for producer-consumer coordination
FutureCompletingBlockingQueue<RecordsWithSplitIds<String>> queue =
new FutureCompletingBlockingQueue<>(10);
// Producer thread (fetcher) - blocks if queue is full
int fetcherId = 0;
RecordsWithSplitIds<String> batch = fetchRecordsFromExternalSystem();
boolean accepted = queue.put(fetcherId, batch);
if (!accepted) {
// Thread was woken up for shutdown, element not added
return;
}
// Consumer thread (main reader) - non-blocking poll with future-based notification
RecordsWithSplitIds<String> records = queue.poll();
if (records == null) {
// Queue is empty, wait for availability asynchronously
CompletableFuture<Void> available = queue.getAvailabilityFuture();
// Register handler or return NOTHING_AVAILABLE to the mailbox
}
// Graceful shutdown - wake up a blocked producer without interruption
queue.wakeUpPuttingThread(fetcherId);
// Explicit availability notification (e.g., when no-more-splits signal arrives)
queue.notifyAvailable();