Heuristic:Apache Flink Async Sink Timeout And Backpressure Defaults
| Knowledge Sources | |
|---|---|
| Domains | Optimization, Configuration |
| Last Updated | 2026-02-09 13:00 GMT |
Overview
Default configuration values for the async sink and source reader frameworks: a 10-minute request timeout that is non-fatal by default, an element queue capacity of 2 for aggressive backpressure in source readers, and a 30-second source reader close timeout.
Description
The Apache Flink async sink framework (`AsyncSinkWriter`) and source reader framework (`SourceReaderBase`) ship with default values that encode tribal knowledge about production streaming workloads. Three defaults stand out:
Request Timeout (10 minutes): The `DEFAULT_REQUEST_TIMEOUT_MS` is set to 10 minutes (600,000 ms). This is paired with `DEFAULT_FAIL_ON_TIMEOUT = false`, meaning that by default, timed-out requests are not treated as fatal failures. This generous timeout accommodates transient issues in external services (e.g., temporary DynamoDB throttling, S3 slow responses) without causing pipeline failures.
Source Element Queue Capacity (2): The `ELEMENT_QUEUE_CAPACITY` defaults to 2 elements, where each element is a batch of records handed over by a split fetcher. This extremely small queue creates aggressive backpressure between the split fetcher threads and the main source reader thread, preventing memory bloat from excessive read-ahead buffering.
Source Reader Close Timeout (30 seconds): The `SOURCE_READER_CLOSE_TIMEOUT` defaults to 30,000 ms, providing a generous but bounded grace period for clean shutdown.
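The idea of a "generous but bounded grace period" can be illustrated with plain `java.util.concurrent`. The following is a minimal sketch, not Flink code: `BoundedClose`, `closeWithTimeout`, and `demo` are hypothetical names, and the executor merely stands in for the fetcher threads a source reader would wait on during close.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration (not Flink internals): wait for background work
// to finish, but never longer than a fixed close timeout.
final class BoundedClose {

    /** Returns true if all tasks finished within the timeout, false if they had to be interrupted. */
    static boolean closeWithTimeout(ExecutorService executor, long timeoutMs)
            throws InterruptedException {
        executor.shutdown(); // stop accepting new work, let running tasks finish
        if (executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
            return true; // clean shutdown inside the grace period
        }
        executor.shutdownNow(); // grace period exhausted: interrupt what is left
        return false;
    }

    /** Runs one task that sleeps for taskMs, then closes the pool with the given timeout. */
    static boolean demo(long taskMs, long timeoutMs) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> {
            try {
                Thread.sleep(taskMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit promptly when interrupted
            }
        });
        try {
            return closeWithTimeout(pool, timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With a 30,000 ms timeout, a task that finishes quickly returns immediately, while a hung task delays shutdown by at most the timeout before being interrupted.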
Usage
These defaults apply to all implementations built on the async sink base and source reader base classes. Tune these values when:
- The external service has known latency characteristics (reduce the timeout for fast services, increase it for slow ones)
- OOM errors occur in source readers (reduce the queue capacity, though 2 is already close to the minimum)
- Source reader shutdown hangs (reduce the close timeout)
The Insight (Rule of Thumb)
- Action: Start with the defaults. Only override when you have measured evidence of suboptimal behavior.
- Values:
  - Request timeout: 10 minutes (generous, prevents false failures)
  - Fail on timeout: false (graceful handling)
  - Element queue capacity: 2 (aggressive backpressure)
  - Source reader close timeout: 30 seconds
- Trade-off: Conservative defaults prioritize reliability over latency. The 10-minute timeout means slow requests are tolerated but pipeline latency may spike. The queue capacity of 2 minimizes memory usage but may cause fetcher thread stalls.
Reasoning
Why 10 minutes? In production, external services (AWS, GCP, etc.) can experience transient throttling or slowdowns lasting several minutes. A shorter timeout would cause request retries or failures during these episodes, potentially leading to data loss or pipeline restarts. The 10-minute default is long enough to ride out most transient issues while still detecting genuinely stuck requests.
Why not fail on timeout? When a request times out, it does not necessarily mean the data was lost. The external service may have processed it successfully but was slow to respond. Failing on timeout would trigger checkpoint failures and pipeline restarts, potentially causing duplicate processing. Graceful handling allows the pipeline to continue.
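The difference between a fatal and a non-fatal timeout can be sketched with the standard library alone. This is an illustration under stated assumptions, not the `AsyncSinkWriter` implementation: `TimeoutPolicySketch`, `Outcome`, `await`, and `demo` are hypothetical names, and modelling the non-fatal path as "retry later" is an assumption that should be checked against the Flink source.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration (not Flink internals) of a fail-on-timeout switch.
final class TimeoutPolicySketch {

    /** What the caller should do after waiting for one in-flight request. */
    enum Outcome { COMPLETED, RETRY_LATER }

    static Outcome await(CompletableFuture<Void> request, long timeoutMs, boolean failOnTimeout) {
        try {
            request.get(timeoutMs, TimeUnit.MILLISECONDS);
            return Outcome.COMPLETED;
        } catch (TimeoutException e) {
            if (failOnTimeout) {
                // Fatal path: surface the timeout so the job fails and restarts.
                throw new IllegalStateException("request timed out after " + timeoutMs + " ms", e);
            }
            // Non-fatal path (assumed): keep the pipeline running and try again.
            return Outcome.RETRY_LATER;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Waits 50 ms on a request that is either already complete or never completes. */
    static Outcome demo(boolean completes, boolean failOnTimeout) {
        CompletableFuture<Void> request =
                completes ? CompletableFuture.completedFuture(null) : new CompletableFuture<>();
        return await(request, 50, failOnTimeout);
    }
}
```

In this sketch only the `failOnTimeout = true` branch turns a slow response into an exception; the default-like branch leaves the decision to retry with the caller.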
Why queue capacity of 2? The element queue sits between the split fetcher thread (I/O) and the source reader thread (processing). A large queue would allow the fetcher to read ahead aggressively, consuming memory for buffered records. A capacity of 2 keeps memory usage predictable: at most 2 batches of records are in memory between the fetcher and reader. This is especially important for sources reading large records (e.g., Parquet row groups).
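The backpressure effect of a tiny hand-over queue can be reproduced with a standard `ArrayBlockingQueue`. This is a minimal sketch, not Flink code: `BackpressureSketch`, `Result`, and `run` are hypothetical names, the `int[]` payload stands in for a batch of records, and `ArrayBlockingQueue` is a stand-in for whatever queue the source reader actually uses.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration (not Flink internals): a fetcher thread hands
// batches to a reader thread through a bounded queue.
final class BackpressureSketch {

    /** Batches the reader consumed and the deepest queue the fetcher observed. */
    static final class Result {
        final int consumed;
        final int peakDepth;

        Result(int consumed, int peakDepth) {
            this.consumed = consumed;
            this.peakDepth = peakDepth;
        }
    }

    static Result run(int capacity, int totalBatches) {
        BlockingQueue<int[]> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger peak = new AtomicInteger();

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; i < totalBatches; i++) {
                    queue.put(new int[] {i}); // blocks while the queue is full
                    peak.accumulateAndGet(queue.size(), Math::max);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();

        int consumed = 0;
        try {
            for (int i = 0; i < totalBatches; i++) {
                queue.take(); // the reader drains one batch at a time
                consumed++;
            }
            fetcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new Result(consumed, peak.get());
    }
}
```

Calling `BackpressureSketch.run(2, 1000)` moves all 1000 batches while the observed queue depth never exceeds 2, so buffered memory is bounded by roughly two batches regardless of how fast the fetcher could read.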
Code Evidence
Async sink timeout defaults from `AsyncSinkWriterConfiguration.java:34-35`:
public static final long DEFAULT_REQUEST_TIMEOUT_MS = Duration.ofMinutes(10).toMillis();
public static final boolean DEFAULT_FAIL_ON_TIMEOUT = false;
Source reader queue and timeout from `SourceReaderOptions.java:30-40`:
public static final ConfigOption<Long> SOURCE_READER_CLOSE_TIMEOUT =
        ConfigOptions.key("source.reader.close.timeout")
                .longType()
                .defaultValue(30000L)
                .withDescription("The timeout when closing the source reader");

public static final ConfigOption<Integer> ELEMENT_QUEUE_CAPACITY =
        ConfigOptions.key("source.reader.element.queue.capacity")
                .intType()
                .defaultValue(2)
                .withDescription("The capacity of the element queue in the source reader.");
Bucket check interval from `FileSink.java:297`:
protected static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L;