
Workflow:Apache Flink Async Sink Lifecycle

From Leeroopedia


Knowledge Sources
Domains: Data_Engineering, Stream_Processing, Connector_Development
Last Updated: 2026-02-09 13:00 GMT

Overview

End-to-end process for building and operating an asynchronous sink connector using Flink's AsyncSinkBase framework with configurable batching, rate limiting, and retry semantics.

Description

This workflow describes the lifecycle of the Flink async sink framework, designed for writing data to external async-capable destinations (e.g., AWS Kinesis, Amazon Firehose, HTTP endpoints). The framework handles element conversion, request buffering, batch formation, asynchronous submission, rate limiting via congestion control strategies, and automatic retry of failed requests. Developers extend the abstract AsyncSinkWriter to implement destination-specific submission logic while inheriting all batching, backpressure, and fault-tolerance behavior.

Key capabilities:

  • Configurable batch size, byte thresholds, and time-based flushing
  • Congestion control rate limiting with AIMD (Additive Increase, Multiplicative Decrease) scaling
  • Automatic retry of failed request entries with re-queuing to the buffer
  • Request timeout handling with configurable fail-on-timeout behavior
  • State snapshotting for exactly-once semantics
  • Fatal exception classification for non-retryable errors

Usage

Execute this workflow when you need to build a new Flink sink connector for an external system that supports asynchronous write operations, or when you need to understand the internal lifecycle of an existing async sink (Kinesis, Firehose, etc.). The framework is appropriate for destinations where writes are I/O-bound and benefit from batching and concurrent request submission.

Execution Steps

Step 1: Define Element Conversion

Implement the ElementConverter interface to transform incoming stream elements into the request entry format expected by the destination. This conversion maps Flink's internal data types to the destination-specific wire format. The converter receives both the element and a SinkWriter.Context with metadata like timestamp and watermark.

Key considerations:

  • The ElementConverter must be serializable for Flink's distributed execution
  • Conversion should be lightweight as it runs on the main processing thread
  • The output type becomes the unit of batching and submission
  • Size estimation via getSizeInBytes() must be implemented for byte-based batching
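The shape of this contract can be sketched in plain Java. The interface, record, and converter below are simplified stand-ins for Flink's `ElementConverter` and `SinkWriter.Context` (the context is reduced to a bare timestamp argument), not the real classes:

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for Flink's ElementConverter<InputT, RequestEntryT>;
// the real interface receives a SinkWriter.Context instead of a raw timestamp.
interface ElementConverter<InputT, RequestEntryT> {
    RequestEntryT apply(InputT element, long timestamp);
}

// Hypothetical destination wire format: a byte payload plus its size,
// measured once at conversion time so byte-based batching never re-serializes.
record RequestEntry(byte[] payload, long sizeInBytes) {}

// A deliberately lightweight converter: it runs on the main processing
// thread, so it only serializes the element and records the size.
class StringElementConverter implements ElementConverter<String, RequestEntry> {
    @Override
    public RequestEntry apply(String element, long timestamp) {
        byte[] payload = element.getBytes(StandardCharsets.UTF_8);
        return new RequestEntry(payload, payload.length);
    }
}
```

Recording the byte size inside the request entry at conversion time is what lets the later buffering and batching stages stay cheap.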

Step 2: Configure Sink Parameters

Set the async sink's operational parameters through the AsyncSinkBaseBuilder. These parameters control batching behavior (max batch size, max batch bytes), concurrency (max in-flight requests), buffering (max buffered requests), timing (max time in buffer, request timeout), and failure handling (fail on timeout, fatal exception classifier).

Key considerations:

  • maxBatchSize limits the number of records per submission batch
  • maxBatchSizeInBytes limits the total bytes per batch
  • maxInFlightRequests controls concurrent async submissions
  • maxBufferedRequests sets the backpressure threshold
  • maxTimeInBufferMS triggers time-based flushing
  • requestTimeoutMS defines the deadline for async responses
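The relationship between these parameters can be modeled with a small builder. This is a sketch, not the Flink `AsyncSinkBaseBuilder`; the field names mirror the parameters above, and the default values are illustrative, not the framework's:

```java
// Simplified stand-in for the AsyncSinkBaseBuilder configuration surface.
final class SinkConfig {
    final int maxBatchSize;
    final long maxBatchSizeInBytes;
    final int maxInFlightRequests;
    final int maxBufferedRequests;
    final long maxTimeInBufferMS;
    final long requestTimeoutMS;
    final boolean failOnTimeout;

    private SinkConfig(Builder b) {
        // The buffer must hold at least one full batch, otherwise a
        // flush could never assemble maxBatchSize entries.
        if (b.maxBufferedRequests <= b.maxBatchSize) {
            throw new IllegalArgumentException(
                "maxBufferedRequests must be greater than maxBatchSize");
        }
        this.maxBatchSize = b.maxBatchSize;
        this.maxBatchSizeInBytes = b.maxBatchSizeInBytes;
        this.maxInFlightRequests = b.maxInFlightRequests;
        this.maxBufferedRequests = b.maxBufferedRequests;
        this.maxTimeInBufferMS = b.maxTimeInBufferMS;
        this.requestTimeoutMS = b.requestTimeoutMS;
        this.failOnTimeout = b.failOnTimeout;
    }

    static final class Builder {
        int maxBatchSize = 500;                    // records per batch
        long maxBatchSizeInBytes = 5 * 1024 * 1024; // bytes per batch
        int maxInFlightRequests = 50;              // concurrent submissions
        int maxBufferedRequests = 10_000;          // backpressure threshold
        long maxTimeInBufferMS = 5_000;            // time-based flush trigger
        long requestTimeoutMS = 60_000;            // async response deadline
        boolean failOnTimeout = false;

        Builder maxBatchSize(int v) { this.maxBatchSize = v; return this; }
        Builder maxBufferedRequests(int v) { this.maxBufferedRequests = v; return this; }
        SinkConfig build() { return new SinkConfig(this); }
    }
}
```

The validation in the constructor captures the key invariant among these parameters: the buffer capacity must exceed the batch size, or batches could never fill.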

Step 3: Buffer Incoming Elements

As elements arrive via the write() method, the AsyncSinkWriter converts them using the ElementConverter and adds them to the RequestBuffer (default: DequeRequestBuffer). Each entry is wrapped in a RequestEntryWrapper that tracks its size in bytes. The buffer accumulates entries until a flush condition is triggered.

What happens:

  • Each incoming element passes through the ElementConverter
  • The converted request entry is wrapped with its byte size
  • The entry is added to the RequestBuffer
  • If the buffer exceeds maxBufferedRequests, backpressure is applied
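The buffering mechanics above can be modeled in a few lines. `BufferedEntry` and `DequeBuffer` are simplified stand-ins for `RequestEntryWrapper` and `DequeRequestBuffer`, reduced to the size-tracking and capacity behavior described here:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of RequestEntryWrapper: the converted entry plus the
// byte size recorded at write() time.
record BufferedEntry<T>(T entry, long sizeInBytes) {}

// Simplified model of DequeRequestBuffer. offer() returns false at
// capacity, which is the point where the writer applies backpressure.
final class DequeBuffer<T> {
    private final Deque<BufferedEntry<T>> deque = new ArrayDeque<>();
    private final int maxBufferedRequests;
    private long totalBytes;

    DequeBuffer(int maxBufferedRequests) {
        this.maxBufferedRequests = maxBufferedRequests;
    }

    boolean offer(T entry, long sizeInBytes) {
        if (deque.size() >= maxBufferedRequests) {
            return false; // caller must flush before accepting more input
        }
        deque.addLast(new BufferedEntry<>(entry, sizeInBytes));
        totalBytes += sizeInBytes;
        return true;
    }

    // Failed entries go back to the FRONT (see Step 6) so they are
    // retried before newer data.
    void requeueFirst(BufferedEntry<T> wrapper) {
        deque.addFirst(wrapper);
        totalBytes += wrapper.sizeInBytes();
    }

    int size() { return deque.size(); }
    long totalBytes() { return totalBytes; }
}
```

Tracking the running byte total alongside the entry count is what allows both record-based and byte-based flush conditions to be checked in constant time.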

Step 4: Form and Submit Batches

When flush conditions are met (batch size reached, byte threshold exceeded, timer fired, or checkpoint triggered), the AsyncSinkWriter extracts entries from the buffer and forms a batch via the BatchCreator. The batch is then submitted asynchronously through the abstract submitRequestEntries() method, which subclasses implement for their specific destination.

What happens:

  • The RateLimitingStrategy is consulted to determine the allowed batch size
  • The BatchCreator creates a Batch object from buffered entries
  • The batch is passed to submitRequestEntries() for async submission
  • An in-flight request counter is incremented
  • On completion, the ResultHandler callback processes the response
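The batch-formation part of this step can be sketched as follows. `SimpleBatchCreator` is an illustrative model of the `BatchCreator` role, not the Flink class; it drains entries from the front of the buffer until either limit would be exceeded:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// An entry paired with the byte size recorded when it was buffered.
record SizedEntry<T>(T entry, long sizeInBytes) {}

// Simplified model of the BatchCreator: take buffered entries in order
// until the record-count limit or the byte limit would be exceeded. The
// first entry is always taken so a single oversized entry cannot wedge
// the buffer (the real framework bounds entry size separately).
final class SimpleBatchCreator {
    static <T> List<T> createBatch(Deque<SizedEntry<T>> buffer,
                                   int maxBatchSize, long maxBatchBytes) {
        List<T> batch = new ArrayList<>();
        long batchBytes = 0;
        while (!buffer.isEmpty() && batch.size() < maxBatchSize) {
            SizedEntry<T> next = buffer.peekFirst();
            if (!batch.isEmpty() && batchBytes + next.sizeInBytes() > maxBatchBytes) {
                break; // adding this entry would exceed the byte budget
            }
            buffer.pollFirst();
            batch.add(next.entry());
            batchBytes += next.sizeInBytes();
        }
        return batch;
    }
}
```

The resulting list is what a concrete writer would hand to its `submitRequestEntries()` implementation for asynchronous submission.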

Step 5: Handle Rate Limiting and Backpressure

The CongestionControlRateLimitingStrategy dynamically adjusts the allowed batch size based on submission outcomes. Successful submissions trigger additive increases to the rate limit. Failed submissions (exceptions from the destination) trigger multiplicative decreases via the AIMDScalingStrategy. This provides TCP-like congestion control for adaptive throughput.

Key considerations:

  • AIMD scaling: rate increases linearly on success, decreases by a factor on failure
  • The scaling strategy has configurable increase rate, decrease factor, and rate bounds
  • NoOpScalingStrategy disables dynamic rate adjustment
  • Backpressure propagates upstream when the buffer is full and requests are in-flight
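The AIMD behavior can be demonstrated with a self-contained controller. This is a sketch in the spirit of the `AIMDScalingStrategy`, not its actual implementation; the parameter values are illustrative:

```java
// Simplified AIMD rate controller: additive increase on success,
// multiplicative decrease on failure, clamped to [minRate, maxRate].
final class AimdRateController {
    private final int increaseStep;      // additive step on success
    private final double decreaseFactor; // multiplier in (0, 1) on failure
    private final int minRate;
    private final int maxRate;
    private int currentRate;

    AimdRateController(int increaseStep, double decreaseFactor,
                       int minRate, int maxRate, int initialRate) {
        this.increaseStep = increaseStep;
        this.decreaseFactor = decreaseFactor;
        this.minRate = minRate;
        this.maxRate = maxRate;
        this.currentRate = initialRate;
    }

    // A submission completed without errors: probe for more throughput.
    void registerSuccess() {
        currentRate = Math.min(maxRate, currentRate + increaseStep);
    }

    // The destination pushed back: cut the rate sharply, TCP-style.
    void registerFailure() {
        currentRate = Math.max(minRate, (int) (currentRate * decreaseFactor));
    }

    int allowedMessagesPerBatch() { return currentRate; }
}
```

Starting from a rate of 100 with step 10 and factor 0.5, one success lifts the rate to 110, and each failure then halves it, which is the asymmetry that lets the sink back off quickly but recover gradually.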

Step 6: Handle Failures and Checkpoints

Failed request entries are re-queued to the front of the buffer for retry. Fatal exceptions (identified by the FatalExceptionClassifier) cause immediate job failure. On checkpoint, the writer flushes to drain buffered entries, ensuring no data loss; any entries still in the buffer (for example, re-queued failures) are captured in the BufferedRequestState for state persistence.

Key considerations:

  • Non-fatal failures result in re-queuing with retry
  • Fatal exceptions (e.g., invalid credentials) bypass retry and fail the job
  • Checkpoint preparation blocks until all in-flight requests complete
  • State snapshot captures remaining buffer contents for recovery
  • Request timeouts can either fail the job or trigger retry based on configuration
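The retry-or-fail decision can be sketched as a predicate-driven classifier. `SimpleExceptionClassifier` is a stand-in for the `FatalExceptionClassifier` role, and the `SecurityException` rule below is an illustrative choice, not the classifier any particular connector ships with:

```java
import java.util.function.Predicate;

// Simplified stand-in for the FatalExceptionClassifier: routes each
// failure either to re-queue-and-retry or to immediate job failure.
final class SimpleExceptionClassifier {
    private final Predicate<Throwable> isFatal;

    SimpleExceptionClassifier(Predicate<Throwable> isFatal) {
        this.isFatal = isFatal;
    }

    // Non-fatal failures are retried by re-queuing their entries
    // to the front of the buffer; fatal ones fail the job.
    boolean shouldRetry(Throwable error) {
        return !isFatal.test(error);
    }
}
```

Under this illustrative rule, a transient I/O error would be retried while a credentials-style `SecurityException` would bypass retry entirely, mirroring the fatal-versus-retryable split described above.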

Execution Diagram

GitHub URL

Workflow Repository