
Implementation:Apache Flink AsyncSinkConnectorOptions



Knowledge Sources
Domains: Connectors, Table_API
Last Updated: 2026-02-09 00:00 GMT

Overview

A configuration options class defining the optional Table API/SQL properties for async sink connectors.

Description

AsyncSinkConnectorOptions is a class in the flink-connector-base module that defines the standard ConfigOption constants for configuring AsyncSinkBase behavior through Flink's Table API/SQL. Each option corresponds to one tuning parameter of the async sink framework and is declared as a public static final ConfigOption. None of the options declares a default value, so all of them are optional; when an option is not specified, the sink falls back to the defaults of the underlying AsyncSinkBase implementation.

The five configuration options defined are: MAX_BATCH_SIZE (sink.batch.max-size), MAX_IN_FLIGHT_REQUESTS (sink.requests.max-inflight), MAX_BUFFERED_REQUESTS (sink.requests.max-buffered), FLUSH_BUFFER_SIZE (sink.flush-buffer.size), and FLUSH_BUFFER_TIMEOUT (sink.flush-buffer.timeout). These options are registered as optional table options by AsyncDynamicTableSinkFactory.
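
The "no default, fall back when absent" behavior described above can be sketched without Flink on the classpath. The following is a minimal stand-in, not Flink's implementation: the class `AsyncSinkOptionSketch` and its `getInt`/`getLong` helpers are hypothetical, and only the five key strings are taken from this page. It models the table options as a plain `Map<String, String>` and returns an empty `Optional` for any key the user did not set.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for typed option lookup; this class is not part of Flink. */
final class AsyncSinkOptionSketch {
    // SQL keys as listed for AsyncSinkConnectorOptions on this page.
    static final String MAX_BATCH_SIZE = "sink.batch.max-size";
    static final String MAX_IN_FLIGHT_REQUESTS = "sink.requests.max-inflight";
    static final String MAX_BUFFERED_REQUESTS = "sink.requests.max-buffered";
    static final String FLUSH_BUFFER_SIZE = "sink.flush-buffer.size";
    static final String FLUSH_BUFFER_TIMEOUT = "sink.flush-buffer.timeout";

    private AsyncSinkOptionSketch() {}

    /** Reads an Integer-typed option; empty when the key is absent. */
    static Optional<Integer> getInt(Map<String, String> props, String key) {
        return Optional.ofNullable(props.get(key)).map(String::trim).map(Integer::valueOf);
    }

    /** Reads a Long-typed option; empty when the key is absent. */
    static Optional<Long> getLong(Map<String, String> props, String key) {
        return Optional.ofNullable(props.get(key)).map(String::trim).map(Long::valueOf);
    }
}
```

A caller would then supply its own fallback, for example `AsyncSinkOptionSketch.getInt(props, AsyncSinkOptionSketch.MAX_BATCH_SIZE).orElse(fallback)`, where `fallback` is whatever value the concrete sink considers its default.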

Usage

Connector developers reference these options when building Table API/SQL connectors backed by AsyncSinkBase. AsyncDynamicTableSinkFactory adds all five to its set of optional options automatically. End users set them in the WITH clause of a SQL DDL statement or in a Table API configuration to tune async sink performance.

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/table/AsyncSinkConnectorOptions.java
  • Lines: 1-67

Signature

@PublicEvolving
public class AsyncSinkConnectorOptions

Import

import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions;

I/O Contract

Inputs

Name | Type | Required | Description
(none) | - | - | This class has no inputs; it only declares static configuration option constants.

Outputs

Name | Type | Description
MAX_BATCH_SIZE | ConfigOption<Integer> | sink.batch.max-size - Maximum number of elements that may be passed in a batch to be written downstream. No default.
MAX_IN_FLIGHT_REQUESTS | ConfigOption<Integer> | sink.requests.max-inflight - Request threshold for uncompleted requests before blocking new write requests. No default.
MAX_BUFFERED_REQUESTS | ConfigOption<Integer> | sink.requests.max-buffered - Maximum number of buffered records before applying backpressure. No default.
FLUSH_BUFFER_SIZE | ConfigOption<Long> | sink.flush-buffer.size - Threshold value in bytes for writer buffer flushing. No default.
FLUSH_BUFFER_TIMEOUT | ConfigOption<Long> | sink.flush-buffer.timeout - Threshold time in milliseconds for an element to be in a buffer before being flushed. No default.

Configuration Reference

SQL Key | Java Constant | Type | Default | Description
sink.batch.max-size | MAX_BATCH_SIZE | Integer | (none) | Maximum number of elements per batch write.
sink.requests.max-inflight | MAX_IN_FLIGHT_REQUESTS | Integer | (none) | Maximum uncompleted write requests before blocking.
sink.requests.max-buffered | MAX_BUFFERED_REQUESTS | Integer | (none) | Maximum buffered records before applying backpressure.
sink.flush-buffer.size | FLUSH_BUFFER_SIZE | Long | (none) | Buffer size threshold in bytes for flushing.
sink.flush-buffer.timeout | FLUSH_BUFFER_TIMEOUT | Long | (none) | Maximum time in milliseconds before the buffer is flushed.
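
Because sink.flush-buffer.size and sink.flush-buffer.timeout are Long options expressed in bytes and milliseconds, it can help to derive the raw numbers from more readable units before writing them into a WITH clause. The helper below is only an illustrative sketch: the class `AsyncSinkUnits` and its methods are hypothetical names and are not part of Flink.

```java
import java.time.Duration;

/** Hypothetical helper for producing raw option values; this class is not part of Flink. */
final class AsyncSinkUnits {
    private static final long BYTES_PER_MEBIBYTE = 1024L * 1024L;

    private AsyncSinkUnits() {}

    /** Converts mebibytes to the byte count used by sink.flush-buffer.size. */
    static String mebibytesToBytes(long mebibytes) {
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        return Long.toString(Math.multiplyExact(mebibytes, BYTES_PER_MEBIBYTE));
    }

    /** Converts a Duration to the millisecond count used by sink.flush-buffer.timeout. */
    static String toMillis(Duration duration) {
        return Long.toString(duration.toMillis());
    }
}
```

For instance, `AsyncSinkUnits.mebibytesToBytes(5)` yields "5242880" and `AsyncSinkUnits.toMillis(Duration.ofSeconds(5))` yields "5000", the same values used for these two keys in the DDL example on this page.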

Usage Examples

// Using async sink options in a SQL DDL statement
// CREATE TABLE my_sink (
//   id BIGINT,
//   name STRING
// ) WITH (
//   'connector' = 'my-async-sink',
//   'format' = 'json',
//   'sink.batch.max-size' = '500',
//   'sink.requests.max-inflight' = '10',
//   'sink.requests.max-buffered' = '1000',
//   'sink.flush-buffer.size' = '5242880',
//   'sink.flush-buffer.timeout' = '5000'
// );

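// Referencing options programmatically in a factory
import java.util.Optional;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.factories.FactoryUtil;

// Inside createDynamicTableSink(Context context); each lookup is empty if the key was not set.
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
ReadableConfig tableOptions = helper.getOptions();
Optional<Integer> maxBatchSize = tableOptions.getOptional(AsyncSinkConnectorOptions.MAX_BATCH_SIZE);
Optional<Long> flushBufferSize = tableOptions.getOptional(AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE);
Optional<Long> flushTimeout = tableOptions.getOptional(AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT);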
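
Once the Optional values have been read as above, a factory typically forwards only the ones that are present and leaves the rest at the sink's own defaults. The sketch below shows that pattern with plain Java. `SketchSinkBuilder`, its setters, and its default values are all hypothetical and stand in for whatever builder a concrete connector provides; they are not Flink classes or Flink defaults.

```java
import java.util.Optional;

/** Hypothetical builder standing in for a concrete async sink builder; not a Flink class. */
final class SketchSinkBuilder {
    // Illustrative defaults only; a real connector chooses its own values.
    int maxBatchSize = 100;
    long flushBufferSizeBytes = 1024L * 1024L;
    long flushTimeoutMillis = 1000L;

    SketchSinkBuilder setMaxBatchSize(int value) {
        this.maxBatchSize = value;
        return this;
    }

    SketchSinkBuilder setFlushBufferSizeBytes(long value) {
        this.flushBufferSizeBytes = value;
        return this;
    }

    SketchSinkBuilder setFlushTimeoutMillis(long value) {
        this.flushTimeoutMillis = value;
        return this;
    }

    /** Applies only the options that were set; absent ones keep the builder defaults. */
    static SketchSinkBuilder configure(
            Optional<Integer> maxBatchSize,
            Optional<Long> flushBufferSize,
            Optional<Long> flushTimeout) {
        SketchSinkBuilder builder = new SketchSinkBuilder();
        maxBatchSize.ifPresent(builder::setMaxBatchSize);
        flushBufferSize.ifPresent(builder::setFlushBufferSizeBytes);
        flushTimeout.ifPresent(builder::setFlushTimeoutMillis);
        return builder;
    }
}
```

Using `ifPresent` keeps the "unset means default" rule in one place: the factory never has to know what the defaults are, it only overrides the values the user supplied.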
