Workflow: Apache Flink Async Sink Lifecycle
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Stream_Processing, Connector_Development |
| Last Updated | 2026-02-09 13:00 GMT |
Overview
End-to-end process for building and operating an asynchronous sink connector using Flink's AsyncSinkBase framework with configurable batching, rate limiting, and retry semantics.
Description
This workflow describes the lifecycle of the Flink async sink framework, designed for writing data to external async-capable destinations (e.g., Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose, HTTP endpoints). The framework handles element conversion, request buffering, batch formation, asynchronous submission, rate limiting via congestion control strategies, and automatic retry of failed requests. Developers extend the abstract AsyncSinkWriter to implement destination-specific submission logic while inheriting all batching, backpressure, and fault-tolerance behavior.
Key capabilities:
- Configurable batch size, byte thresholds, and time-based flushing
- Congestion control rate limiting with AIMD (Additive Increase, Multiplicative Decrease) scaling
- Automatic retry of failed request entries with re-queuing to the buffer
- Request timeout handling with configurable fail-on-timeout behavior
- State snapshotting for exactly-once semantics
- Fatal exception classification for non-retryable errors
Usage
Execute this workflow when you need to build a new Flink sink connector for an external system that supports asynchronous write operations, or when you need to understand the internal lifecycle of an existing async sink (Kinesis, Firehose, etc.). The framework is appropriate for destinations where writes are I/O-bound and benefit from batching and concurrent request submission.
Execution Steps
Step 1: Define Element Conversion
Implement the ElementConverter interface to transform incoming stream elements into the request entry format expected by the destination. This conversion maps Flink's internal data types to the destination-specific wire format. The converter receives both the element and a SinkWriter.Context with metadata like timestamp and watermark.
Key considerations:
- The ElementConverter must be serializable for Flink's distributed execution
- Conversion should be lightweight as it runs on the main processing thread
- The output type becomes the unit of batching and submission
- Size estimation via the sink writer's getSizeInBytes() override must be provided for byte-based batching
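A minimal sketch of this step is below. The `ElementConverter` interface here is a simplified stand-in mirroring Flink's `org.apache.flink.connector.base.sink.writer.ElementConverter` (the real one receives a `SinkWriter.Context` rather than a bare timestamp), and `PutRecordEntry` with its partition-key scheme is a hypothetical destination wire format, not a real API:

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for Flink's ElementConverter<InputT, RequestEntryT>;
// the real interface receives a SinkWriter.Context with timestamp/watermark.
interface ElementConverter<InputT, RequestEntryT> {
    RequestEntryT apply(InputT element, long timestamp);
}

// Hypothetical destination record: a partition key plus a payload.
record PutRecordEntry(String partitionKey, byte[] payload) {
    // Byte size estimate, the quantity byte-based batching operates on.
    int sizeInBytes() {
        return partitionKey.getBytes(StandardCharsets.UTF_8).length + payload.length;
    }
}

// A converter from plain strings to the destination's request entry format.
// Conversion is kept cheap because it runs on the main processing thread.
class StringToPutRecordConverter implements ElementConverter<String, PutRecordEntry> {
    @Override
    public PutRecordEntry apply(String element, long timestamp) {
        // Illustrative partitioning scheme: bucket by timestamp modulo 8.
        return new PutRecordEntry(Long.toString(timestamp % 8),
                element.getBytes(StandardCharsets.UTF_8));
    }
}
```

The output type of the converter (`PutRecordEntry` here) becomes the unit that the buffer, batcher, and submitter all operate on.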
Step 2: Configure Sink Parameters
Set the async sink's operational parameters through the AsyncSinkBaseBuilder. These parameters control batching behavior (max batch size, max batch bytes), concurrency (max in-flight requests), buffering (max buffered requests), timing (max time in buffer, request timeout), and failure handling (fail on timeout, fatal exception classifier).
Key considerations:
- maxBatchSize limits the number of records per submission batch
- maxBatchSizeInBytes limits the total bytes per batch
- maxInFlightRequests controls concurrent async submissions
- maxBufferedRequests sets the backpressure threshold
- maxTimeInBufferMS triggers time-based flushing
- requestTimeoutMS defines the deadline for async responses
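These knobs can be pictured as a single immutable configuration object. The record below is a hypothetical holder (not Flink's builder API) that groups the parameters named above; the constructor check reflects the relationship Flink itself validates, namely that the buffer must be able to hold more than one full batch:

```java
// Hypothetical config holder grouping the knobs AsyncSinkBaseBuilder exposes
// (maxBatchSize, maxBatchSizeInBytes, maxInFlightRequests, maxBufferedRequests,
// maxTimeInBufferMS, requestTimeoutMS, failOnTimeout). Illustrative only.
record AsyncSinkConfig(int maxBatchSize, long maxBatchSizeInBytes,
                       int maxInFlightRequests, int maxBufferedRequests,
                       long maxTimeInBufferMS, long requestTimeoutMS,
                       boolean failOnTimeout) {
    AsyncSinkConfig {
        // The buffer must hold strictly more than one batch, otherwise a
        // full batch could never accumulate before backpressure kicks in.
        if (maxBufferedRequests <= maxBatchSize) {
            throw new IllegalArgumentException(
                    "maxBufferedRequests must be strictly greater than maxBatchSize");
        }
    }
}
```

Typical values (e.g., batches of 500 records, a 10,000-entry buffer, 16 concurrent requests) depend on the destination's API limits and should be taken from the connector's documentation.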
Step 3: Buffer Incoming Elements
As elements arrive via the write() method, the AsyncSinkWriter converts them using the ElementConverter and adds them to the RequestBuffer (default: DequeRequestBuffer). Each entry is wrapped in a RequestEntryWrapper that tracks its size in bytes. The buffer accumulates entries until a flush condition is triggered.
What happens:
- Each incoming element passes through the ElementConverter
- The converted request entry is wrapped with its byte size
- The entry is added to the RequestBuffer
- If the buffer exceeds maxBufferedRequests, backpressure is applied
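The buffering path can be sketched as follows. This is a simplified model, not Flink's actual classes: the wrapper corresponds loosely to `RequestEntryWrapper` and the deque to the default `DequeRequestBuffer`, and the boolean return stands in for the blocking backpressure the real writer applies:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of the writer's buffering path: each converted entry is
// stored together with its byte size (cf. Flink's RequestEntryWrapper) in a
// deque (cf. DequeRequestBuffer). Names and semantics are illustrative.
class RequestBuffer<T> {
    record Wrapper<T>(T entry, long sizeInBytes) {}

    private final Deque<Wrapper<T>> deque = new ArrayDeque<>();
    private final int maxBufferedRequests;
    private long totalBytes = 0;

    RequestBuffer(int maxBufferedRequests) {
        this.maxBufferedRequests = maxBufferedRequests;
    }

    /** Returns false when the buffer is full and the caller must backpressure. */
    boolean offer(T entry, long sizeInBytes) {
        if (deque.size() >= maxBufferedRequests) {
            return false; // real writer blocks here, propagating backpressure
        }
        deque.addLast(new Wrapper<>(entry, sizeInBytes));
        totalBytes += sizeInBytes;
        return true;
    }

    int size() { return deque.size(); }
    long totalBytes() { return totalBytes; }
}
```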
Step 4: Form and Submit Batches
When flush conditions are met (batch size reached, byte threshold exceeded, timer fired, or checkpoint triggered), the AsyncSinkWriter extracts entries from the buffer and forms a batch via the BatchCreator. The batch is then submitted asynchronously through the abstract submitRequestEntries() method, which subclasses implement for their specific destination.
What happens:
- The RateLimitingStrategy is consulted to determine the allowed batch size
- The BatchCreator creates a Batch object from buffered entries
- The batch is passed to submitRequestEntries() for async submission
- An in-flight request counter is incremented
- On completion, the ResultHandler callback processes the response
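The batch-formation step can be sketched as draining the head of the buffer under two budgets, a record count and a byte cap. This is a hedged approximation of what a `BatchCreator` implementation does, with illustrative types rather than Flink's generics:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch of batch formation (cf. Flink's BatchCreator): take entries from the
// head of the buffer until the record-count or byte budget for one batch is
// exhausted. Entry type and sizes are illustrative.
class BatchCreator {
    record Sized(String entry, long bytes) {}

    static List<Sized> createBatch(Deque<Sized> buffer, int maxBatchSize, long maxBatchBytes) {
        List<Sized> batch = new ArrayList<>();
        long bytes = 0;
        while (!buffer.isEmpty() && batch.size() < maxBatchSize) {
            Sized head = buffer.peekFirst();
            // Stop before exceeding the byte budget, but never emit an empty
            // batch: a single oversized entry is still sent on its own.
            if (bytes + head.bytes() > maxBatchBytes && !batch.isEmpty()) {
                break;
            }
            batch.add(buffer.pollFirst());
            bytes += head.bytes();
        }
        return batch;
    }
}
```

The resulting batch is what the writer would hand to `submitRequestEntries()`, incrementing the in-flight counter until the destination's callback completes.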
Step 5: Handle Rate Limiting and Backpressure
The CongestionControlRateLimitingStrategy dynamically adjusts the allowed batch size based on submission outcomes. Successful submissions trigger additive increases to the rate limit. Failed submissions (exceptions from the destination) trigger multiplicative decreases via the AIMDScalingStrategy. This provides TCP-like congestion control for adaptive throughput.
Key considerations:
- AIMD scaling: rate increases linearly on success, decreases by a factor on failure
- The scaling strategy has configurable increase rate, decrease factor, and rate bounds
- NoOpScalingStrategy disables dynamic rate adjustment
- Backpressure propagates upstream when the buffer is full and requests are in-flight
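The AIMD behavior can be demonstrated with a toy controller: a fixed additive step on success, a multiplicative cut on failure, clamped to configured bounds. This mirrors the shape of Flink's `AIMDScalingStrategy` but is a standalone sketch with illustrative constants, not the library class:

```java
// Toy AIMD controller (cf. Flink's AIMDScalingStrategy): additive increase
// on success, multiplicative decrease on failure, clamped to [minRate, maxRate].
// All constants here are illustrative, not Flink defaults.
class AimdRateLimiter {
    private final int increaseRate;      // additive step applied on success
    private final double decreaseFactor; // multiplicative cut applied on failure
    private final int minRate;
    private final int maxRate;
    private int currentRate;

    AimdRateLimiter(int increaseRate, double decreaseFactor,
                    int minRate, int maxRate, int initialRate) {
        this.increaseRate = increaseRate;
        this.decreaseFactor = decreaseFactor;
        this.minRate = minRate;
        this.maxRate = maxRate;
        this.currentRate = initialRate;
    }

    void onSuccess() {
        currentRate = Math.min(maxRate, currentRate + increaseRate);
    }

    void onFailure() {
        currentRate = Math.max(minRate, (int) (currentRate * decreaseFactor));
    }

    /** The batch size the rate limiter currently permits. */
    int allowedBatchSize() { return currentRate; }
}
```

As with TCP congestion control, the rate ramps up slowly while the destination keeps up and collapses quickly when it starts throttling, which keeps throughput near the destination's current capacity.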
Step 6: Handle Failures and Checkpoints
Failed request entries are re-queued to the front of the buffer for retry. Fatal exceptions (identified by the FatalExceptionClassifier) cause immediate job failure. On checkpoint, the writer flushes to drain buffered entries and waits for in-flight requests to complete; any entries still buffered at snapshot time are captured in the BufferedRequestState for state persistence, ensuring no data loss.
Key considerations:
- Non-fatal failures result in re-queuing with retry
- Fatal exceptions (e.g., invalid credentials) bypass retry and fail the job
- Checkpoint preparation blocks until all in-flight requests complete
- State snapshot captures remaining buffer contents for recovery
- Request timeouts can either fail the job or trigger retry based on configuration
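The failure-handling split can be sketched as follows. The predicate stands in for the `FatalExceptionClassifier` and the deque for the request buffer; the helper name and types are illustrative, not Flink API:

```java
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Sketch of the failure path: entries from a failed request are requeued at
// the head of the buffer for retry, unless the exception is classified as
// fatal (cf. Flink's FatalExceptionClassifier), in which case the job fails.
class FailureHandler {
    static <T> void handleFailedEntries(Deque<T> buffer, List<T> failed,
                                        Predicate<Exception> isFatal, Exception cause) {
        if (isFatal.test(cause)) {
            // e.g., invalid credentials: retrying cannot succeed, fail fast.
            throw new RuntimeException("Fatal sink error, failing the job", cause);
        }
        // Requeue in reverse so the entries keep their original order at
        // the head of the buffer and are retried before newer entries.
        for (int i = failed.size() - 1; i >= 0; i--) {
            buffer.addFirst(failed.get(i));
        }
    }
}
```

Requeuing at the head rather than the tail means retried entries are picked up by the very next batch, bounding how long a transiently failed record waits.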