Workflow: Apache Flink File Sink Pipeline
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Stream_Processing, File_IO |
| Last Updated | 2026-02-09 13:00 GMT |
Overview
End-to-end process for writing streaming or batch data to filesystem files using Flink's unified FileSink connector with bucket-based organization, rolling policies, and exactly-once semantics.
Description
This workflow outlines the standard procedure for persisting data from a Flink pipeline to filesystem files. It leverages the unified FileSink API (FLIP-143) which supports both row-wise and bulk encoding formats. Data is organized into buckets (output directories) determined by a configurable bucket assigner, with file rotation controlled by rolling policies based on size, time, or checkpoint intervals. The sink implements a two-phase commit protocol to guarantee exactly-once delivery semantics, with an optional compaction phase that merges small files into larger ones for downstream efficiency.
Key capabilities:
- Supports both STREAMING and BATCH execution modes
- Row-format encoding (e.g., CSV, plain text) and bulk-format encoding (e.g., Parquet, ORC)
- Configurable bucket assignment (date-time partitioning, base path, custom)
- Rolling policies for file rotation
- Optional file compaction to reduce small file proliferation
- Exactly-once semantics through two-phase commit
Usage
Execute this workflow when you need to write data from a Flink DataStream or Table pipeline to files on a local filesystem, HDFS, or S3-compatible storage. This is the standard approach when you need partitioned file output with exactly-once guarantees, replacing the deprecated StreamingFileSink API.
Execution Steps
Step 1: Configure File Sink Builder
Select the appropriate encoding format and initialize the FileSink builder. Choose between row-format encoding (for line-delimited text, CSV) or bulk-format encoding (for columnar formats like Parquet or ORC). The builder is constructed with the base output path and the chosen format factory.
Key considerations:
- Row format uses an Encoder interface for element-by-element serialization
- Bulk format uses a BulkWriter.Factory for batched columnar writes
- The base path determines the root directory for all output files
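As a sketch, the two builder entry points look like this. The paths and the `MyRecord` type are illustrative, and the Parquet factory assumes the `flink-parquet` dependency is on the classpath:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

// Row format: each element is serialized individually, one UTF-8 line per element.
FileSink<String> rowSink = FileSink
    .forRowFormat(new Path("/tmp/flink-output"), new SimpleStringEncoder<String>("UTF-8"))
    .build();

// Bulk format: a BulkWriter.Factory batches elements into columnar Parquet files.
// MyRecord is a hypothetical POJO, not part of Flink.
FileSink<MyRecord> bulkSink = FileSink
    .forBulkFormat(new Path("s3://my-bucket/output"), AvroParquetWriters.forReflectRecord(MyRecord.class))
    .build();
```

The row builder accepts a per-element `Encoder`; the bulk builder accepts a `BulkWriter.Factory`, which is why their rolling options differ (see Step 3).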
Step 2: Configure Bucket Assignment
Define how incoming elements are routed to output buckets (subdirectories). The bucket assigner examines each element and the current processing context to determine the target bucket identifier. Common strategies include date-time based partitioning (e.g., hourly or daily directories) and base-path assignment (all data to one directory).
Key considerations:
- DateTimeBucketAssigner (the default) creates time-based directory hierarchies
- BasePathBucketAssigner writes all output to the base path
- Custom BucketAssigner implementations can partition by any element attribute
- Bucket ID is a string representing the subdirectory path
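A custom assigner is a small class implementing `BucketAssigner`. The sketch below partitions by a hypothetical `region` attribute; the `Event` type and its `getRegion()` accessor are assumptions for illustration, not part of Flink:

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Routes each element to a subdirectory named after its region,
// e.g. <base-path>/region=eu/part-....
public class RegionBucketAssigner implements BucketAssigner<Event, String> {

    @Override
    public String getBucketId(Event element, Context context) {
        // The returned string becomes the bucket subdirectory path.
        return "region=" + element.getRegion();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        // String bucket IDs can reuse Flink's built-in serializer.
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
```

The `Context` parameter also exposes the current processing time and element timestamp, which is how the built-in DateTimeBucketAssigner derives its directory names.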
Step 3: Configure Rolling Policy
Set the file rotation policy that determines when in-progress files should be closed and new ones started. Rolling can be triggered by file size thresholds, elapsed time since file creation, or on every checkpoint. The default policy combines size and time limits.
Key considerations:
- DefaultRollingPolicy supports maxPartSize, rolloverInterval, and inactivityInterval
- CheckpointRollingPolicy is the abstract base for policies that roll in-progress files on every Flink checkpoint
- OnCheckpointRollingPolicy, its concrete implementation, is the default and simplest option for bulk formats
- Bulk formats require checkpoint-based rolling (no mid-file splits possible)
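A sketch of configuring the default policy for a row-format sink, using the `Duration`/`MemorySize` builder overloads available in recent Flink releases; the thresholds are illustrative, not recommendations:

```java
import java.time.Duration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

// Roll a part file when ANY of the three conditions is met.
DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder()
    .withMaxPartSize(MemorySize.ofMebiBytes(128))    // file reaches 128 MiB
    .withRolloverInterval(Duration.ofMinutes(15))    // file has been open 15 min
    .withInactivityInterval(Duration.ofMinutes(5))   // no writes for 5 min
    .build();
```

The policy is attached via `.withRollingPolicy(rollingPolicy)` on a `forRowFormat` builder; `forBulkFormat` builders accept only CheckpointRollingPolicy subclasses, per the constraint above.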
Step 4: Write Data to Buckets
Elements flow through the FileWriter, which routes each element to the appropriate FileWriterBucket based on the bucket assigner's decision. Each bucket maintains an in-progress file that accumulates writes. The writer tracks output metrics and manages the bucket lifecycle.
What happens:
- Each incoming element is evaluated by the BucketAssigner to determine its bucket ID
- If the target bucket does not exist, a new FileWriterBucket is created
- The element is written to the bucket's active in-progress file
- The rolling policy is consulted after each write to determine if the file should be rotated
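The routing loop above can be pictured with this simplified, Flink-free model; class and field names are illustrative, and a record count stands in for the real byte-size check:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the write path: route each record to a bucket keyed by
// its bucket ID, creating buckets lazily, then consult a rolling condition
// after every write (here: a record-count threshold).
class MiniWriter {
    static final int MAX_PART_SIZE = 3;  // "roll" after 3 records, for illustration
    final Map<String, List<String>> buckets = new HashMap<>();  // bucket ID -> in-progress file
    final List<String> rolledFiles = new ArrayList<>();         // closed part files

    void write(String bucketId, String record) {
        // Lazily create the bucket on first use, as the FileWriter does.
        List<String> inProgress = buckets.computeIfAbsent(bucketId, k -> new ArrayList<>());
        inProgress.add(record);
        // Consult the rolling condition after each write.
        if (inProgress.size() >= MAX_PART_SIZE) {
            rolledFiles.add(bucketId + "/part-" + rolledFiles.size());
            inProgress.clear();  // a fresh in-progress file starts on the next write
        }
    }
}
```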
Step 5: Prepare Commit (Checkpoint)
On each Flink checkpoint, the FileWriter prepares committable objects. All in-progress files that should roll (per rolling policy) are closed and converted to pending files. Each pending file is wrapped in a FileSinkCommittable that records the file path and metadata needed for the final commit.
What happens:
- In-progress files are closed and become pending files
- Pending file metadata is collected into FileSinkCommittable objects
- Cleanup committables are generated for any stale in-progress files
- Bucket state is serialized for checkpoint persistence
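Because pending files are produced only at checkpoints, checkpointing must be enabled in STREAMING mode or output stays in-progress indefinitely. A minimal sketch (the 30 s interval is illustrative):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Without checkpointing, a STREAMING job's in-progress files never transition
// to pending/finished and downstream consumers never see committed output.
env.enableCheckpointing(30_000L);
```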
Step 6: Commit Files
The FileCommitter finalizes pending files by performing atomic renames from temporary locations to their final output paths. This completes the two-phase commit protocol, ensuring exactly-once delivery. Files transition from pending to finished state.
Key considerations:
- Commit operations use atomic file system renames when supported
- Failed commits are retried on recovery from checkpoint
- The committer handles both pending file commits and in-progress file cleanup
- After commit, files are visible to downstream consumers
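The rename at the heart of the commit can be illustrated with plain `java.nio`, a deliberate simplification: real committers additionally handle retries and object stores that lack atomic rename. File names here are illustrative (a leading dot marks the hidden pending file):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Simplified model of the commit phase: a hidden pending part file is
// atomically renamed to its final, visible name in the same directory.
class CommitDemo {
    static Path commit(Path pending) throws IOException {
        // Derive the finished name by stripping the leading "." from the file name.
        String finished = pending.getFileName().toString().replaceFirst("^\\.", "");
        Path target = pending.resolveSibling(finished);
        // ATOMIC_MOVE: readers see either no file or the complete file, never a partial one.
        return Files.move(pending, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```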