Implementation:Apache Flink AsyncDynamicTableSinkFactory
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Table_API |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
An abstract factory class for creating async dynamic table sinks with configurable async sink options.
Description
AsyncDynamicTableSinkFactory is an abstract class in the flink-connector-base module that implements DynamicTableSinkFactory. It provides the base infrastructure for building Table API/SQL sink factories backed by Flink's AsyncSinkBase. The class automatically registers the common async sink configuration options (batch size, buffer size, buffer timeout, in-flight requests, buffered requests) as optional table options through the optionalOptions() method.
The class provides a helper method, addAsyncOptionsToBuilder(), that reads async sink configuration from a Properties object and applies it to an AsyncDynamicTableSinkBuilder. It also contains an inner class, AsyncDynamicSinkContext (annotated @Internal), which encapsulates the common table metadata needed during sink creation: table options, physical data type, encoding format, partition keys, and the factory helper.
Usage
Connector developers extend this class when building a Table API/SQL sink factory for a connector that uses AsyncSinkBase. The developer implements factoryIdentifier(), requiredOptions(), and createDynamicTableSink(), while inheriting the standard async sink options from optionalOptions(). The AsyncDynamicSinkContext inner class can be used to extract table metadata from the DynamicTableFactory.Context.
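The inheritance pattern described above can be sketched without a Flink dependency. This is a simplified illustration, not the Flink source: options are modelled as plain strings instead of ConfigOption<?> instances, BaseAsyncFactorySketch stands in for AsyncDynamicTableSinkFactory, and MyFactorySketch, MY_ENDPOINT_OPTION, and MY_RETRY_OPTION are hypothetical names. It also assumes the base class returns a fresh, mutable set on every call, which is what makes extending the result of super.optionalOptions() safe.

```java
import java.util.HashSet;
import java.util.Set;

// Stand-in for AsyncDynamicTableSinkFactory. Options are plain strings here;
// the real class works with ConfigOption<?> instances.
abstract class BaseAsyncFactorySketch {
    // The five async sink options every subclass inherits as optional.
    Set<String> optionalOptions() {
        Set<String> options = new HashSet<>();
        options.add("MAX_BATCH_SIZE");
        options.add("FLUSH_BUFFER_SIZE");
        options.add("MAX_BUFFERED_REQUESTS");
        options.add("FLUSH_BUFFER_TIMEOUT");
        options.add("MAX_IN_FLIGHT_REQUESTS");
        return options;
    }

    // Left to the concrete connector factory.
    abstract Set<String> requiredOptions();
}

// Hypothetical connector factory that declares its required options and
// adds one optional option of its own on top of the inherited ones.
final class MyFactorySketch extends BaseAsyncFactorySketch {
    @Override
    Set<String> requiredOptions() {
        Set<String> options = new HashSet<>();
        options.add("MY_ENDPOINT_OPTION");
        options.add("FORMAT");
        return options;
    }

    @Override
    Set<String> optionalOptions() {
        // Start from the inherited async options, then extend them.
        Set<String> options = super.optionalOptions();
        options.add("MY_RETRY_OPTION");
        return options;
    }
}
```

A factory that needs no extra optional options can skip the optionalOptions() override entirely and still expose the five inherited ones.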
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/table/AsyncDynamicTableSinkFactory.java
- Lines: 1-141
Signature
@PublicEvolving
public abstract class AsyncDynamicTableSinkFactory implements DynamicTableSinkFactory
Import
import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| configuration | Properties | Yes | A properties object containing async sink configuration values, passed to addAsyncOptionsToBuilder(). |
| builder | AsyncDynamicTableSinkBuilder<?, ?> | Yes | The builder instance to apply async options to, passed to addAsyncOptionsToBuilder(). |
| factory | AsyncDynamicTableSinkFactory | Yes | The factory instance, passed to AsyncDynamicSinkContext constructor. |
| context | DynamicTableFactory.Context | Yes | The table factory context, passed to AsyncDynamicSinkContext constructor. |
Outputs
| Name | Type | Description |
|---|---|---|
| optionalOptions() | Set<ConfigOption<?>> | Returns the set of optional async sink config options (MAX_BATCH_SIZE, FLUSH_BUFFER_SIZE, MAX_BUFFERED_REQUESTS, FLUSH_BUFFER_TIMEOUT, MAX_IN_FLIGHT_REQUESTS). |
| addAsyncOptionsToBuilder() | AsyncDynamicTableSinkBuilder<?, ?> | Returns the builder with async sink options applied from the properties. |
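The contract of addAsyncOptionsToBuilder() can be illustrated with a small stand-alone sketch. It is not the Flink implementation: SinkBuilderSketch is a stand-in for AsyncDynamicTableSinkBuilder, the key strings are assumed to match the keys defined in AsyncSinkConnectorOptions and should be verified there, and reading values with Properties.getProperty() followed by numeric parsing is an assumption about how the helper behaves.

```java
import java.util.Optional;
import java.util.Properties;

// Stand-in for AsyncDynamicTableSinkBuilder: it only records what was set.
// A null field means the option was absent and the builder default applies.
final class SinkBuilderSketch {
    Integer maxBatchSize;
    Long maxBufferSizeInBytes;
    Integer maxBufferedRequests;
    Long maxTimeInBufferMs;
    Integer maxInFlightRequests;
}

final class AsyncOptionsSketch {
    // Assumed key strings; verify against AsyncSinkConnectorOptions.
    static final String MAX_BATCH_SIZE = "sink.batch.max-size";
    static final String FLUSH_BUFFER_SIZE = "sink.flush-buffer.size";
    static final String MAX_BUFFERED_REQUESTS = "sink.requests.max-buffered";
    static final String FLUSH_BUFFER_TIMEOUT = "sink.flush-buffer.timeout";
    static final String MAX_IN_FLIGHT_REQUESTS = "sink.requests.max-inflight";

    // Applies each option only if its key is present, then returns the same builder.
    static SinkBuilderSketch addAsyncOptionsToBuilder(
            Properties props, SinkBuilderSketch builder) {
        Optional.ofNullable(props.getProperty(MAX_BATCH_SIZE))
                .ifPresent(v -> builder.maxBatchSize = Integer.parseInt(v));
        Optional.ofNullable(props.getProperty(FLUSH_BUFFER_SIZE))
                .ifPresent(v -> builder.maxBufferSizeInBytes = Long.parseLong(v));
        Optional.ofNullable(props.getProperty(MAX_BUFFERED_REQUESTS))
                .ifPresent(v -> builder.maxBufferedRequests = Integer.parseInt(v));
        Optional.ofNullable(props.getProperty(FLUSH_BUFFER_TIMEOUT))
                .ifPresent(v -> builder.maxTimeInBufferMs = Long.parseLong(v));
        Optional.ofNullable(props.getProperty(MAX_IN_FLIGHT_REQUESTS))
                .ifPresent(v -> builder.maxInFlightRequests = Integer.parseInt(v));
        return builder;
    }
}
```

Options missing from the Properties object are simply skipped, so a caller only needs to supply the values it wants to override.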
Inner Classes
AsyncDynamicSinkContext
An @Internal inner class providing common table data required to create an AsyncDynamicTableSink.
| Method | Return Type | Description |
|---|---|---|
| getTableOptions() | ReadableConfig | Returns the resolved table options. |
| getPhysicalDataType() | DataType | Returns the physical row data type from the catalog table schema. |
| getFactoryHelper() | FactoryUtil.TableFactoryHelper | Returns the table factory helper for option validation and format discovery. |
| getResolvedOptions() | Map<String, String> | Returns the raw resolved options map from the catalog table. |
| getEncodingFormat() | EncodingFormat<SerializationSchema<RowData>> | Returns the encoding format discovered via the FORMAT option. |
| getPartitionKeys() | List<String> | Returns the partition keys of the catalog table. |
| isPartitioned() | boolean | Returns whether the catalog table is partitioned. |
Usage Examples
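    // Extending AsyncDynamicTableSinkFactory for a custom connector.
    // MY_ENDPOINT_OPTION and MyAsyncTableSinkBuilder are placeholders for
    // connector-specific code; they are not part of Flink.
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.factories.DynamicTableFactory;

    import java.util.HashSet;
    import java.util.Properties;
    import java.util.Set;

    import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_BATCH_SIZE;
    import static org.apache.flink.table.factories.FactoryUtil.FORMAT;

    public class MyAsyncSinkFactory extends AsyncDynamicTableSinkFactory {

        @Override
        public String factoryIdentifier() {
            return "my-async-sink";
        }

        @Override
        public Set<ConfigOption<?>> requiredOptions() {
            Set<ConfigOption<?>> options = new HashSet<>();
            options.add(MY_ENDPOINT_OPTION);
            options.add(FORMAT);
            return options;
        }

        @Override
        public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
            // Collect table options, data type, and encoding format in one place.
            AsyncDynamicSinkContext sinkContext = new AsyncDynamicSinkContext(this, context);
            ReadableConfig tableOptions = sinkContext.getTableOptions();

            // Properties.getProperty() only returns String values, so store the
            // option as a string rather than as the typed Integer.
            Properties asyncProps = new Properties();
            tableOptions.getOptional(MAX_BATCH_SIZE)
                    .ifPresent(v -> asyncProps.setProperty(MAX_BATCH_SIZE.key(), String.valueOf(v)));

            MyAsyncTableSinkBuilder builder = new MyAsyncTableSinkBuilder()
                    .setEndpoint(tableOptions.get(MY_ENDPOINT_OPTION))
                    .setEncodingFormat(sinkContext.getEncodingFormat())
                    .setPhysicalDataType(sinkContext.getPhysicalDataType());

            // Apply the inherited async sink options to the builder.
            addAsyncOptionsToBuilder(asyncProps, builder);
            return builder.build();
        }
    }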
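One detail is worth checking when copying typed table options into a Properties object: Properties.getProperty() returns null for any value that is not a String. Assuming addAsyncOptionsToBuilder() reads its input through getProperty(), an Integer stored with put() would be silently ignored. The stand-alone sketch below demonstrates the difference using only the JDK; the class name and the key string are illustrative assumptions.

```java
import java.util.Properties;

final class PropertiesValueSketch {
    // Stores the raw Integer, which getProperty() cannot see.
    static Properties withPut(String key, int value) {
        Properties props = new Properties();
        props.put(key, value);
        return props;
    }

    // Stores the value as a String, which getProperty() returns as-is.
    static Properties withSetProperty(String key, int value) {
        Properties props = new Properties();
        props.setProperty(key, String.valueOf(value));
        return props;
    }

    public static void main(String[] args) {
        String key = "sink.batch.max-size"; // assumed key, for illustration only
        System.out.println(withPut(key, 100).getProperty(key));         // prints "null"
        System.out.println(withSetProperty(key, 100).getProperty(key)); // prints "100"
    }
}
```

Converting with String.valueOf() before calling setProperty() keeps every async option visible to code that reads the Properties object as strings.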
Related Pages
- Principle:Apache_Flink_Async_Sink_Framework
- Apache_Flink_AsyncSinkConnectorOptions - Configuration options used by this factory
- Apache_Flink_AsyncDynamicTableSink - The table sink created by this factory
- Apache_Flink_AsyncDynamicTableSinkBuilder - The builder used to construct the table sink