Heuristic:Apache Flink False Positive Availability Optimization
| Knowledge Sources | |
|---|---|
| Domains | Optimization, Streaming |
| Last Updated | 2026-02-09 13:00 GMT |
Overview
Source reader optimization that intentionally returns false positives for data availability, trading occasional spurious wakeups for reduced per-record checking overhead.
Description
The `SourceReaderBase` and `FutureCompletingBlockingQueue` classes implement a deliberate design pattern in which the availability signal (a `CompletableFuture`) may complete even when no data is actually available. This is analogous to the spurious wakeups of Java monitors and `Condition` objects, where a thread waiting on a condition may wake up even though the condition is still false.
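The contract can be illustrated with a toy queue. This is a minimal sketch under stated assumptions, not Flink's `FutureCompletingBlockingQueue`: the classes `ToyAvailabilityQueue` and `AvailabilityDemo` and the methods `wakeUp()` and `takeBlocking()` are hypothetical. `wakeUp()` completes the availability future without adding data, and the consumer loop treats a completed future only as a hint, so it always polls again.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical toy queue illustrating the contract; not Flink's FutureCompletingBlockingQueue. */
final class ToyAvailabilityQueue<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    synchronized void put(T item) {
        items.addLast(item);
        available.complete(null); // data arrived: wake up any waiter
    }

    /** Completes the future WITHOUT adding data, i.e. a deliberate false positive. */
    synchronized void wakeUp() {
        available.complete(null);
    }

    /** Returns the next item, or null if the queue is empty. */
    synchronized T poll() {
        T item = items.pollFirst();
        if (items.isEmpty() && available.isDone()) {
            available = new CompletableFuture<>(); // hand out a fresh future for the next wait
        }
        return item;
    }

    synchronized CompletableFuture<Void> getAvailabilityFuture() {
        return items.isEmpty() ? available : CompletableFuture.completedFuture(null);
    }
}

final class AvailabilityDemo {
    /** Consumer loop: a completed future is only a hint, so always re-poll. */
    static <T> T takeBlocking(ToyAvailabilityQueue<T> queue) {
        while (true) {
            T item = queue.poll();
            if (item != null) {
                return item;
            }
            queue.getAvailabilityFuture().join(); // may complete spuriously; the loop re-checks
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyAvailabilityQueue<String> queue = new ToyAvailabilityQueue<>();
        queue.wakeUp(); // the future is now complete, but the queue is still empty
        System.out.println(queue.getAvailabilityFuture().isDone()); // true
        System.out.println(queue.poll());                           // null (false positive)

        Thread producer = new Thread(() -> queue.put("record-1"));
        producer.start();
        System.out.println(takeBlocking(queue)); // record-1
        producer.join();
    }
}
```

A consumer that blocked on the future and then assumed data was present would misbehave after `wakeUp()`; the `while` loop in `takeBlocking()` is what makes the false positive harmless.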
In the Flink source reader hot path, after emitting a record, the reader always returns `InputStatus.MORE_AVAILABLE` rather than checking whether more records are actually available. The consumer loop handles this by simply polling again and discovering the actual state on the next call. Additionally, a pre-completed static `CompletableFuture` (`AVAILABLE`) is cached to avoid volatile memory operations in hot paths where availability is guaranteed.
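The reader side of the pattern can be sketched as follows. This is an illustration under assumptions, not `SourceReaderBase` itself: `ToyInputStatus` is a hypothetical stand-in for Flink's `InputStatus` (only two of its values are modeled), and `ToyBatchReader` is a hypothetical reader that drains fetched batches. After emitting a record it returns `MORE_AVAILABLE` without looking ahead, and the following call discovers whether anything is left.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the Flink InputStatus values used in this sketch. */
enum ToyInputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE }

/** Hypothetical reader over fetched batches; not Flink's SourceReaderBase. */
final class ToyBatchReader<T> {
    private final Deque<List<T>> fetchedBatches = new ArrayDeque<>();
    private Iterator<T> currentBatch = Collections.emptyIterator();

    void addBatch(List<T> batch) {
        fetchedBatches.addLast(batch);
    }

    ToyInputStatus pollNext(List<T> output) {
        while (true) {
            if (currentBatch.hasNext()) {
                output.add(currentBatch.next());
                // Deliberately no look-ahead: the next call discovers whether data remains.
                return ToyInputStatus.MORE_AVAILABLE;
            }
            List<T> next = fetchedBatches.pollFirst();
            if (next == null) {
                // If the previous call emitted a record, its MORE_AVAILABLE was a false positive.
                return ToyInputStatus.NOTHING_AVAILABLE;
            }
            currentBatch = next.iterator();
        }
    }

    public static void main(String[] args) {
        ToyBatchReader<Integer> reader = new ToyBatchReader<>();
        reader.addBatch(List.of(1, 2));
        reader.addBatch(List.of(3));

        List<Integer> emitted = new ArrayList<>();
        int calls = 0;
        ToyInputStatus status;
        do {
            status = reader.pollNext(emitted);
            calls++;
        } while (status == ToyInputStatus.MORE_AVAILABLE);

        System.out.println(emitted + " after " + calls + " calls, last status " + status);
    }
}
```

Running `main` prints `[1, 2, 3] after 4 calls, last status NOTHING_AVAILABLE`: three records cost four calls, and only the last call is a false positive, because the boundary between the two batches is crossed inside a single `pollNext()` call.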
Usage
This heuristic is relevant when implementing or tuning custom source readers that extend `SourceReaderBase`. Understanding this pattern is essential for:
- Correctly implementing the `pollNext()` contract (consumers must handle false positives)
- Avoiding unnecessary synchronization in record-emitting hot paths
- Understanding why the element queue has a default capacity of only 2 (aggressive backpressure by design)
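The effect of a capacity of 2 can be seen with a plain bounded queue. In this sketch, `java.util.concurrent.ArrayBlockingQueue` stands in for Flink's element queue, and `BackpressureDemo.enqueueUntilFull` is a hypothetical fetcher; only the constant `2` comes from the default of `source.reader.element.queue.capacity`. A real fetcher would block when the queue is full, whereas the sketch uses a short `offer` timeout so that it terminates.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class BackpressureDemo {
    // Mirrors the default of source.reader.element.queue.capacity (2).
    static final int ELEMENT_QUEUE_CAPACITY = 2;

    /** Hypothetical fetcher: enqueues batches until the bounded queue refuses one. */
    static int enqueueUntilFull(BlockingQueue<List<String>> queue, int attempts) {
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            try {
                // A real fetcher would block here; the short timeout keeps the demo finite.
                if (!queue.offer(List.of("batch-" + i), 10, TimeUnit.MILLISECONDS)) {
                    break; // queue full: the producer is being backpressured
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(ELEMENT_QUEUE_CAPACITY);
        System.out.println(enqueueUntilFull(queue, 5)); // 2: the third batch is refused
        queue.poll();                                    // the reader drains one batch
        System.out.println(enqueueUntilFull(queue, 5)); // 1: only one slot was freed
    }
}
```

With such a small buffer, the producer can run at most two batches ahead of the reader, so a slow reader throttles fetching almost immediately.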
The Insight (Rule of Thumb)
- Action: Always return `MORE_AVAILABLE` after emitting a record instead of checking if more records exist.
- Value: Saves one availability check per record emitted, which accumulates significantly at high throughput.
- Trade-off: Occasional false positive wakeups (consumer polls and finds nothing) vs. per-record availability check overhead.
Reasoning
In a high-throughput streaming pipeline, `pollNext()` can be called millions of times per second. Checking availability after every emitted record would require at least a volatile read or a lock acquisition, both of which are comparatively expensive at this frequency. By always returning `MORE_AVAILABLE`, the reader pays only for the occasional false positive (one extra call that returns `NOTHING_AVAILABLE`), and that cost is amortized across the many successful record emissions.
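The amortization argument can be written as a break-even condition. The symbols are introduced here for illustration only: N is the number of records emitted, c_check the cost of one availability check, c_poll the cost of one extra `pollNext()` call that returns `NOTHING_AVAILABLE`, E the number of such false positives, and B the number of fetched batches. The bound E ≤ B is an assumption: it holds if a false positive can occur at most once per batch, when the reader exhausts a batch and finds the queue empty.

```latex
\begin{aligned}
\text{cost}_{\text{eager}} &= N\,c_{\text{check}} \\
\text{cost}_{\text{lazy}}  &= E\,c_{\text{poll}} \;\le\; B\,c_{\text{poll}} \\
\text{cost}_{\text{lazy}} < \text{cost}_{\text{eager}}
  &\;\Leftarrow\; B\,c_{\text{poll}} < N\,c_{\text{check}}
  \;\Longleftrightarrow\; \frac{N}{B} > \frac{c_{\text{poll}}}{c_{\text{check}}}
\end{aligned}
```

In words: under this assumption, always returning `MORE_AVAILABLE` comes out ahead whenever the average batch size N/B exceeds the cost ratio of an extra poll to a single check, and the margin grows with batch size.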
The pattern follows Java's established spurious wakeup convention: consumers of blocking queues and condition variables must always re-check the condition in a loop. The `FutureCompletingBlockingQueue` source comments state this contract explicitly: false positives are okay, and the consumer is responsible for polling for data and obtaining a new future to wait on.
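For comparison, the Java convention referred to here looks like this with `ReentrantLock` and `Condition` from `java.util.concurrent.locks`. This is a generic illustration rather than Flink code, and the `GuardedBuffer` class is hypothetical. In the Flink pattern, waiting on the availability future plays the role of `await()`, and calling `pollNext()` again plays the role of the `while` re-check.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Minimal guarded buffer showing the standard "await in a loop" idiom. */
final class GuardedBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<T> items = new ArrayDeque<>();

    void put(T item) {
        lock.lock();
        try {
            items.addLast(item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            // Never use "if" here: await() may return spuriously, so re-check the condition.
            while (items.isEmpty()) {
                notEmpty.await();
            }
            return items.removeFirst();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedBuffer<String> buffer = new GuardedBuffer<>();
        Thread producer = new Thread(() -> buffer.put("record-1"));
        producer.start();
        System.out.println(buffer.take()); // record-1
        producer.join();
    }
}
```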
Additionally, a pre-completed `CompletableFuture` constant is cached as `AVAILABLE` to eliminate volatile memory operations when the queue is known to have data, further reducing overhead in the common case.
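A cached, pre-completed future can be sketched as follows. This is a hypothetical illustration: `CachedFutureQueue` is not a Flink class, and its `AVAILABLE` field only imitates the constant shown in the code evidence. In this sketch the saving on the fast path is that no new future is allocated or completed when the queue already holds data; a fresh future is created only when the caller actually has to wait.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical queue showing a cached, pre-completed availability future. */
final class CachedFutureQueue<T> {
    // Completed once and shared by every caller; a hypothetical analogue of Flink's AVAILABLE.
    static final CompletableFuture<Void> AVAILABLE = CompletableFuture.completedFuture(null);

    private final ArrayDeque<T> items = new ArrayDeque<>();
    private CompletableFuture<Void> pending = new CompletableFuture<>();

    synchronized CompletableFuture<Void> getAvailabilityFuture() {
        if (!items.isEmpty()) {
            return AVAILABLE; // fast path: no new future is allocated or completed
        }
        if (pending.isDone()) {
            pending = new CompletableFuture<>(); // stale future: replace it before handing it out
        }
        return pending;
    }

    synchronized void put(T item) {
        items.addLast(item);
        pending.complete(null);
    }

    synchronized T poll() {
        return items.pollFirst();
    }

    public static void main(String[] args) {
        CachedFutureQueue<String> queue = new CachedFutureQueue<>();
        CompletableFuture<Void> waitOn = queue.getAvailabilityFuture();
        queue.put("record-1");
        System.out.println(waitOn.isDone());                            // true
        System.out.println(queue.getAvailabilityFuture() == AVAILABLE); // true
    }
}
```

Because the constant is shared, callers must treat it as read-only: calling `complete()` on it is a harmless no-op, but `obtrudeValue()` would affect every caller.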
Code Evidence
False positive pattern from `SourceReaderBase.java:204-209`:
```java
// We always emit MORE_AVAILABLE here, even though we do not strictly know whether
// more is available. If nothing more is available, the next invocation will find
// this out and return the correct status.
// That means we emit the occasional 'false positive' for availability, but this
// saves us doing checks for every record. Ultimately, this is cheaper.
return trace(InputStatus.MORE_AVAILABLE);
```
Spurious wakeup contract from `FutureCompletingBlockingQueue.java:60-64`:
```java
// Note that this model generally assumes that false positives are okay, meaning that the
// availability future completes despite there being no data available in the queue. The consumer
// is responsible for polling data and obtaining another future to wait on. This is similar to the
// way that Java's Monitors and Conditions can have the spurious wakeup of the waiting
// threads and commonly need to be used in loop with the waiting condition.
```
Cached pre-completed future from `FutureCompletingBlockingQueue.java:77-82`:
```java
/**
 * A constant future that is complete, indicating availability. Using this constant in cases
 * that are guaranteed available helps short-circuiting some checks and avoiding volatile memory
 * operations.
 */
public static final CompletableFuture<Void> AVAILABLE = getAvailableFuture();
```
Element queue capacity from `SourceReaderOptions.java:36-40`:
```java
public static final ConfigOption<Integer> ELEMENT_QUEUE_CAPACITY =
        ConfigOptions.key("source.reader.element.queue.capacity")
                .intType()
                .defaultValue(2)
                .withDescription("The capacity of the element queue in the source reader.");
```