Heuristic:Apache Flink Bin Packing Complexity Guard

From Leeroopedia




Knowledge Sources
Domains Optimization, File_Compaction
Last Updated 2026-02-09 13:00 GMT

Overview

A performance guard in the file compaction bin-packing algorithm that caps the active bin count at 10 to prevent O(n^2) algorithmic complexity.

Description

The `BinPacking.pack()` method in the file compaction pipeline uses a first-fit approach to group small files into compaction units that approximate a target file size. For each incoming file, the algorithm scans all open bins for one with enough remaining capacity. Without a bound on the number of active bins, this linear scan for every new item yields O(n * m) complexity, where n is the number of items and m is the number of bins. In the worst case (many items that fit no existing bin), this degrades to O(n^2).

The heuristic caps the number of active bins at 10. Whenever adding a new bin pushes the count above 10, the oldest bin is immediately committed (removed from the active set and added to the output), regardless of whether it is optimally packed. This bounds the per-item bin search at a constant cost in steady state.
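The mechanism can be sketched as a small self-contained program. Class and helper names here are illustrative, not from the Flink source; the actual implementation is quoted under Code Evidence below.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class CappedFirstFit {
    static final int MAX_ACTIVE_BINS = 10; // same bound the Flink heuristic uses

    static class Bin {
        final List<Long> items = new ArrayList<>();
        long remaining;
        Bin(long capacity) { this.remaining = capacity; }
        boolean canAdd(long w) { return remaining >= w; }
        void add(long w) { items.add(w); remaining -= w; }
    }

    static List<List<Long>> pack(Iterable<Long> weights, long targetWeight) {
        List<List<Long>> packed = new ArrayList<>();
        Deque<Bin> bins = new ArrayDeque<>(); // open bins, oldest first

        for (long w : weights) {
            // bounded scan: at most MAX_ACTIVE_BINS iterations
            Bin target = null;
            for (Bin b : bins) {
                if (b.canAdd(w)) { target = b; break; }
            }
            if (target != null) {
                target.add(w);
            } else {
                Bin fresh = new Bin(targetWeight);
                fresh.add(w);
                bins.addLast(fresh);
                if (bins.size() > MAX_ACTIVE_BINS) {
                    // commit the oldest bin even if it still has spare capacity
                    packed.add(bins.removeFirst().items);
                }
            }
        }
        bins.forEach(b -> packed.add(b.items));
        return packed;
    }

    public static void main(String[] args) {
        // 12 items of weight 60 against target 100: no item fits an existing
        // bin, so the cap forces two early commits and ten bins stay open.
        List<Long> weights = new ArrayList<>();
        for (int i = 0; i < 12; i++) weights.add(60L);
        System.out.println(pack(weights, 100).size()); // prints 12
    }
}
```

The `main` demo exercises the worst case described above: every item opens a new bin, so the 11th and 12th items each trigger an early commit of the oldest bin.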

Usage

This heuristic applies automatically within the file compaction pipeline whenever `FileCompactStrategy` triggers compaction and files need to be grouped into compaction requests. It is relevant when the number of small files per bucket is large (e.g., high-frequency checkpointing produces many small part files).

The Insight (Rule of Thumb)

  • Action: When the number of active bins exceeds 10 during file packing, immediately commit the oldest bin.
  • Value: Hard-coded threshold of 10 active bins.
  • Trade-off: Suboptimal packing quality at scale (oldest bin may have remaining capacity) in exchange for guaranteed linear time complexity.

Reasoning

In the steady state of a streaming file sink with frequent checkpoints, the compaction coordinator may need to pack hundreds or thousands of small files per bucket. A pure first-fit algorithm with unbounded bins would spend O(n) time per item scanning all bins, leading to O(n^2) total time for n items. By capping active bins at 10, the per-item scan is bounded at 10 iterations, making the total algorithm O(n).
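The worst case, where every item opens a new bin, makes the effect of the cap easy to quantify. A small sketch (illustrative, not Flink code) that counts scan iterations with and without the bound:

```java
// Counts bin-scan iterations for n items that never fit an existing bin,
// with and without a cap on the number of open bins.
public class ScanCost {
    static long scans(int n, int cap) {
        long total = 0;
        int openBins = 0;
        for (int i = 0; i < n; i++) {
            total += openBins;           // linear scan over currently open bins
            openBins++;                  // worst case: item opens a new bin
            if (cap > 0 && openBins > cap) {
                openBins = cap;          // oldest bin committed, scan stays bounded
            }
        }
        return total;
    }

    public static void main(String[] args) {
        int n = 10_000;
        System.out.println(scans(n, 0));  // unbounded: 49,995,000 (~n^2 / 2)
        System.out.println(scans(n, 10)); // capped at 10: 99,945 (< 10 * n)
    }
}
```

For 10,000 items, the cap cuts the scan work by roughly a factor of 500, and the saving grows linearly with n.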

The value of 10 was chosen as a balance: it is large enough to provide reasonable packing quality (items have 10 candidate bins to fit into) while small enough to keep the scan cost negligible. In practice, the suboptimal packing from early bin eviction has minimal impact because the compaction target size means most bins fill up naturally within a few items.

Code Evidence

From `flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/BinPacking.java:34-58`:

public static <T> List<List<T>> pack(
        Iterable<T> items, Function<T, Long> weightFunc, long targetWeight) {
    List<List<T>> packed = new ArrayList<>();
    Deque<Bin<T>> bins = new LinkedList<>();

    for (T item : items) {
        long weight = weightFunc.apply(item);
        Bin<T> bin = findBin(bins, weight);

        if (bin != null) {
            bin.add(item, weight);
        } else {
            bin = new Bin<>(targetWeight);
            bin.add(item, weight);
            bins.addLast(bin);

            // avoid n * n algorithm complexity
            if (bins.size() > 10) {
                packed.add(bins.removeFirst().items());
            }
        }
    }

    bins.forEach(bin -> packed.add(bin.items()));
    return packed;
}

The `findBin` method (`BinPacking.java:61-68`) performs the linear scan over the open bins:

private static <T> Bin<T> findBin(Iterable<Bin<T>> bins, long weight) {
    for (Bin<T> bin : bins) {
        if (bin.canAdd(weight)) {
            return bin;
        }
    }
    return null;
}
