Implementation:Apache Flink AsyncSinkWriter Write
| Knowledge Sources | |
|---|---|
| Domains | Stream_Processing, Buffering |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A concrete tool, provided by the Apache Flink connector-base module, for buffering incoming elements and scheduling flushes automatically.
Description
The AsyncSinkWriter.write method converts each incoming element with the ElementConverter, wraps the result together with its size in bytes in a RequestEntryWrapper, and adds it to the RequestBuffer (by default a DequeRequestBuffer backed by an ArrayDeque). After adding the entry, it checks the flush conditions: if the buffer has reached the configured batch size or byte threshold, a non-blocking flush is scheduled via the MailboxExecutor. Independently, a timer callback ensures that no entry stays in the buffer longer than maxTimeInBufferMS.
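The buffering idea described above can be illustrated with a minimal, standalone sketch. It does not use Flink's classes: SizedEntry and SizedDequeBuffer are hypothetical stand-ins for RequestEntryWrapper and DequeRequestBuffer, and the shouldFlush helper is an assumption about how the two thresholds might be combined, not the actual implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a wrapper that pairs an entry with its size in bytes. */
final class SizedEntry<T> {
    final T entry;
    final long sizeInBytes;

    SizedEntry(T entry, long sizeInBytes) {
        this.entry = entry;
        this.sizeInBytes = sizeInBytes;
    }
}

/** Hypothetical deque-backed buffer that tracks both entry count and total bytes. */
final class SizedDequeBuffer<T> {
    private final Deque<SizedEntry<T>> entries = new ArrayDeque<>();
    private long totalBytes = 0;

    /** Appends a wrapped entry and updates the running byte total. */
    void add(SizedEntry<T> wrapped) {
        entries.addLast(wrapped);
        totalBytes += wrapped.sizeInBytes;
    }

    /** Removes the oldest entry (or returns null if empty) and updates the byte total. */
    SizedEntry<T> poll() {
        SizedEntry<T> head = entries.pollFirst();
        if (head != null) {
            totalBytes -= head.sizeInBytes;
        }
        return head;
    }

    int size() {
        return entries.size();
    }

    long totalSizeInBytes() {
        return totalBytes;
    }

    /** Assumed flush condition: either the count or the byte threshold has been reached. */
    boolean shouldFlush(int maxBatchSize, long maxBatchSizeInBytes) {
        return entries.size() >= maxBatchSize || totalBytes >= maxBatchSizeInBytes;
    }
}
```

Storing the size next to each entry means the byte total can be maintained incrementally on add and poll, rather than recomputed by walking the buffer on every write.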
Usage
This is an internal method called by the Flink runtime for each record flowing through the async sink. Users configure its behavior through the builder parameters.
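As a rough illustration of the parameters involved, the sketch below models them as a plain Java holder. AsyncSinkSettingsSketch is a hypothetical class, not part of Flink. The field names follow the async sink builder parameters as commonly documented (only the batch size, byte threshold, and maxTimeInBufferMS are mentioned on this page; the others are assumptions), and the validation rules are likewise assumptions that should be checked against the Flink version in use.

```java
/** Hypothetical holder mirroring async sink builder parameter names; not a Flink class. */
final class AsyncSinkSettingsSketch {
    final int maxBatchSize;          // entries per batch before a flush is triggered
    final int maxInFlightRequests;   // concurrent requests allowed (assumed parameter)
    final int maxBufferedRequests;   // buffer capacity in entries (assumed parameter)
    final long maxBatchSizeInBytes;  // byte threshold that triggers a flush
    final long maxTimeInBufferMS;    // upper bound on how long an entry may wait
    final long maxRecordSizeInBytes; // largest single entry accepted (assumed parameter)

    AsyncSinkSettingsSketch(
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes) {
        require(maxBatchSize > 0, "maxBatchSize must be positive");
        require(maxInFlightRequests > 0, "maxInFlightRequests must be positive");
        require(maxTimeInBufferMS > 0, "maxTimeInBufferMS must be positive");
        // Assumed constraint: the buffer must hold more than a single batch.
        require(maxBufferedRequests > maxBatchSize,
                "maxBufferedRequests must exceed maxBatchSize");
        // Assumed constraint: a batch must be able to hold the largest record.
        require(maxBatchSizeInBytes >= maxRecordSizeInBytes,
                "maxBatchSizeInBytes must be at least maxRecordSizeInBytes");
        this.maxBatchSize = maxBatchSize;
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxBufferedRequests = maxBufferedRequests;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.maxTimeInBufferMS = maxTimeInBufferMS;
        this.maxRecordSizeInBytes = maxRecordSizeInBytes;
    }

    private static void require(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```

For example, `new AsyncSinkSettingsSketch(100, 10, 1000, 1_000_000L, 5000L, 100_000L)` describes a writer that flushes at 100 entries or roughly 1 MB, whichever comes first, and holds no entry longer than five seconds.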
Code Reference
Source Location
- Repository: Apache Flink
- File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
- Lines: L312-320
Signature
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {

    @Override
    public void write(InputT element, Context context)
            throws IOException, InterruptedException {
        // Converts element, buffers it, and schedules flush if needed
    }

    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries,
            ResultHandler<RequestEntryT> resultHandler);

    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
}
Import
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| element | InputT | Yes | Pipeline element to buffer |
| context | SinkWriter.Context | Yes | Provides timestamp and watermark |
Outputs
| Name | Type | Description |
|---|---|---|
| side effect | Buffer addition | Element converted and added to RequestBuffer |
| side effect | Flush scheduling | Flush scheduled via MailboxExecutor if thresholds met |
Usage Examples
Write Path Flow
// For each record in the pipeline:
// 1. elementConverter.apply(element, context) -> RequestEntryT
// 2. getSizeInBytes(entry) -> size tracking
// 3. buffer.add(new RequestEntryWrapper(entry, size))
// 4. Check: buffer.size >= maxBatchSize? -> scheduleFlush()
// 5. Check: buffer.totalBytes >= flushThreshold? -> scheduleFlush()
// 6. If no timer active, register timer for maxTimeInBufferMS
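The six steps above can be modeled in a small, single-threaded sketch. This is not Flink code: WritePathSketch and all of its members are hypothetical, the converter and sizer are plain functional interfaces standing in for ElementConverter and getSizeInBytes, the flush runs synchronously and drains the whole buffer, and the timer is reduced to a boolean flag plus an onTimer method that a caller invokes manually.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;
import java.util.function.ToLongFunction;

/** Hypothetical single-threaded model of the write path; not the Flink implementation. */
final class WritePathSketch<I, R> {
    private final Function<I, R> converter;   // stands in for ElementConverter
    private final ToLongFunction<R> sizer;    // stands in for getSizeInBytes
    private final int maxBatchSize;
    private final long flushThresholdBytes;

    private final Deque<R> buffer = new ArrayDeque<>();
    private final List<List<R>> flushedBatches = new ArrayList<>();
    private long bufferedBytes = 0;
    private boolean timerActive = false;

    WritePathSketch(Function<I, R> converter, ToLongFunction<R> sizer,
                    int maxBatchSize, long flushThresholdBytes) {
        this.converter = converter;
        this.sizer = sizer;
        this.maxBatchSize = maxBatchSize;
        this.flushThresholdBytes = flushThresholdBytes;
    }

    void write(I element) {
        R entry = converter.apply(element);        // step 1: convert
        long size = sizer.applyAsLong(entry);      // step 2: measure
        buffer.addLast(entry);                     // step 3: buffer
        bufferedBytes += size;
        if (buffer.size() >= maxBatchSize          // step 4: count threshold
                || bufferedBytes >= flushThresholdBytes) { // step 5: byte threshold
            flush();
        }
        if (!timerActive) {                        // step 6: arm the timer once
            timerActive = true;
        }
    }

    /** Simulates the timer callback: flush whatever is waiting, then disarm. */
    void onTimer() {
        if (!buffer.isEmpty()) {
            flush();
        }
        timerActive = false;
    }

    /** Simplification: drains the entire buffer into one recorded batch. */
    private void flush() {
        flushedBatches.add(new ArrayList<>(buffer));
        buffer.clear();
        bufferedBytes = 0;
    }

    List<List<R>> flushedBatches() { return flushedBatches; }
    int bufferedCount() { return buffer.size(); }
    boolean timerActive() { return timerActive; }
}
```

In this model a flush is triggered by whichever threshold is reached first, while onTimer covers the low-traffic case in which neither threshold is reached and entries would otherwise wait indefinitely.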