Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Apache Flink AsyncDynamicTableSinkFactory

From Leeroopedia


Knowledge Sources
Domains Connectors, Table_API
Last Updated 2026-02-09 00:00 GMT

Overview

An abstract factory class for creating async dynamic table sinks with configurable async sink options.

Description

AsyncDynamicTableSinkFactory is an abstract class in the flink-connector-base module that implements DynamicTableSinkFactory. It provides the base infrastructure for building Table API/SQL sink factories backed by Flink's AsyncSinkBase. The class automatically registers the common async sink configuration options (batch size, buffer size, buffer timeout, in-flight requests, buffered requests) as optional table options through the optionalOptions() method.

The class provides a helper method addAsyncOptionsToBuilder() that reads async sink configuration from a Properties object and applies them to an AsyncDynamicTableSinkBuilder. It also contains an inner class AsyncDynamicSinkContext (annotated @Internal) which encapsulates common table metadata needed during sink creation, including table options, physical data type, encoding format, partition keys, and the factory helper.

Usage

Connector developers extend this class when building a Table API/SQL sink factory for a connector that uses AsyncSinkBase. The developer implements the createDynamicTableSink() method and requiredOptions() method, while inheriting the standard async sink optional options. The AsyncDynamicSinkContext inner class can be used to extract table metadata from the DynamicTableFactory.Context.

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/table/AsyncDynamicTableSinkFactory.java
  • Lines: 1-141

Signature

@PublicEvolving
public abstract class AsyncDynamicTableSinkFactory implements DynamicTableSinkFactory

Import

import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;

I/O Contract

Inputs

Name Type Required Description
configuration Properties Yes A properties object containing async sink configuration values, passed to addAsyncOptionsToBuilder().
builder AsyncDynamicTableSinkBuilder<?, ?> Yes The builder instance to apply async options to, passed to addAsyncOptionsToBuilder().
factory AsyncDynamicTableSinkFactory Yes The factory instance, passed to AsyncDynamicSinkContext constructor.
context DynamicTableFactory.Context Yes The table factory context, passed to AsyncDynamicSinkContext constructor.

Outputs

Name Type Description
optionalOptions() Set<ConfigOption<?>> Returns the set of optional async sink config options (MAX_BATCH_SIZE, FLUSH_BUFFER_SIZE, MAX_BUFFERED_REQUESTS, FLUSH_BUFFER_TIMEOUT, MAX_IN_FLIGHT_REQUESTS).
addAsyncOptionsToBuilder() AsyncDynamicTableSinkBuilder<?, ?> Returns the builder with async sink options applied from the properties.

Inner Classes

AsyncDynamicSinkContext

An @Internal inner class providing common table data required to create an AsyncDynamicTableSink.

Method Return Type Description
getTableOptions() ReadableConfig Returns the resolved table options.
getPhysicalDataType() DataType Returns the physical row data type from the catalog table schema.
getFactoryHelper() FactoryUtil.TableFactoryHelper Returns the table factory helper for option validation and format discovery.
getResolvedOptions() Map<String, String> Returns the raw resolved options map from the catalog table.
getEncodingFormat() EncodingFormat<SerializationSchema<RowData>> Returns the encoding format discovered via the FORMAT option.
getPartitionKeys() List<String> Returns the partition keys of the catalog table.
isPartitioned() boolean Returns whether the catalog table is partitioned.

Usage Examples

// Extending AsyncDynamicTableSinkFactory for a custom connector
public class MyAsyncSinkFactory extends AsyncDynamicTableSinkFactory {

    @Override
    public String factoryIdentifier() {
        return "my-async-sink";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(MY_ENDPOINT_OPTION);
        options.add(FORMAT);
        return options;
    }

    @Override
    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        AsyncDynamicSinkContext sinkContext = new AsyncDynamicSinkContext(this, context);

        // Build the table sink using the context
        Properties asyncProps = new Properties();
        ReadableConfig tableOptions = sinkContext.getTableOptions();
        tableOptions.getOptional(MAX_BATCH_SIZE)
            .ifPresent(v -> asyncProps.put(MAX_BATCH_SIZE.key(), v));

        MyAsyncTableSinkBuilder builder = new MyAsyncTableSinkBuilder()
            .setEndpoint(tableOptions.get(MY_ENDPOINT_OPTION))
            .setEncodingFormat(sinkContext.getEncodingFormat())
            .setPhysicalDataType(sinkContext.getPhysicalDataType());

        addAsyncOptionsToBuilder(asyncProps, builder);
        return builder.build();
    }
}

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment