Heuristic:Apache Flink Bin Packing Complexity Guard
| Knowledge Sources | |
|---|---|
| Domains | Optimization, File_Compaction |
| Last Updated | 2026-02-09 13:00 GMT |
Overview
Performance guard in the file compaction bin packing algorithm that caps the active bin count at 10 to prevent O(n^2) algorithmic complexity.
Description
The `BinPacking.pack()` method in the file compaction pipeline uses a first-fit approach to group small files into compaction units that approximate a target file size. The code shown under Code Evidence processes items in arrival order and does not sort them, so it is plain first-fit rather than first-fit-decreasing. For each incoming file, the algorithm scans the open bins in order and places the file in the first one with enough remaining capacity. Without a bound on the number of active bins, this linear scan for every new item costs O(n * m), where n is the number of items and m is the number of bins. In the worst case (many items that do not fit into existing bins), m grows with n and the cost degrades to O(n^2).
The heuristic caps the number of active bins at 10. Whenever opening a new bin brings the active count to 11, the oldest bin is immediately committed (removed from the active set and appended to the output), regardless of whether it is optimally packed. Each item therefore probes at most 10 bins, so the per-item search cost is bounded by a constant.
Usage
This heuristic applies automatically within the file compaction pipeline whenever `FileCompactStrategy` triggers compaction and files need to be grouped into compaction requests. It is relevant when the number of small files per bucket is large (e.g., high-frequency checkpointing produces many small part files).
The Insight (Rule of Thumb)
- Action: When the number of active bins exceeds 10 during file packing, immediately commit the oldest bin.
- Value: Hard-coded threshold of 10 active bins.
- Trade-off: Suboptimal packing quality at scale (oldest bin may have remaining capacity) in exchange for guaranteed linear time complexity.
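The trade-off can be made concrete with a small experiment. The sketch below is not the Flink class: it packs raw `long` weights, and `maxBins` is a hypothetical parameter introduced only so the capped and uncapped variants can be compared (the source hard-codes 10). With a target of 10, eleven items of weight 6 followed by eleven items of weight 4 pack into 11 full groups without the cap, but into 12 groups with it, because the first `[6]` bin is committed before any 4 arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative sketch only; mirrors the eviction rule, not the Flink API. */
class CappedPackingDemo {

    /** First-fit packing of raw weights. maxBins is a hypothetical knob for comparison. */
    static List<List<Long>> pack(List<Long> weights, long targetWeight, int maxBins) {
        List<List<Long>> packed = new ArrayList<>();
        Deque<List<Long>> bins = new ArrayDeque<>();
        for (long weight : weights) {
            // Scan active bins oldest-first and take the first one with room.
            List<Long> target = null;
            for (List<Long> bin : bins) {
                if (sum(bin) + weight <= targetWeight) {
                    target = bin;
                    break;
                }
            }
            if (target != null) {
                target.add(weight);
            } else {
                List<Long> bin = new ArrayList<>();
                bin.add(weight);
                bins.addLast(bin);
                // Commit the oldest bin once the cap is exceeded, even if it has room left.
                if (bins.size() > maxBins) {
                    packed.add(bins.removeFirst());
                }
            }
        }
        packed.addAll(bins);
        return packed;
    }

    /** Recomputes the fill level on each probe; fine for a demo, wasteful in real code. */
    static long sum(List<Long> bin) {
        long total = 0;
        for (long w : bin) {
            total += w;
        }
        return total;
    }

    /** Eleven items of weight 6 followed by eleven items of weight 4. */
    static List<Long> sampleWeights() {
        List<Long> weights = new ArrayList<>();
        for (int i = 0; i < 11; i++) {
            weights.add(6L);
        }
        for (int i = 0; i < 11; i++) {
            weights.add(4L);
        }
        return weights;
    }

    public static void main(String[] args) {
        List<Long> weights = sampleWeights();
        System.out.println(pack(weights, 10L, 10).size());                // 12 groups with the cap
        System.out.println(pack(weights, 10L, Integer.MAX_VALUE).size()); // 11 groups without it
    }
}
```

The input is deliberately adversarial. It shows the kind of loss the cap can cause, not how often it occurs on real file-size distributions.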
Reasoning
In the steady state of a streaming file sink with frequent checkpoints, the compaction coordinator may need to pack hundreds or thousands of small files per bucket. A pure first-fit algorithm with unbounded bins could scan up to O(n) bins per item, leading to O(n^2) total time for n items in the worst case. By capping active bins at 10, the per-item scan is bounded at 10 iterations, making the total algorithm O(n).
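The difference in scan cost can be checked by counting capacity probes. The following is a minimal sketch under stated assumptions: it tracks only the fill level of each active bin, `maxBins` is a hypothetical parameter (the source hard-codes 10), and the input is the worst case where every item fills a whole bin, so no item ever fits into an existing one.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative probe counter; not part of Flink. */
class ProbeCountDemo {

    /** Returns how many capacity checks first-fit performs for n items of equal weight. */
    static long countProbes(int n, long weight, long targetWeight, int maxBins) {
        long probes = 0;
        // Each active bin is represented by a one-element array holding its fill level.
        Deque<long[]> bins = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            long[] target = null;
            for (long[] bin : bins) {
                probes++;
                if (bin[0] + weight <= targetWeight) {
                    target = bin;
                    break;
                }
            }
            if (target != null) {
                target[0] += weight;
            } else {
                bins.addLast(new long[] {weight});
                // Same eviction rule as the guard: drop the oldest bin past the cap.
                if (bins.size() > maxBins) {
                    bins.removeFirst();
                }
            }
        }
        return probes;
    }

    public static void main(String[] args) {
        // Worst case: each item has exactly the target weight.
        System.out.println(countProbes(1000, 100L, 100L, 10));                // 9945
        System.out.println(countProbes(1000, 100L, 100L, Integer.MAX_VALUE)); // 499500
    }
}
```

For n = 1000 the uncapped variant performs n(n-1)/2 = 499500 probes, while the capped variant performs 55 probes for the first 11 items and 10 for each of the remaining 989, for a total of 9945.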
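The source does not document why 10 was picked; the only rationale in the code is the comment `avoid n * n algorithm complexity`. A plausible reading is that 10 is a balance: it is large enough to provide reasonable packing quality (each item has up to 10 candidate bins) while small enough to keep the scan cost negligible. The loss from early eviction is expected to be small, because the evicted bin is always the oldest one and has had the most chances to receive items, so it is usually close to the compaction target size by the time it is committed.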
Code Evidence
From `flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/BinPacking.java:34-58`:
```java
public static <T> List<List<T>> pack(
        Iterable<T> items, Function<T, Long> weightFunc, long targetWeight) {
    List<List<T>> packed = new ArrayList<>();
    Deque<Bin<T>> bins = new LinkedList<>();
    for (T item : items) {
        long weight = weightFunc.apply(item);
        Bin<T> bin = findBin(bins, weight);
        if (bin != null) {
            bin.add(item, weight);
        } else {
            bin = new Bin<>(targetWeight);
            bin.add(item, weight);
            bins.addLast(bin);
            // avoid n * n algorithm complexity
            if (bins.size() > 10) {
                packed.add(bins.removeFirst().items());
            }
        }
    }
    bins.forEach(bin -> packed.add(bin.items));
    return packed;
}
```
The `findBin` method (`BinPacking.java:61-68`) performs a linear scan over the active bins and returns the first one that can accept the item:
```java
private static <T> Bin<T> findBin(Iterable<Bin<T>> bins, long weight) {
    for (Bin<T> bin : bins) {
        if (bin.canAdd(weight)) {
            return bin;
        }
    }
    return null;
}
```
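The `Bin` helper itself is not quoted above. The sketch below is a hypothetical reconstruction inferred only from the calls in the listings (`new Bin<>(targetWeight)`, `canAdd`, `add`, `items()`, and the field access `bin.items`). The capacity rule in `canAdd` and the wrapper class `BinSketch` are assumptions, not the confirmed Flink implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical wrapper so the sketch compiles on its own. */
class BinSketch {

    /** Assumed shape of the Bin helper, inferred from how pack() and findBin() use it. */
    static class Bin<T> {
        private final long targetWeight;
        private final List<T> items = new ArrayList<>();
        private long binWeight = 0L;

        Bin(long targetWeight) {
            this.targetWeight = targetWeight;
        }

        List<T> items() {
            return items;
        }

        // Assumption: an item fits if adding it does not push the bin past the target.
        boolean canAdd(long weight) {
            return binWeight + weight <= targetWeight;
        }

        // No capacity check here; callers are expected to consult canAdd first.
        void add(T item, long weight) {
            binWeight += weight;
            items.add(item);
        }
    }

    public static void main(String[] args) {
        Bin<String> bin = new Bin<>(128L);
        bin.add("part-0", 100L);
        System.out.println(bin.canAdd(28L)); // true
        System.out.println(bin.canAdd(29L)); // false
        System.out.println(bin.items());     // [part-0]
    }
}
```

Note that `pack()` adds an item to a freshly created bin without calling `canAdd`, so under this sketch a file larger than the target weight ends up alone in its own bin instead of being rejected.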