Implementation:Apache Paimon IntervalPartition
| Knowledge Sources | |
|---|---|
| Domains | Algorithm, LSM Tree |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
IntervalPartition partitions data files whose key ranges may overlap into the minimum number of SortedRuns, each made of files with non-overlapping key ranges, for LSM tree merge operations.
Description
IntervalPartition organizes a list of DataFileMeta objects into the minimum number of SortedRuns, where each SortedRun contains files with non-overlapping key ranges. This is a critical optimization for reading from LSM tree structures in primary key tables: every SortedRun can be read sequentially as a single sorted stream, so minimizing the number of runs minimizes the number of streams that must be merged during query execution.
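The algorithm works in two phases. First, it sorts all files by min_key, breaking ties by max_key. Then it splits the sorted files into sections: a new section starts whenever a file's min_key is greater than the largest max_key seen so far in the current section, so files in different sections never overlap, while the files inside one section form a chain of overlapping key ranges. Within each section, it uses a greedy algorithm with a min-heap to assign files to runs, ensuring that files in the same run have non-overlapping key ranges.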
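A minimal sketch of the sort-and-split phase, assuming integer keys and a hypothetical `FileMeta` stand-in for `DataFileMeta`; the names `compare_files` and `split_sections` are illustrative, not the pypaimon API. It sorts by `min_key` then `max_key` and closes a section as soon as a file starts past the section's running right bound.

```python
from dataclasses import dataclass
from functools import cmp_to_key
from typing import List, Optional


@dataclass
class FileMeta:
    """Hypothetical stand-in for DataFileMeta; integer keys replace binary rows."""
    name: str
    min_key: int
    max_key: int


def compare_files(f1: FileMeta, f2: FileMeta) -> int:
    """Order by min_key, breaking ties by max_key."""
    if f1.min_key != f2.min_key:
        return -1 if f1.min_key < f2.min_key else 1
    if f1.max_key != f2.max_key:
        return -1 if f1.max_key < f2.max_key else 1
    return 0


def split_sections(files: List[FileMeta]) -> List[List[FileMeta]]:
    """Sort the files, then start a new section when a file begins past the current bound."""
    ordered = sorted(files, key=cmp_to_key(compare_files))
    sections: List[List[FileMeta]] = []
    current: List[FileMeta] = []
    bound: Optional[int] = None  # largest max_key seen in the current section
    for f in ordered:
        if current and f.min_key > bound:
            # f overlaps nothing in the current section, so the section is complete.
            sections.append(current)
            current = []
            bound = None
        current.append(f)
        if bound is None or f.max_key > bound:
            bound = f.max_key
    if current:
        sections.append(current)
    return sections


files = [
    FileMeta("e", 300, 400), FileMeta("c", 120, 200), FileMeta("a", 1, 100),
    FileMeta("d", 180, 250), FileMeta("b", 50, 150),
]
print([[f.name for f in s] for s in split_sections(files)])  # [['a', 'b', 'c', 'd'], ['e']]
```

The first four ranges overlap pairwise in a chain, so they stay in one section even though `[1, 100]` and `[180, 250]` do not overlap directly.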
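The min-heap keeps one entry per run, keyed by the max_key of the run's last file. For each file in sort order, the algorithm inspects the earliest-finishing run (the one with the smallest max_key): if the file's min_key is strictly greater than that max_key, the file is appended to that run; otherwise the file overlaps every existing run and must start a new one. A new run is opened only when the current file overlaps the last file of every open run, so all of those files share the current file's min_key. The number of runs therefore never exceeds the maximum number of files that share a common key, which is also a lower bound for any valid partition; this is why the greedy packing produces the minimum number of runs.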
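A minimal sketch of the per-section greedy step, assuming key ranges are plain `(min_key, max_key)` integer tuples, closed on both ends. `partition_section` and `max_overlap_depth` are illustrative names, not the pypaimon API; the second function computes the lower bound used in the optimality argument above.

```python
import heapq
from typing import List, Tuple

Interval = Tuple[int, int]  # (min_key, max_key), closed on both ends


def partition_section(section: List[Interval]) -> List[List[Interval]]:
    """Greedy interval partitioning with a min-heap keyed on each run's last max_key."""
    heap: List[Tuple[int, int, List[Interval]]] = []  # (last max_key, tie-breaker, run)
    for i, (lo, hi) in enumerate(sorted(section)):  # sorts by min_key, then max_key
        if heap and lo > heap[0][0]:
            # The earliest-finishing run ends strictly before this file starts: extend it.
            _, tie, run = heapq.heappop(heap)
            run.append((lo, hi))
            heapq.heappush(heap, (hi, tie, run))
        else:
            # Overlaps the earliest-finishing run, hence every open run: open a new one.
            heapq.heappush(heap, (hi, i, [(lo, hi)]))
    return [run for _, _, run in heap]


def max_overlap_depth(section: List[Interval]) -> int:
    """Largest number of closed intervals sharing a common key (a lower bound on runs)."""
    events = []
    for lo, hi in section:
        events.append((lo, 0))  # kind 0 sorts first: openings before closings at equal keys
        events.append((hi, 1))
    depth = best = 0
    for _, kind in sorted(events):
        depth += 1 if kind == 0 else -1
        best = max(best, depth)
    return best


section = [(1, 100), (50, 150), (120, 200), (180, 250)]
runs = partition_section(section)
print(len(runs), max_overlap_depth(section))  # 2 2
```

The strict `>` comparison means two files that share only an endpoint key still land in different runs, matching the closed `[minKey, maxKey]` intervals in the SortedRun docstring.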
Usage
Use IntervalPartition when reading from primary key tables that have multiple overlapping files at different LSM levels. The PrimaryKeyTableSplitGenerator uses this algorithm to organize files into efficient merge groups before creating splits, reducing merge overhead during query execution.
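A minimal sketch of how partitioned runs can be consumed, assuming each file is modeled as a key-sorted list of `(key, value)` records; `read_run` and `merge_section` are hypothetical helpers, not pypaimon readers. Files inside a run are disjoint and ordered, so they can simply be concatenated, and only the runs need a k-way merge. A real primary key reader would additionally combine records with equal keys according to the table's merge engine, which this sketch omits.

```python
import heapq
from itertools import chain
from typing import Iterator, List, Tuple

Record = Tuple[int, str]  # (key, value); a "file" is a key-sorted list of records


def read_run(run: List[List[Record]]) -> Iterator[Record]:
    # Files in a run have disjoint, ordered key ranges, so concatenation stays sorted.
    return chain.from_iterable(run)


def merge_section(section: List[List[List[Record]]]) -> Iterator[Record]:
    # The merge fan-in equals the number of runs, not the number of files.
    return heapq.merge(*(read_run(run) for run in section), key=lambda r: r[0])


section = [
    [[(1, "a"), (5, "b")], [(10, "c")]],  # run 1: two disjoint files
    [[(3, "d"), (7, "e")]],               # run 2: one file
]
print([k for k, _ in merge_section(section)])  # [1, 3, 5, 7, 10]
```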
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/read/interval_partition.py
Signature
```python
@dataclass
class SortedRun:
    """
    A SortedRun is a list of files sorted by their keys.
    The key intervals [minKey, maxKey] of these files do not overlap.
    """
    files: List[DataFileMeta]


class IntervalPartition:
    """
    Algorithm to partition several data files into the minimum number of SortedRuns.
    """

    def __init__(self, input_files: List[DataFileMeta]):
        self.files = input_files.copy()
        self.key_comparator = default_key_comparator
        self.files.sort(key=cmp_to_key(self._compare_files))

    def partition(self) -> List[List[SortedRun]]:
        """
        Partition files into sections, then partition each section into SortedRuns.
        Returns a list of sections, where each section is a list of SortedRuns.
        """
        ...

    def _partition_section(self, metas: List[DataFileMeta]) -> List[SortedRun]:
        """Partition a section of overlapping files into minimum SortedRuns."""
        ...

    def _compare_files(self, f1: DataFileMeta, f2: DataFileMeta) -> int:
        """Compare files by min_key, then max_key."""
        ...
```
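The SortedRun docstring states an invariant that can be checked directly. A minimal sketch, assuming integer keys and a hypothetical `FileRange` stand-in exposing only `min_key` and `max_key`; `is_valid_sorted_run` is an illustrative helper, not part of pypaimon.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class FileRange:
    """Hypothetical stand-in exposing only the two fields the invariant needs."""
    min_key: int
    max_key: int


def is_valid_sorted_run(files: List[FileRange]) -> bool:
    """True if every range is well-formed and each file ends strictly before the next starts."""
    if any(f.min_key > f.max_key for f in files):
        return False
    return all(prev.max_key < nxt.min_key for prev, nxt in zip(files, files[1:]))


print(is_valid_sorted_run([FileRange(1, 100), FileRange(120, 200)]))  # True
print(is_valid_sorted_run([FileRange(1, 10), FileRange(10, 20)]))     # False
```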
Import
```python
from pypaimon.read.interval_partition import IntervalPartition, SortedRun
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| input_files | List[DataFileMeta] | Yes | List of data files with min_key and max_key metadata |
Outputs
| Name | Type | Description |
|---|---|---|
| sections | List[List[SortedRun]] | List of sections, each containing SortedRuns with non-overlapping key ranges |
Usage Examples
```python
from pypaimon.read.interval_partition import IntervalPartition

# Overlapping files drawn from several LSM levels (placeholders for DataFileMeta
# objects taken from the table's file metadata). Level-0 files may overlap each
# other; files within a single higher level do not.
data_files = [
    level0_file_a,  # min_key=1,   max_key=100
    level0_file_b,  # min_key=50,  max_key=150
    level1_file,    # min_key=120, max_key=200
    level2_file,    # min_key=180, max_key=250
]

# Partition the files into the minimum number of SortedRuns
partitioner = IntervalPartition(data_files)
sections = partitioner.partition()
# These four ranges chain into a single section that needs two runs:
# [1, 100] + [120, 200] and [50, 150] + [180, 250]

# Files in different sections never overlap
for section_idx, section in enumerate(sections):
    print(f"Section {section_idx}: {len(section)} runs")
    # Within a run, files have non-overlapping key ranges
    for run_idx, sorted_run in enumerate(section):
        print(f"  Run {run_idx}: {len(sorted_run.files)} files")
        for file in sorted_run.files:
            print(f"    File: key range [{file.min_key}, {file.max_key}]")

# Merge-on-read: one reader per run, merged per section.
# create_reader_for_run and schema are placeholders, and the SortMergeReader
# call is illustrative rather than a confirmed pypaimon signature.
for section in sections:
    readers = [create_reader_for_run(run) for run in section]
    merge_reader = SortMergeReader(readers, schema)
    # Process merged data
```