Workflow:Apache Flink Stream File Compaction

From Leeroopedia



Knowledge Sources
Domains Data_Engineering, Stream_Processing, File_IO, Storage_Optimization
Last Updated 2026-02-09 13:00 GMT

Overview

End-to-end process for merging small output files produced by Flink's FileSink into larger files using the coordinator-operator compaction pipeline in streaming mode.

Description

This workflow describes the streaming file compaction pipeline that addresses the small file problem inherent in streaming file sinks. When a streaming job checkpoints frequently or has high parallelism, the FileSink can produce many small files that degrade downstream read performance and increase filesystem metadata overhead. The compaction pipeline inserts a CompactCoordinator and CompactorOperator between the file writer and the committer. The coordinator collects pending files per bucket and triggers compaction when size or checkpoint thresholds are reached. The operator performs the actual file merging asynchronously, producing larger consolidated files that replace the original small files.

Key capabilities:

  • Size-based and checkpoint-count-based compaction triggering
  • Per-bucket compaction coordination
  • Asynchronous file merging via CompactService
  • Multiple compaction strategies (concat, record-wise, identical)
  • Seamless integration with FileSink's two-phase commit protocol
  • State persistence for recovery across checkpoints

Usage

Execute this workflow when your Flink streaming job produces many small output files and downstream consumers (e.g., Hive, Spark, Presto) suffer degraded read performance due to high file counts. Enable compaction when checkpoint intervals are short (seconds to minutes), parallelism is high, or data rates are bursty, all of which can lead to a proliferation of small part files.

Execution Steps

Step 1: Enable Compaction in FileSink

Configure the FileSink builder with a FileCompactStrategy that defines when compaction should trigger. The strategy is configured with size thresholds (target compacted file size) and checkpoint count thresholds (number of checkpoints between compactions). A FileCompactor implementation is also selected to define how files are merged.

Key considerations:

  • FileCompactStrategy.Builder configures the size threshold (setSizeThreshold) and the number of checkpoints between compactions (enableCompactionOnCheckpoint)
  • ConcatFileCompactor concatenates file bytes (for simple formats)
  • RecordWiseFileCompactor reads and re-writes records (for structured formats)
  • IdenticalFileCompactor copies files without transformation
  • The compaction topology is automatically inserted into the sink pipeline
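A minimal configuration sketch, based on the FileSink compaction API available since Flink 1.15. The output path and encoder are placeholders; substitute your own format and thresholds:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.core.fs.Path;

FileSink<String> sink =
    FileSink.forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
        .enableCompact(
            FileCompactStrategy.Builder.newBuilder()
                // fire when pending files in a bucket reach ~128 MiB
                .setSizeThreshold(128 * 1024 * 1024)
                // also fire every 5 checkpoints even if the size threshold is not met
                .enableCompactionOnCheckpoint(5)
                .build(),
            // byte-level concatenation; suitable for simple row formats
            new ConcatFileCompactor())
        .build();
```

Calling enableCompact is what inserts the CompactCoordinator and CompactorOperator into the sink topology; no separate operator wiring is required.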

Step 2: Collect Committables in Coordinator

The CompactCoordinator (running at parallelism 1) receives FileSinkCommittable objects from the upstream FileWriter. It groups committables by bucket ID and maintains a per-bucket CompactTrigger that tracks accumulated file sizes and checkpoint counts. Cleanup committables pass through without compaction.

What happens:

  • Committables arrive as CommittableWithLineage messages
  • Each committable is routed to its bucket's trigger
  • The trigger accumulates pending file sizes
  • Cleanup committables (for stale in-progress files) are forwarded immediately
  • Coordinator state is checkpointed for recovery
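The coordinator's per-bucket bookkeeping can be modeled as a map from bucket ID to accumulated pending file sizes. This is an illustrative sketch, not Flink's actual internals (class and method names here are hypothetical):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of per-bucket committable collection in the coordinator.
class BucketTracker {
    // bucket ID -> sizes of pending files awaiting compaction
    private final Map<String, List<Long>> pending = new HashMap<>();

    // Route each incoming committable to its bucket's accumulator.
    void onCommittable(String bucketId, long fileSize) {
        pending.computeIfAbsent(bucketId, k -> new ArrayList<>()).add(fileSize);
    }

    // Total bytes pending for one bucket (what the size trigger evaluates).
    long pendingBytes(String bucketId) {
        return pending.getOrDefault(bucketId, List.of())
                .stream().mapToLong(Long::longValue).sum();
    }

    int pendingCount(String bucketId) {
        return pending.getOrDefault(bucketId, List.of()).size();
    }
}
```

Because accumulation is strictly per bucket, a hot bucket can fire compaction frequently while a quiet bucket keeps accumulating across checkpoints.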

Step 3: Trigger Compaction Requests

When a bucket's CompactTrigger fires (size threshold reached or sufficient checkpoints elapsed), the coordinator emits a CompactorRequest containing all pending files for that bucket. The trigger evaluates on each new committable and at each checkpoint notification. Fired requests clear the bucket's accumulated state.

Key considerations:

  • FIRE_AND_PURGE semantics: all accumulated files are included and the trigger resets
  • Size-based triggering compacts when total pending bytes exceed the threshold
  • Checkpoint-based triggering compacts after N checkpoints with pending files
  • End-of-input forces compaction of all remaining pending files
  • Requests are serialized via CompactorRequestSerializer for state persistence
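The FIRE_AND_PURGE semantics can be sketched as follows. This is a simplified model of the trigger behavior described above, not Flink's CompactTrigger API (the class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: a trigger that fires on size OR checkpoint count,
// and purges its accumulated state whenever it fires.
class SizeOrCheckpointTrigger {
    private final long sizeThreshold;
    private final int checkpointsBeforeCompaction;
    private final List<String> pendingFiles = new ArrayList<>();
    private long pendingBytes = 0;
    private int checkpointsSinceFire = 0;

    SizeOrCheckpointTrigger(long sizeThreshold, int checkpointsBeforeCompaction) {
        this.sizeThreshold = sizeThreshold;
        this.checkpointsBeforeCompaction = checkpointsBeforeCompaction;
    }

    // Evaluated on each new committable; returns the request's file list
    // if the size threshold is crossed, else null.
    List<String> onFile(String file, long size) {
        pendingFiles.add(file);
        pendingBytes += size;
        return pendingBytes >= sizeThreshold ? fireAndPurge() : null;
    }

    // Evaluated at each checkpoint notification.
    List<String> onCheckpoint() {
        checkpointsSinceFire++;
        boolean fire = !pendingFiles.isEmpty()
                && checkpointsSinceFire >= checkpointsBeforeCompaction;
        return fire ? fireAndPurge() : null;
    }

    // FIRE_AND_PURGE: emit everything accumulated, then reset all state.
    private List<String> fireAndPurge() {
        List<String> request = new ArrayList<>(pendingFiles);
        pendingFiles.clear();
        pendingBytes = 0;
        checkpointsSinceFire = 0;
        return request;
    }
}
```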

Step 4: Execute Compaction Asynchronously

The CompactorOperator receives CompactorRequest objects and buffers them per checkpoint ID. When checkpoint preparation is triggered, buffered requests are submitted to the CompactService which executes the configured FileCompactor asynchronously. The service reads input files, merges them according to the compaction strategy, and produces compacted output files.

What happens:

  • Requests are buffered in a TreeMap keyed by checkpoint ID
  • On prepareSnapshotPreBarrier, requests are submitted to CompactService
  • CompactService runs compaction in a thread pool
  • ConcatFileCompactor concatenates raw file streams
  • RecordWiseFileCompactor reads records via a Reader and writes to a new file
  • Compacted files replace the original pending files in the committable output

Step 5: Emit Compacted Committables

After compaction completes, the CompactorOperator emits new CommittableMessage records containing the compacted file committables. These flow to the downstream FileCommitter which performs the final atomic rename to make files visible. The original small files are replaced by fewer, larger compacted files.

Key considerations:

  • Compacted committables follow the same commit protocol as non-compacted files
  • CommittableSummary tracks the total number of committables per checkpoint
  • CommittableWithLineage wraps each compacted file committable
  • The FileCommitter handles both compacted and passthrough committables identically
  • Remaining requests from incomplete checkpoints are persisted in operator state
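The downstream message sequence (one summary announcing the committable count, followed by one lineage-wrapped committable per compacted file) can be sketched as below. The record and class names only mirror Flink's CommittableSummary/CommittableWithLineage; they are illustrative, not the real API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the committable message stream sent to the committer.
record Message(String kind, String payload) {}

class CompactedEmitter {
    // Emit the summary first, then each compacted file's committable,
    // matching the two-phase commit protocol used for non-compacted files.
    static List<Message> emit(long checkpointId, List<String> compactedFiles) {
        List<Message> out = new ArrayList<>();
        out.add(new Message("summary",
                "checkpoint=" + checkpointId + ",count=" + compactedFiles.size()));
        for (String file : compactedFiles) {
            out.add(new Message("committable", file));
        }
        return out;
    }
}
```

From the FileCommitter's perspective nothing changes: it counts committables against the summary and performs the same atomic rename whether a file was compacted or passed through.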

Execution Diagram

GitHub URL

Workflow Repository