Implementation:Apache Flink AsyncSinkConnectorOptions
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Table_API |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A configuration options class defining the optional Table API/SQL properties for async sink connectors.
Description
AsyncSinkConnectorOptions is a class in the flink-connector-base module that defines the standard ConfigOption constants for configuring AsyncSinkBase behavior through Flink's Table API/SQL. Each option corresponds to a tuning parameter of the async sink framework and is declared as a public static final ConfigOption. All options have no default values, meaning they are entirely optional and will fall back to the AsyncSinkBase defaults if not specified.
The five configuration options defined are: MAX_BATCH_SIZE (sink.batch.max-size), MAX_IN_FLIGHT_REQUESTS (sink.requests.max-inflight), MAX_BUFFERED_REQUESTS (sink.requests.max-buffered), FLUSH_BUFFER_SIZE (sink.flush-buffer.size), and FLUSH_BUFFER_TIMEOUT (sink.flush-buffer.timeout). These options are registered as optional table options by AsyncDynamicTableSinkFactory.
Usage
Connector developers reference these options when building Table API/SQL connectors backed by AsyncSinkBase. The options are automatically included in the optional options set by AsyncDynamicTableSinkFactory. End users specify these options in SQL DDL statements or Table API configurations to tune async sink performance.
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/table/AsyncSinkConnectorOptions.java
- Lines: 1-67
Signature
@PublicEvolving
public class AsyncSinkConnectorOptions
Import
import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| (none) | This class has no inputs; it only declares static configuration option constants. |
Outputs
| Name | Type | Description |
|---|---|---|
| MAX_BATCH_SIZE | ConfigOption<Integer> | sink.batch.max-size - Maximum number of elements that may be passed in a batch to be written downstream. No default. |
| MAX_IN_FLIGHT_REQUESTS | ConfigOption<Integer> | sink.requests.max-inflight - Request threshold for uncompleted requests before blocking new write requests. No default. |
| MAX_BUFFERED_REQUESTS | ConfigOption<Integer> | sink.requests.max-buffered - Maximum number of buffered records before applying backpressure. No default. |
| FLUSH_BUFFER_SIZE | ConfigOption<Long> | sink.flush-buffer.size - Threshold value in bytes for writer buffer flushing. No default. |
| FLUSH_BUFFER_TIMEOUT | ConfigOption<Long> | sink.flush-buffer.timeout - Threshold time in milliseconds for an element to be in a buffer before being flushed. No default. |
Configuration Reference
| SQL Key | Java Constant | Type | Default | Description |
|---|---|---|---|---|
| sink.batch.max-size | MAX_BATCH_SIZE | Integer | (none) | Maximum number of elements per batch write. |
| sink.requests.max-inflight | MAX_IN_FLIGHT_REQUESTS | Integer | (none) | Maximum uncompleted write requests before blocking. |
| sink.requests.max-buffered | MAX_BUFFERED_REQUESTS | Integer | (none) | Maximum buffered records before applying backpressure. |
| sink.flush-buffer.size | FLUSH_BUFFER_SIZE | Long | (none) | Buffer size threshold in bytes for flushing. |
| sink.flush-buffer.timeout | FLUSH_BUFFER_TIMEOUT | Long | (none) | Maximum time in milliseconds before buffer is flushed. |
Usage Examples
// Using async sink options in a SQL DDL statement
// CREATE TABLE my_sink (
// id BIGINT,
// name STRING
// ) WITH (
// 'connector' = 'my-async-sink',
// 'format' = 'json',
// 'sink.batch.max-size' = '500',
// 'sink.requests.max-inflight' = '10',
// 'sink.requests.max-buffered' = '1000',
// 'sink.flush-buffer.size' = '5242880',
// 'sink.flush-buffer.timeout' = '5000'
// );
// Referencing options programmatically in a factory
ReadableConfig tableOptions = helper.getOptions();
Optional<Integer> maxBatchSize = tableOptions.getOptional(AsyncSinkConnectorOptions.MAX_BATCH_SIZE);
Optional<Long> flushBufferSize = tableOptions.getOptional(AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE);
Optional<Long> flushTimeout = tableOptions.getOptional(AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT);
Related Pages
- Principle:Apache_Flink_Async_Sink_Framework
- Apache_Flink_AsyncDynamicTableSinkFactory - Factory that registers these options as optional table options
- Apache_Flink_AsyncDynamicTableSink - Table sink configured by these options
- Apache_Flink_AsyncDynamicTableSinkBuilder - Builder that accepts these option values