Implementation:Apache Flink AsyncDynamicTableSink

From Leeroopedia


Knowledge Sources
Domains Connectors, Table_API
Last Updated 2026-02-09 00:00 GMT

Overview

An abstract Table API dynamic table sink wrapper that integrates with Flink's async sink framework.

Description

AsyncDynamicTableSink is an abstract generic class in the flink-connector-base module that implements DynamicTableSink and wraps the configuration attributes of AsyncSinkBase. It is parameterized by RequestEntryT (which must extend Serializable), representing the type of request entries written to the downstream system. The class stores five nullable async sink configuration fields: maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBufferSizeInBytes, and maxTimeInBufferMS.

The class provides a protected method addAsyncOptionsToSinkBuilder() that transfers its stored configuration values to an AsyncSinkBaseBuilder instance, using Optional.ofNullable() so that only non-null values are applied and any null field leaves the builder's existing setting untouched. It also implements equals() and hashCode() based on the five async configuration fields. Concrete implementations must provide the remaining DynamicTableSink methods: getChangelogMode(), getSinkRuntimeProvider(), copy(), and asSummaryString().
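The null-skipping transfer described above can be illustrated without any Flink dependency. The following is a minimal sketch, not the Flink source: SketchBuilder and SketchTableSink are hypothetical stand-ins for AsyncSinkBaseBuilder and AsyncDynamicTableSink, only two of the five options are modeled, and the default values are assumptions chosen for illustration.

```java
import java.util.Optional;

// Hypothetical stand-ins; none of these classes exist in Flink.
class AsyncOptionsSketch {

    /** Plays the role of AsyncSinkBaseBuilder: holds defaults until a setter is called. */
    static class SketchBuilder {
        int maxBatchSize = 500;          // assumed default, for illustration only
        long maxTimeInBufferMS = 5000L;  // assumed default, for illustration only

        SketchBuilder setMaxBatchSize(int value) {
            this.maxBatchSize = value;
            return this;
        }

        SketchBuilder setMaxTimeInBufferMS(long value) {
            this.maxTimeInBufferMS = value;
            return this;
        }
    }

    /** Plays the role of the table sink: stores nullable options. */
    static class SketchTableSink {
        private final Integer maxBatchSize;    // may be null
        private final Long maxTimeInBufferMS;  // may be null

        SketchTableSink(Integer maxBatchSize, Long maxTimeInBufferMS) {
            this.maxBatchSize = maxBatchSize;
            this.maxTimeInBufferMS = maxTimeInBufferMS;
        }

        /** Applies only the non-null options and returns the same builder. */
        SketchBuilder addAsyncOptionsToSinkBuilder(SketchBuilder builder) {
            Optional.ofNullable(maxBatchSize).ifPresent(builder::setMaxBatchSize);
            Optional.ofNullable(maxTimeInBufferMS).ifPresent(builder::setMaxTimeInBufferMS);
            return builder;
        }
    }

    public static void main(String[] args) {
        // maxBatchSize is set, maxTimeInBufferMS is left null.
        SketchBuilder builder =
                new SketchTableSink(100, null).addAsyncOptionsToSinkBuilder(new SketchBuilder());
        System.out.println(builder.maxBatchSize + " " + builder.maxTimeInBufferMS); // prints "100 5000"
    }
}
```

In this sketch the null option falls through to whatever the builder already holds, which is why the fields can stay nullable instead of carrying their own defaults.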

Usage

Connector developers extend this class when building a Table API/SQL sink that is backed by Flink's async sink infrastructure. The developer adds connector-specific fields and implements the DynamicTableSink methods, using addAsyncOptionsToSinkBuilder() to propagate the async configuration to the underlying AsyncSinkBase builder during sink creation.

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/table/sink/AsyncDynamicTableSink.java
  • Lines: 1-100

Signature

@PublicEvolving
public abstract class AsyncDynamicTableSink<RequestEntryT extends Serializable>
        implements DynamicTableSink

Import

import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink;

I/O Contract

Inputs

Name | Type | Required | Description
maxBatchSize | Integer | No | Maximum number of elements per batch write. Nullable.
maxInFlightRequests | Integer | No | Maximum number of uncompleted in-flight write requests. Nullable.
maxBufferedRequests | Integer | No | Maximum number of buffered records before backpressure. Nullable.
maxBufferSizeInBytes | Long | No | Threshold in bytes for buffer flushing. Nullable.
maxTimeInBufferMS | Long | No | Maximum time in milliseconds an element may remain in the buffer. Nullable.

Outputs

Name | Type | Description
addAsyncOptionsToSinkBuilder() | AsyncSinkBaseBuilder<?, RequestEntryT, ?> | Returns the provided builder with all non-null async options applied.

Protected Fields

Field | Type | Description
maxBatchSize | Integer | Maximum batch size for downstream writes. May be null.
maxInFlightRequests | Integer | Maximum in-flight request count. May be null.
maxBufferedRequests | Integer | Maximum buffered request count. May be null.
maxBufferSizeInBytes | Long | Buffer size threshold in bytes. May be null.
maxTimeInBufferMS | Long | Maximum time in buffer in milliseconds. May be null.
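Since equals() and hashCode() are based on these five nullable fields, a null-safe comparison is needed. The sketch below shows one plausible way to write it with java.util.Objects; SketchOptions is a hypothetical stand-in class, and the actual Flink implementation may differ in detail.

```java
import java.util.Objects;

// Hypothetical stand-in; not the Flink class.
class AsyncOptionsEqualitySketch {

    static class SketchOptions {
        final Integer maxBatchSize;
        final Integer maxInFlightRequests;
        final Integer maxBufferedRequests;
        final Long maxBufferSizeInBytes;
        final Long maxTimeInBufferMS;

        SketchOptions(
                Integer maxBatchSize,
                Integer maxInFlightRequests,
                Integer maxBufferedRequests,
                Long maxBufferSizeInBytes,
                Long maxTimeInBufferMS) {
            this.maxBatchSize = maxBatchSize;
            this.maxInFlightRequests = maxInFlightRequests;
            this.maxBufferedRequests = maxBufferedRequests;
            this.maxBufferSizeInBytes = maxBufferSizeInBytes;
            this.maxTimeInBufferMS = maxTimeInBufferMS;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            SketchOptions that = (SketchOptions) o;
            // Objects.equals treats two nulls as equal and never throws on null.
            return Objects.equals(maxBatchSize, that.maxBatchSize)
                    && Objects.equals(maxInFlightRequests, that.maxInFlightRequests)
                    && Objects.equals(maxBufferedRequests, that.maxBufferedRequests)
                    && Objects.equals(maxBufferSizeInBytes, that.maxBufferSizeInBytes)
                    && Objects.equals(maxTimeInBufferMS, that.maxTimeInBufferMS);
        }

        @Override
        public int hashCode() {
            // Objects.hash accepts null elements.
            return Objects.hash(
                    maxBatchSize,
                    maxInFlightRequests,
                    maxBufferedRequests,
                    maxBufferSizeInBytes,
                    maxTimeInBufferMS);
        }
    }

    public static void main(String[] args) {
        SketchOptions a = new SketchOptions(100, null, null, 1024L, null);
        SketchOptions b = new SketchOptions(100, null, null, 1024L, null);
        SketchOptions c = new SketchOptions(200, null, null, 1024L, null);
        System.out.println(a.equals(b)); // prints "true"
        System.out.println(a.equals(c)); // prints "false"
    }
}
```

A subclass that adds connector-specific fields would typically extend this comparison with its own fields so that a sink and its copy() compare as equal.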

Usage Examples

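// Extending AsyncDynamicTableSink for a custom connector.
// MyRequestEntry and MyAsyncSinkBuilder are hypothetical connector-specific types;
// MyAsyncSinkBuilder is assumed to extend AsyncSinkBaseBuilder with MyRequestEntry
// as its request entry type.
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
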
public class MyAsyncTableSink extends AsyncDynamicTableSink<MyRequestEntry> {

    private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
    private final DataType physicalDataType;
    private final String endpoint;

    public MyAsyncTableSink(
            String endpoint,
            EncodingFormat<SerializationSchema<RowData>> encodingFormat,
            DataType physicalDataType,
            Integer maxBatchSize,
            Integer maxInFlightRequests,
            Integer maxBufferedRequests,
            Long maxBufferSizeInBytes,
            Long maxTimeInBufferMS) {
        super(maxBatchSize, maxInFlightRequests, maxBufferedRequests,
              maxBufferSizeInBytes, maxTimeInBufferMS);
        this.endpoint = endpoint;
        this.encodingFormat = encodingFormat;
        this.physicalDataType = physicalDataType;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        SerializationSchema<RowData> serializationSchema =
            encodingFormat.createRuntimeEncoder(context, physicalDataType);

        MyAsyncSinkBuilder builder = new MyAsyncSinkBuilder()
            .setEndpoint(endpoint)
            .setSerializationSchema(serializationSchema);

        // Apply the async sink options from this table sink to the builder
        addAsyncOptionsToSinkBuilder(builder);

        return SinkV2Provider.of(builder.build());
    }

    @Override
    public DynamicTableSink copy() {
        return new MyAsyncTableSink(endpoint, encodingFormat, physicalDataType,
            maxBatchSize, maxInFlightRequests, maxBufferedRequests,
            maxBufferSizeInBytes, maxTimeInBufferMS);
    }

    @Override
    public String asSummaryString() {
        return "My Async Table Sink";
    }
}
