Implementation:Apache Flink AsyncDynamicTableSink
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Table_API |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
An abstract Table API dynamic table sink wrapper that integrates with Flink's async sink framework.
Description
AsyncDynamicTableSink is an abstract generic class in the flink-connector-base module that implements DynamicTableSink and wraps the configuration attributes of AsyncSinkBase. It is parameterized by RequestEntryT (which must extend Serializable), representing the type of request entries written to the downstream system. The class stores five nullable async sink configuration fields: maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBufferSizeInBytes, and maxTimeInBufferMS.
The class provides a protected method addAsyncOptionsToSinkBuilder() that transfers its stored configuration values to an AsyncSinkBaseBuilder instance, using Optional.ofNullable() to apply only non-null values. It also implements equals() and hashCode() based on the five async configuration fields. Concrete implementations must provide the getSinkRuntimeProvider() and getChangelogMode() methods from the DynamicTableSink interface.
Usage
Connector developers extend this class when building a Table API/SQL sink that is backed by Flink's async sink infrastructure. The developer adds connector-specific fields and implements the DynamicTableSink methods, using addAsyncOptionsToSinkBuilder() to propagate the async configuration to the underlying AsyncSinkBase builder during sink creation.
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/table/sink/AsyncDynamicTableSink.java
- Lines: 1-100
Signature
@PublicEvolving
public abstract class AsyncDynamicTableSink<RequestEntryT extends Serializable>
implements DynamicTableSink
Import
import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| maxBatchSize | Integer | No | Maximum number of elements per batch write. Nullable. |
| maxInFlightRequests | Integer | No | Maximum number of uncompleted in-flight write requests. Nullable. |
| maxBufferedRequests | Integer | No | Maximum number of buffered records before backpressure. Nullable. |
| maxBufferSizeInBytes | Long | No | Threshold in bytes for buffer flushing. Nullable. |
| maxTimeInBufferMS | Long | No | Maximum time in milliseconds an element may remain in the buffer. Nullable. |
Outputs
| Name | Type | Description |
|---|---|---|
| addAsyncOptionsToSinkBuilder() | AsyncSinkBaseBuilder<?, RequestEntryT, ?> | Returns the provided builder with all non-null async options applied. |
Protected Fields
| Field | Type | Description |
|---|---|---|
| maxBatchSize | Integer | Maximum batch size for downstream writes. May be null. |
| maxInFlightRequests | Integer | Maximum in-flight request count. May be null. |
| maxBufferedRequests | Integer | Maximum buffered request count. May be null. |
| maxBufferSizeInBytes | Long | Buffer size threshold in bytes. May be null. |
| maxTimeInBufferMS | Long | Maximum time in buffer in milliseconds. May be null. |
Usage Examples
// Extending AsyncDynamicTableSink for a custom connector
public class MyAsyncTableSink extends AsyncDynamicTableSink<MyRequestEntry> {
private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
private final DataType physicalDataType;
private final String endpoint;
public MyAsyncTableSink(
String endpoint,
EncodingFormat<SerializationSchema<RowData>> encodingFormat,
DataType physicalDataType,
Integer maxBatchSize,
Integer maxInFlightRequests,
Integer maxBufferedRequests,
Long maxBufferSizeInBytes,
Long maxTimeInBufferMS) {
super(maxBatchSize, maxInFlightRequests, maxBufferedRequests,
maxBufferSizeInBytes, maxTimeInBufferMS);
this.endpoint = endpoint;
this.encodingFormat = encodingFormat;
this.physicalDataType = physicalDataType;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return ChangelogMode.insertOnly();
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
SerializationSchema<RowData> serializationSchema =
encodingFormat.createRuntimeEncoder(context, physicalDataType);
MyAsyncSinkBuilder builder = new MyAsyncSinkBuilder()
.setEndpoint(endpoint)
.setSerializationSchema(serializationSchema);
// Apply the async sink options from this table sink to the builder
addAsyncOptionsToSinkBuilder(builder);
return SinkV2Provider.of(builder.build());
}
@Override
public DynamicTableSink copy() {
return new MyAsyncTableSink(endpoint, encodingFormat, physicalDataType,
maxBatchSize, maxInFlightRequests, maxBufferedRequests,
maxBufferSizeInBytes, maxTimeInBufferMS);
}
@Override
public String asSummaryString() {
return "My Async Table Sink";
}
}
Related Pages
- Principle:Apache_Flink_Async_Sink_Framework
- Apache_Flink_AsyncDynamicTableSinkBuilder - Builder for creating instances of this class
- Apache_Flink_AsyncDynamicTableSinkFactory - Factory that produces this sink via Table API/SQL
- Apache_Flink_AsyncSinkConnectorOptions - Configuration options stored in this class