Implementation:Apache Paimon SortMergeReader
| Knowledge Sources | |
|---|---|
| Domains | Data Reading, Primary Key Tables |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
SortMergeReaderWithMinHeap merges multiple sorted KeyValue streams using a min-heap to produce deduplicated results.
Description
SortMergeReaderWithMinHeap implements a RecordReader that merges multiple sorted input streams of KeyValue records using a min-heap-based algorithm. It is designed for primary key tables where multiple sorted files need to be merged while deduplicating records with the same key. The reader uses a DeduplicateMergeFunction to keep only the latest version of each key based on sequence numbers.
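To make the deduplication rule concrete, here is a minimal sketch of a merge function with the same reset/add/get_result shape as the DeduplicateMergeFunction signature listed under Code Reference. It assumes that the reader feeds every version of one key in ascending sequence-number order, so keeping the last record added yields the latest version. `KV`, `DedupSketch`, and `merge_one_key` are hypothetical names used for illustration, not pypaimon classes.

```python
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass
class KV:
    """Hypothetical stand-in for pypaimon's KeyValue: key, sequence number, value."""
    key: Tuple[Any, ...]
    sequence_number: int
    value: Any


class DedupSketch:
    """Keeps only the last record added for the current key (sketch, not pypaimon)."""

    def __init__(self) -> None:
        self.latest_kv: Optional[KV] = None

    def reset(self) -> None:
        # Called before feeding the records of a new key.
        self.latest_kv = None

    def add(self, kv: KV) -> None:
        # Assumption: records of one key arrive in ascending sequence order,
        # so overwriting leaves the highest sequence number in place.
        self.latest_kv = kv

    def get_result(self) -> Optional[KV]:
        return self.latest_kv


def merge_one_key(versions: List[KV]) -> Optional[KV]:
    """Feed all versions of a single key oldest first and return the survivor."""
    fn = DedupSketch()
    fn.reset()
    for kv in sorted(versions, key=lambda r: r.sequence_number):
        fn.add(kv)
    return fn.get_result()


result = merge_one_key([KV((1,), 7, "new"), KV((1,), 3, "old")])
print(result.value)  # new
```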
The implementation maintains a min-heap of elements, where each element holds a KeyValue record, the iterator it was read from, and the reader that produced that iterator. The heap is ordered first by key (using a comparator built from the primary key schema) and then by ascending sequence number for keys that compare equal. Records therefore leave the heap in key order, and the versions of any one key leave it oldest first, so the merge function, which keeps the last record it receives, selects the most recent version (highest sequence number) of each key.
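The ordering rule can be illustrated with Python's `heapq`, which only needs `__lt__` on heap entries. This is a sketch under stated assumptions: `HeapEntrySketch` and `tuple_comparator` are hypothetical stand-ins for the reader's heap element and for the comparator built from the primary key schema, and keys are plain tuples.

```python
import heapq
from typing import Any, Callable, List, Tuple

# Hypothetical comparator type: returns negative, zero, or positive.
KeyComparator = Callable[[Tuple[Any, ...], Tuple[Any, ...]], int]


def tuple_comparator(a: Tuple[Any, ...], b: Tuple[Any, ...]) -> int:
    """Three-way comparison of tuple keys (stand-in for a schema-built comparator)."""
    return (a > b) - (a < b)


class HeapEntrySketch:
    """Heap entry ordered by key, then by ascending sequence number."""

    def __init__(self, key, sequence_number: int, value, comparator: KeyComparator):
        self.key = key
        self.sequence_number = sequence_number
        self.value = value
        self.comparator = comparator

    def __lt__(self, other: "HeapEntrySketch") -> bool:
        result = self.comparator(self.key, other.key)
        if result != 0:
            return result < 0
        # Equal keys: lower sequence number first, so the newest version pops last.
        return self.sequence_number < other.sequence_number


heap: List[HeapEntrySketch] = []
for key, seq, value in [((2,), 5, "b-v5"), ((1,), 9, "a-v9"), ((1,), 4, "a-v4"), ((2,), 1, "b-v1")]:
    heapq.heappush(heap, HeapEntrySketch(key, seq, value, tuple_comparator))

order = [heapq.heappop(heap).value for _ in range(len(heap))]
print(order)  # ['a-v4', 'a-v9', 'b-v1', 'b-v5']
```

Because equal keys pop oldest first, a merge function that simply keeps the last record it sees ends up holding the highest sequence number for that key.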
Partition key columns are trimmed from the primary key before the comparator is built: the files merged together belong to the same partition, so those columns are identical across records and cannot affect the ordering. Input is consumed in batches through RecordIterators. When an element's iterator is exhausted, the next batch from its reader is fetched to refill the heap, and each reader is closed once it has no more batches.
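Trimming the comparison key might look like the following sketch. `trim_primary_keys` is a hypothetical helper; raising an error when every primary key column is also a partition column is an assumption of this sketch, chosen because nothing would remain to compare.

```python
from typing import List


def trim_primary_keys(primary_keys: List[str], partition_keys: List[str]) -> List[str]:
    """Drop partition columns from the primary key used for comparison."""
    if not partition_keys:
        return list(primary_keys)
    trimmed = [pk for pk in primary_keys if pk not in partition_keys]
    if not trimmed:
        # Sketch assumption: a key made only of partition columns leaves nothing to compare.
        raise ValueError(f"Primary key {primary_keys} consists only of partition fields")
    return trimmed


print(trim_primary_keys(["dt", "user_id"], ["dt"]))  # ['user_id']
```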
Usage
Use SortMergeReaderWithMinHeap when reading from primary key tables where multiple sorted data files need to be merged. It is typically used in scans to combine sorted runs from different LSM tree levels into a single stream that is ordered and deduplicated by primary key.
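The overall merge can be simulated end to end with in-memory sorted runs. This plain-Python sketch is not pypaimon's API: `BatchedRunSketch` and `sort_merge_dedup` are hypothetical, records are `(key, sequence_number, value)` tuples, and each run is assumed to be sorted by key with no duplicate keys inside a run. Each run hands out records batch by batch and marks itself closed when its batches run out, loosely mirroring how the reader refills the heap and releases readers.

```python
import heapq
from typing import Any, Iterator, List, Optional, Tuple

Record = Tuple[Tuple[Any, ...], int, Any]  # (key, sequence_number, value)


class BatchedRunSketch:
    """Hypothetical sorted run that hands out records in batches."""

    def __init__(self, batches: List[List[Record]]):
        self._batches = iter(batches)
        self._current: Iterator[Record] = iter(())
        self.closed = False

    def next_record(self) -> Optional[Record]:
        while True:
            record = next(self._current, None)
            if record is not None:
                return record
            batch = next(self._batches, None)
            if batch is None:
                self.closed = True  # no more batches: release the run
                return None
            self._current = iter(batch)  # current batch exhausted: fetch the next one


def sort_merge_dedup(runs: List[BatchedRunSketch]) -> List[Record]:
    """k-way merge ordered by (key, sequence); the newest version of each key wins."""
    heap: List[Tuple[Tuple[Any, ...], int, int, Any]] = []

    def push_next(run_id: int) -> None:
        record = runs[run_id].next_record()
        if record is not None:
            key, seq, value = record
            # run_id breaks ties so the value itself is never compared.
            heapq.heappush(heap, (key, seq, run_id, value))

    for run_id in range(len(runs)):
        push_next(run_id)

    merged: List[Record] = []
    while heap:
        key, seq, run_id, value = heapq.heappop(heap)
        push_next(run_id)  # refill from the run just consumed
        if merged and merged[-1][0] == key:
            merged[-1] = (key, seq, value)  # same key, higher sequence: newer version wins
        else:
            merged.append((key, seq, value))
    return merged


level0 = BatchedRunSketch([[((1,), 10, "a@10")], [((3,), 11, "c@11")]])
level1 = BatchedRunSketch([[((1,), 2, "a@2"), ((2,), 3, "b@3")], [((3,), 4, "c@4")]])
result = sort_merge_dedup([level0, level1])
print([value for _, _, value in result])  # ['a@10', 'b@3', 'c@11']
```

Using `(key, sequence_number, run_id, value)` tuples lets `heapq` order entries directly without a custom `__lt__`. Unlike the reader described here, which works through `RecordIterator` batches, this sketch refills the heap after every pop.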
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/read/reader/sort_merge_reader.py
Signature
```python
class SortMergeReaderWithMinHeap(RecordReader):
    """SortMergeReader implemented with min-heap."""

    def __init__(self, readers: List[RecordReader[KeyValue]], schema: TableSchema):
        self.next_batch_readers = list(readers)
        self.merge_function = DeduplicateMergeFunction()
        ...

    def read_batch(self) -> Optional[RecordIterator]:
        """Read next batch of merged and deduplicated records."""
        ...

    def close(self):
        """Close all readers and release resources."""
        ...


class DeduplicateMergeFunction:
    """A MergeFunction where key is primary key (unique) and value is the full record."""

    def __init__(self):
        self.latest_kv = None

    def reset(self) -> None:
        ...

    def add(self, kv: KeyValue):
        ...

    def get_result(self) -> Optional[KeyValue]:
        ...
```
Import
```python
from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| readers | List[RecordReader[KeyValue]] | Yes | KeyValue readers to merge; each must return records sorted by primary key |
| schema | TableSchema | Yes | Table schema supplying the primary keys and partition keys used to build the key comparator |
Outputs
| Name | Type | Description |
|---|---|---|
| RecordIterator | Optional[RecordIterator] | Iterator over merged and deduplicated KeyValue records, or None when exhausted |
Usage Examples
```python
from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
from pypaimon.schema.table_schema import TableSchema

# Create sorted readers for different LSM levels.
# create_reader_for_level_* are hypothetical helpers, not pypaimon APIs.
readers = [
    create_reader_for_level_0(),
    create_reader_for_level_1(),
    create_reader_for_level_2(),
]

# Get the table schema with primary keys
# (`table` is assumed to be an already-loaded primary key table).
schema: TableSchema = table.schema()

# Create the sort-merge reader.
merge_reader = SortMergeReaderWithMinHeap(readers, schema)
try:
    # Read merged batches until read_batch() returns None.
    while True:
        batch_iterator = merge_reader.read_batch()
        if batch_iterator is None:
            break
        # Iterate over records in the batch; next() returns None at the end.
        while True:
            kv = batch_iterator.next()
            if kv is None:
                break
            # Process the deduplicated KeyValue (process_record is hypothetical).
            process_record(kv.key, kv.value)
finally:
    # Close all readers even if processing raises.
    merge_reader.close()
```