Implementation:Apache Paimon IntervalPartition


Knowledge Sources
Domains: Algorithm, LSM Tree
Last Updated: 2026-02-08 00:00 GMT

Overview

IntervalPartition partitions data files with overlapping key ranges into the minimum number of SortedRuns for LSM tree merge operations, where the files inside each SortedRun do not overlap.

Description

IntervalPartition implements an algorithm that organizes a list of DataFileMeta objects into the minimum number of SortedRuns, where each SortedRun contains files with non-overlapping key ranges. This matters when reading primary key tables stored as LSM trees: each SortedRun becomes one sorted input to the merge, so fewer runs means fewer inputs to merge during query execution.

The algorithm works in two phases. First, it sorts all files by min_key, breaking ties by max_key. Then it splits the sorted files into sections: a section is a group of files whose key ranges overlap directly or through a chain of other files, so files in different sections never overlap. Within each section, it uses a greedy algorithm with a min-heap to assign files to runs, ensuring that files in the same run have non-overlapping key ranges.
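
The section-splitting phase can be sketched as follows. This is a minimal illustration, not the pypaimon source: it assumes integer keys, represents each file as a plain `(min_key, max_key)` tuple instead of a DataFileMeta, and the helper name `split_into_sections` is hypothetical. Because the intervals are closed, a file whose min_key equals the current bound is treated as overlapping.

```python
from typing import List, Optional, Tuple

# Hypothetical stand-in for a file's (min_key, max_key); not a pypaimon type.
Interval = Tuple[int, int]


def split_into_sections(files: List[Interval]) -> List[List[Interval]]:
    """Group intervals so that intervals in different sections never overlap."""
    ordered = sorted(files)  # tuple ordering: min_key first, then max_key
    sections: List[List[Interval]] = []
    current: List[Interval] = []
    bound: Optional[int] = None  # largest max_key seen in the current section

    for lo, hi in ordered:
        if current and lo > bound:
            # This file starts after everything seen so far ends:
            # close the current section and begin a new one.
            sections.append(current)
            current = []
            bound = None
        current.append((lo, hi))
        bound = hi if bound is None else max(bound, hi)

    if current:
        sections.append(current)
    return sections


example = [(1, 100), (50, 150), (120, 200), (180, 250), (300, 400)]
sections = split_into_sections(example)
```

In this sketch the first four intervals are chained together by overlaps and land in one section, while `(300, 400)` starts after the running bound of 250 and forms its own section.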

The min-heap-based approach keeps the number of runs minimal. For each file, taken in sorted order, the algorithm looks at the earliest-finishing run (the one whose last file has the smallest max_key). If the file's min_key is greater than that max_key, the file is appended to that run. Otherwise the file overlaps every existing run and must start a new one. A new run is therefore created only when it is unavoidable, which minimizes the total number of runs while maintaining the non-overlapping property within each run.
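
A minimal sketch of this greedy step, under the same assumptions as the description above: keys are integers, each file is a `(min_key, max_key)` tuple rather than a DataFileMeta, and `assign_runs` is a hypothetical helper name rather than a pypaimon function. The heap stores one entry per run, keyed by the max_key of the run's last file.

```python
import heapq
from typing import List, Tuple

# Hypothetical stand-in for a file's (min_key, max_key); not a pypaimon type.
Interval = Tuple[int, int]


def assign_runs(section: List[Interval]) -> List[List[Interval]]:
    """Greedily pack one section into as few non-overlapping runs as possible."""
    ordered = sorted(section)  # min_key first, then max_key
    runs: List[List[Interval]] = []
    heap: List[Tuple[int, int]] = []  # (max_key of the run's last file, run index)

    for lo, hi in ordered:
        if heap and lo > heap[0][0]:
            # The earliest-finishing run ends before this file starts: reuse it.
            _, idx = heapq.heappop(heap)
            runs[idx].append((lo, hi))
        else:
            # The file overlaps every existing run: open a new one.
            idx = len(runs)
            runs.append([(lo, hi)])
        heapq.heappush(heap, (hi, idx))
    return runs


runs = assign_runs([(1, 100), (50, 150), (120, 200), (180, 250)])
```

For these four intervals the sketch produces two runs: `(1, 100)` and `(120, 200)` share one run, and `(50, 150)` and `(180, 250)` share the other.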

Usage

Use IntervalPartition when reading from primary key tables that have multiple overlapping files at different LSM levels. The PrimaryKeyTableSplitGenerator uses this algorithm to organize files into efficient merge groups before creating splits, reducing merge overhead during query execution.

Code Reference

Source Location

Signature

from dataclasses import dataclass
from functools import cmp_to_key
from typing import List


@dataclass
class SortedRun:
    """
    A SortedRun is a list of files sorted by their keys.
    The key intervals [minKey, maxKey] of these files do not overlap.
    """
    files: List[DataFileMeta]


class IntervalPartition:
    """
    Algorithm to partition several data files into the minimum number of SortedRuns.
    """

    def __init__(self, input_files: List[DataFileMeta]):
        self.files = input_files.copy()
        self.key_comparator = default_key_comparator
        self.files.sort(key=cmp_to_key(self._compare_files))

    def partition(self) -> List[List[SortedRun]]:
        """
        Partition files into sections, then partition each section into SortedRuns.
        Returns a list of sections, where each section is a list of SortedRuns.
        """
        ...

    def _partition_section(self, metas: List[DataFileMeta]) -> List[SortedRun]:
        """Partition a section of overlapping files into minimum SortedRuns."""
        ...

    def _compare_files(self, f1: DataFileMeta, f2: DataFileMeta) -> int:
        """Compare files by min_key, then max_key."""
        ...
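
The ordering described by `_compare_files` might look like the sketch below. It is an assumption-laden illustration: `FileRange` is a hypothetical stand-in for DataFileMeta with integer keys, and the comparison uses Python's built-in operators instead of the `key_comparator` held by the class.

```python
from dataclasses import dataclass
from functools import cmp_to_key


@dataclass
class FileRange:
    """Hypothetical stand-in for DataFileMeta; only the key bounds are modeled."""
    min_key: int
    max_key: int


def compare_files(f1: FileRange, f2: FileRange) -> int:
    """Order by min_key first, breaking ties on max_key."""
    if f1.min_key != f2.min_key:
        return -1 if f1.min_key < f2.min_key else 1
    if f1.max_key != f2.max_key:
        return -1 if f1.max_key < f2.max_key else 1
    return 0


files = [FileRange(50, 150), FileRange(1, 100), FileRange(1, 40)]
files.sort(key=cmp_to_key(compare_files))
```

After sorting, the two files that start at key 1 come first, with the one ending at 40 ahead of the one ending at 100.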

Import

from pypaimon.read.interval_partition import IntervalPartition, SortedRun

I/O Contract

Inputs

Name | Type | Required | Description
input_files | List[DataFileMeta] | Yes | List of data files with min_key and max_key metadata

Outputs

Name | Type | Description
sections | List[List[SortedRun]] | List of sections; each section is a list of SortedRuns, and the files within a single SortedRun have non-overlapping key ranges

Usage Examples

from pypaimon.read.interval_partition import IntervalPartition

# Overlapping files from multiple LSM levels.
# The four names below are placeholders for DataFileMeta objects obtained elsewhere.
data_files = [
    file_a_level_0,  # min_key=1, max_key=100
    file_b_level_1,  # min_key=50, max_key=150
    file_c_level_1,  # min_key=120, max_key=200
    file_d_level_2,  # min_key=180, max_key=250
]

# Partition files into optimal SortedRuns
partitioner = IntervalPartition(data_files)
sections = partitioner.partition()

# Each section contains files with overlapping ranges
for section_idx, section in enumerate(sections):
    print(f"Section {section_idx}: {len(section)} runs")

    # Each run contains files with non-overlapping ranges
    for run_idx, sorted_run in enumerate(section):
        print(f"  Run {run_idx}: {len(sorted_run.files)} files")
        for file in sorted_run.files:
            print(f"    File: key range [{file.min_key}, {file.max_key}]")

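# Use the partitioned runs for merge-on-read.
# create_reader_for_run, SortMergeReader, and schema are not defined in this snippet;
# their names and call signatures are assumed here for illustration.
for section in sections:
    readers = [create_reader_for_run(run) for run in section]
    merge_reader = SortMergeReader(readers, schema)
    # Process merged data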
