Implementation:Apache Paimon SortMergeReader

From Leeroopedia


Knowledge Sources
Domains: Data Reading, Primary Key Tables
Last Updated: 2026-02-08 00:00 GMT

Overview

SortMergeReaderWithMinHeap merges multiple sorted KeyValue streams using a min-heap to produce deduplicated results.

Description

SortMergeReaderWithMinHeap implements a RecordReader that merges multiple sorted input streams of KeyValue records using a min-heap-based algorithm. It is designed for primary key tables where multiple sorted files need to be merged while deduplicating records with the same key. The reader uses a DeduplicateMergeFunction to keep only the latest version of each key based on sequence numbers.
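To make the "keep only the latest version" behavior concrete, here is a minimal sketch of a merge function with the same three methods shown in the signature below (reset, add, get_result). The method bodies are elided in the source, so the bodies here are an assumption: add simply overwrites the stored record, which is correct only if records for one key arrive in ascending sequence order. The KV dataclass and the LatestWinsMerge name are hypothetical stand-ins, not pypaimon classes.

```python
from dataclasses import dataclass
from typing import Any, Optional, Tuple


@dataclass(frozen=True)
class KV:
    """Hypothetical stand-in for a KeyValue record."""
    key: Tuple[Any, ...]
    sequence_number: int
    value: Any


class LatestWinsMerge:
    """Sketch of a deduplicating merge function (assumed behavior)."""

    def __init__(self) -> None:
        self.latest_kv: Optional[KV] = None

    def reset(self) -> None:
        # Forget the previous key's state before starting a new key.
        self.latest_kv = None

    def add(self, kv: KV) -> None:
        # Assumption: callers feed records in ascending sequence order,
        # so the last record added is the most recent version.
        self.latest_kv = kv

    def get_result(self) -> Optional[KV]:
        return self.latest_kv


# Two versions of the same key, oldest first.
merge = LatestWinsMerge()
merge.reset()
for kv in [KV((1,), 3, "a"), KV((1,), 7, "b")]:
    merge.add(kv)
result = merge.get_result()
```

Under that ordering assumption the function needs no comparison logic of its own; the ordering guarantee comes from whoever drives it.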

The implementation maintains a min-heap of elements, where each element contains a KeyValue record, its iterator, and its reader. The heap is ordered first by the key (using a comparator built from the primary key schema), and then by sequence number for keys that compare equal. This ensures that records are processed in key order and that the most recent version (highest sequence number) is selected for each key.
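The heap ordering described above can be illustrated with Python's standard heapq module. This is a simplified sketch, not the pypaimon implementation: records are plain (key, sequence_number, value) tuples, each input is an ordinary iterable rather than a RecordReader, and merge_sorted_streams is a hypothetical name. It assumes every input is already sorted by key and that sequence numbers ascend within a key.

```python
import heapq
from typing import Any, Iterable, Iterator, List, Tuple

Record = Tuple[Any, int, Any]  # (key, sequence_number, value)


def merge_sorted_streams(streams: Iterable[Iterable[Record]]) -> Iterator[Record]:
    """Merge sorted streams, yielding one record per key (highest sequence)."""
    # Heap entries: (key, sequence_number, stream_index, value, iterator).
    # Each stream has at most one entry in the heap, so stream_index breaks
    # every tie before Python would try to compare values or iterators.
    heap: List[Tuple[Any, int, int, Any, Iterator[Record]]] = []
    for idx, stream in enumerate(streams):
        it = iter(stream)
        first = next(it, None)
        if first is not None:
            heapq.heappush(heap, (first[0], first[1], idx, first[2], it))

    latest = None  # most recent record seen for the current key
    while heap:
        key, seq, idx, value, it = heapq.heappop(heap)
        if latest is not None and latest[0] != key:
            yield latest  # key changed, so the previous key is complete
        latest = (key, seq, value)  # later pops carry higher sequence numbers
        nxt = next(it, None)
        if nxt is not None:
            heapq.heappush(heap, (nxt[0], nxt[1], idx, nxt[2], it))
    if latest is not None:
        yield latest


level0 = [(1, 10, "a-new"), (3, 11, "c")]
level1 = [(1, 2, "a-old"), (2, 5, "b")]
merged = list(merge_sorted_streams([level0, level1]))
```

Because entries pop in (key, sequence number) order, the last record popped for a key is the one with the highest sequence number, so overwriting `latest` is enough to deduplicate.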

The reader trims partition keys from the primary key comparison: the files being merged are already scoped to a partition, so the partition fields cannot distinguish one record from another. It processes data in batches through RecordIterators, refilling the heap as iterators are exhausted and managing the lifecycle of the underlying readers.
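Partition key trimming can be sketched as a filter over the primary key field names. The helper names and the example fields (dt, user_id, order_id) below are hypothetical, and rows are modeled as plain dicts; how pypaimon derives these fields from TableSchema is not shown in this page.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple


def trim_partition_keys(primary_keys: Sequence[str],
                        partition_keys: Sequence[str]) -> List[str]:
    """Return the primary key fields that are not partition fields."""
    partition = set(partition_keys)
    # Preserve the declared primary key order for the remaining fields.
    return [name for name in primary_keys if name not in partition]


def make_key_fn(fields: Sequence[str]) -> Callable[[Dict[str, Any]], Tuple[Any, ...]]:
    """Build a function that extracts a comparable key tuple from a row."""
    names = list(fields)
    return lambda row: tuple(row[name] for name in names)


trimmed = trim_partition_keys(["dt", "user_id", "order_id"], ["dt"])
key_fn = make_key_fn(trimmed)
key = key_fn({"dt": "2024-01-01", "user_id": 7, "order_id": 42})
```

Comparing on the trimmed tuple avoids repeatedly comparing a field that has the same value for every record in the merge.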

Usage

Use SortMergeReaderWithMinHeap when reading from primary key tables where multiple sorted data files need to be merged. It is typically used in scan operations to combine data from different LSM tree levels while ensuring correct deduplication and ordering based on primary keys.

Code Reference

Source Location

Signature

class SortMergeReaderWithMinHeap(RecordReader):
    """SortMergeReader implemented with min-heap."""

    def __init__(self, readers: List[RecordReader[KeyValue]], schema: TableSchema):
        self.next_batch_readers = list(readers)
        self.merge_function = DeduplicateMergeFunction()
        ...

    def read_batch(self) -> Optional[RecordIterator]:
        """Read next batch of merged and deduplicated records."""
        ...

    def close(self):
        """Close all readers and release resources."""
        ...


class DeduplicateMergeFunction:
    """A MergeFunction where key is primary key (unique) and value is the full record."""

    def __init__(self):
        self.latest_kv = None

    def reset(self) -> None:
        ...

    def add(self, kv: KeyValue):
        ...

    def get_result(self) -> Optional[KeyValue]:
        ...

Import

from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap

I/O Contract

Inputs

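Name    | Type                         | Required | Description
------- | ---------------------------- | -------- | -----------------------------------------------
readers | List[RecordReader[KeyValue]] | Yes      | List of sorted KeyValue readers to merge
schema  | TableSchema                  | Yes      | Table schema containing primary key information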

Outputs

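Name           | Type                     | Description
-------------- | ------------------------ | ------------------------------------------------------------------------------
RecordIterator | Optional[RecordIterator] | Iterator over merged and deduplicated KeyValue records, or None when exhausted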

Usage Examples

from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
from pypaimon.schema.table_schema import TableSchema

# create_reader_for_level_*(), table, and process_record() are placeholders
# supplied by the caller; they are not part of pypaimon.

# Create one sorted reader per LSM level
readers = [
    create_reader_for_level_0(),
    create_reader_for_level_1(),
    create_reader_for_level_2(),
]

# Get table schema with primary keys
schema = table.schema()

# Create sort-merge reader
merge_reader = SortMergeReaderWithMinHeap(readers, schema)

try:
    # Read merged batches until the reader is exhausted
    while True:
        batch_iterator = merge_reader.read_batch()
        if batch_iterator is None:
            break

        # Iterate over records in the batch; next() returns None at the end
        while True:
            kv = batch_iterator.next()
            if kv is None:
                break
            # Process deduplicated KeyValue
            process_record(kv.key, kv.value)
finally:
    # Release the underlying readers even if processing fails
    merge_reader.close()
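
The nested loop in the usage example can also be wrapped in a generator so that callers iterate over records directly. The sketch below relies only on the protocol visible in the example (read_batch() returning None when exhausted, next() returning None at the end of a batch, and close()). The iter_records helper and the ListBatch and ListReader stubs are hypothetical and exist only to make the example runnable without pypaimon.

```python
from typing import Any, Iterator, List, Optional


def iter_records(reader: Any) -> Iterator[Any]:
    """Yield every record from a batch-oriented reader, then close it."""
    try:
        while True:
            batch = reader.read_batch()
            if batch is None:
                return  # reader exhausted
            while True:
                record = batch.next()
                if record is None:
                    break  # batch exhausted, fetch the next one
                yield record
    finally:
        # Runs on exhaustion, on error, or when the generator is closed early.
        reader.close()


class ListBatch:
    """Stub batch iterator backed by a list (for illustration only)."""

    def __init__(self, items: List[Any]) -> None:
        self._it = iter(items)

    def next(self) -> Optional[Any]:
        return next(self._it, None)


class ListReader:
    """Stub reader that serves pre-built batches (for illustration only)."""

    def __init__(self, batches: List[List[Any]]) -> None:
        self._batches = list(batches)
        self.closed = False

    def read_batch(self) -> Optional[ListBatch]:
        return ListBatch(self._batches.pop(0)) if self._batches else None

    def close(self) -> None:
        self.closed = True


reader = ListReader([["a", "b"], ["c"]])
records = list(iter_records(reader))
```

With a real merge reader, the same helper would be used as `for kv in iter_records(merge_reader): ...`, keeping the close call in one place.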
