Implementation:Apache Paimon SplitRead

From Leeroopedia


Knowledge Sources
Domains: Data Reading, Pipeline Architecture
Last Updated: 2026-02-08 00:00 GMT

Overview

SplitRead implements the split-level read logic that converts a Split into a RecordReader, selecting the appropriate read strategy (raw file read, merge read, or data evolution read) and assembling the complete reader pipeline with format readers, field mapping, partition injection, and filtering.

Description

The `SplitRead` abstract base class handles cross-cutting concerns: file format dispatch (Avro, Blob, Lance, Parquet/ORC) via `file_reader_supplier()`, field/index mapping for schema evolution via `create_index_mapping()`, partition value injection via `_create_partition_info()`, and predicate push-down filtering. Three concrete implementations provide distinct read strategies.

`RawFileSplitRead` creates readers for append-only or raw-convertible primary key splits. It uses `ShardBatchReader` for sliced splits (applying start/end row positions), `ConcatBatchReader` to concatenate file readers sequentially, an optional `ApplyDeletionVectorReader` for deletion vector filtering, and `FilterRecordReader` for predicate evaluation on primary key tables.

`MergeFileSplitRead` builds the primary key merge pipeline. `KeyValueWrapReader` wraps each file reader to emit key-value pairs, `ConcatRecordReader` chains readers per sorted run, and `SortMergeReaderWithMinHeap` merges sorted runs using a min-heap. The merged sections are then concatenated, `DropDeleteRecordReader` filters tombstones, `KeyValueUnwrapRecordReader` extracts values, and an optional `FilterRecordReader` applies remaining predicates.

`DataEvolutionSplitRead` handles multi-file field merging. `_split_by_row_id()` groups files by `first_row_id`, `_split_field_bunches()` creates `DataBunch`/`BlobBunch` objects per field, `DataEvolutionMergeReader` merges fields using row/field offset arrays, and `RowIdFilterRecordBatchReader` applies indexed split row ranges.

Field mapping handles both trimmed keys (for primary key merging) and base field projection. Index mapping uses `NULL_FIELD_INDEX = -1` to mark fields that are missing from a file during schema evolution.
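
The index-mapping idea can be illustrated with a minimal, self-contained sketch. It is not the pypaimon implementation of `create_index_mapping()`: the helper names `build_index_mapping` and `project_row` are hypothetical, and fields are matched by name for simplicity, whereas a real implementation may match on field IDs. Only the `NULL_FIELD_INDEX = -1` sentinel is taken from the description above.

```python
from typing import List, Optional, Sequence

NULL_FIELD_INDEX = -1  # sentinel for "field not present in this file"


def build_index_mapping(read_fields: Sequence[str],
                        file_fields: Sequence[str]) -> List[int]:
    """For each requested field, return its position in the file schema,
    or NULL_FIELD_INDEX when the file was written before the field existed."""
    positions = {name: i for i, name in enumerate(file_fields)}
    return [positions.get(name, NULL_FIELD_INDEX) for name in read_fields]


def project_row(row: Sequence[object],
                mapping: Sequence[int]) -> List[Optional[object]]:
    """Reorder a file row into read order, filling missing fields with None."""
    return [None if idx == NULL_FIELD_INDEX else row[idx] for idx in mapping]


# Hypothetical example: the file stores (name, id); the reader asks for
# (id, name, email), where "email" was added to the table later.
mapping = build_index_mapping(["id", "name", "email"], ["name", "id"])
projected = project_row(["alice", 1], mapping)
```

Here `mapping` marks `email` as missing, and `project_row` fills that slot with `None` instead of failing on the older file.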

SplitRead is therefore the central module of the read path: it assembles the entire reader pipeline, from format readers through to final record output, and bridges split planning (what to read) and actual data reading (how to read it).

Usage

SplitRead implementations are instantiated by the table scanner based on table type and split characteristics, and are typically not used directly by applications.
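
The selection described above might look roughly like the following sketch. Everything here is an assumption made for illustration: the `SplitTraits` class, its flags, and the order of the checks are hypothetical and do not reflect the actual decision code in pypaimon. Only the three class names and the conditions under which each applies come from the Description.

```python
from dataclasses import dataclass


@dataclass
class SplitTraits:
    # Hypothetical flags; the real inputs live on the table and split objects.
    data_evolution: bool
    has_primary_key: bool
    raw_convertible: bool


def choose_split_read(traits: SplitTraits) -> str:
    """Return the name of the SplitRead strategy that would handle the split."""
    if traits.data_evolution:
        # Fields may be spread over several files and must be merged.
        return "DataEvolutionSplitRead"
    if traits.has_primary_key and not traits.raw_convertible:
        # Overlapping sorted runs need a key-based merge.
        return "MergeFileSplitRead"
    # Append-only tables and raw-convertible primary key splits.
    return "RawFileSplitRead"


strategy = choose_split_read(
    SplitTraits(data_evolution=False, has_primary_key=True, raw_convertible=False)
)
```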

Code Reference

Source Location

Signature

class SplitRead(ABC):
    def __init__(self, table, predicate: Optional[Predicate],
                 read_type: List[DataField], split: Split,
                 row_tracking_enabled: bool): ...

    @abstractmethod
    def create_reader(self) -> RecordReader: ...

    def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
                             read_fields: List[str],
                             row_tracking_enabled: bool) -> RecordBatchReader: ...

    def create_index_mapping(self): ...

class RawFileSplitRead(SplitRead):
    def create_reader(self) -> RecordReader: ...
    def raw_reader_supplier(self, file: DataFileMeta,
                            dv_factory: Optional[Callable] = None) -> Optional[RecordReader]: ...

class MergeFileSplitRead(SplitRead):
    def create_reader(self) -> RecordReader: ...
    def kv_reader_supplier(self, file: DataFileMeta,
                           dv_factory: Optional[Callable] = None) -> RecordReader: ...
    def section_reader_supplier(self, section: List[SortedRun]) -> RecordReader: ...

class DataEvolutionSplitRead(SplitRead):
    def create_reader(self) -> RecordReader: ...
    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]: ...
    def _create_union_reader(self, need_merge_files: List[DataFileMeta]) -> RecordReader: ...
    def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]: ...

Import

from pypaimon.read.split_read import RawFileSplitRead, MergeFileSplitRead, DataEvolutionSplitRead

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| table | FileStoreTable | yes | Table being read |
| split | Split | yes | Split to read (DataSplit, IndexedSplit, or SlicedSplit) |
| predicate | Predicate | no | Filter predicate |
| read_type | List[DataField] | yes | Fields to read |
| row_tracking_enabled | bool | yes | Whether row tracking is enabled |

Outputs

| Name | Type | Description |
|------|------|-------------|
| reader | RecordReader | Fully assembled reader pipeline |

Usage Examples

Append-Only Table Read

from pypaimon.read.split_read import RawFileSplitRead

# Create reader for append-only table.
# `table`, `predicate`, and `data_split` are assumed to come from an earlier scan/plan step.
split_read = RawFileSplitRead(
    table=table,
    predicate=predicate,
    read_type=table.fields,
    split=data_split,
    row_tracking_enabled=False
)

# Create reader pipeline
reader = split_read.create_reader()

# Read data
for batch in reader:
    print(f"Read {batch.num_rows} rows")
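
To make the raw read path more concrete, the sketch below shows the two generic steps the Description attributes to `ConcatBatchReader` and `ShardBatchReader`: concatenating per-file readers and keeping only a row window. It is a simplified illustration using plain Python lists as batches, not pypaimon code. The function names, the half-open `[start, end)` convention, and slicing after concatenation are all assumptions.

```python
from typing import Iterable, Iterator, List

Batch = List[dict]  # stand-in for a record batch


def concat_batches(file_readers: Iterable[Iterable[Batch]]) -> Iterator[Batch]:
    """Yield the batches of each file reader in order, one file after another."""
    for reader in file_readers:
        yield from reader


def slice_batches(batches: Iterable[Batch], start: int, end: int) -> Iterator[Batch]:
    """Keep only rows whose global position falls in [start, end)."""
    offset = 0  # global position of the first row of the current batch
    for batch in batches:
        lo = max(start - offset, 0)
        hi = min(end - offset, len(batch))
        if lo < hi:
            yield batch[lo:hi]
        offset += len(batch)
        if offset >= end:
            break  # nothing after this point can be inside the window


# Two hypothetical files: one with batches of 2 and 1 rows, one with 3 rows.
file_a = [[{"id": 0}, {"id": 1}], [{"id": 2}]]
file_b = [[{"id": 3}, {"id": 4}, {"id": 5}]]
sliced = list(slice_batches(concat_batches([file_a, file_b]), start=1, end=5))
ids = [row["id"] for batch in sliced for row in batch]
```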

Primary Key Table Merge Read

from pypaimon.read.split_read import MergeFileSplitRead

# Create merge reader for primary key table
split_read = MergeFileSplitRead(
    table=pk_table,
    predicate=predicate,
    read_type=pk_table.fields,
    split=data_split,
    row_tracking_enabled=False
)

# Reader pipeline includes merge logic
reader = split_read.create_reader()

# Read merged data (latest version per key)
for batch in reader:
    # Only latest versions are returned
    print(f"Read {batch.num_rows} unique keys")
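
The merge step can be sketched in a few lines of standard-library Python. This is an illustration of a min-heap sort-merge followed by tombstone removal, not the `SortMergeReaderWithMinHeap` implementation. The tuple layout, the `"+I"`/`"-D"` kind markers, and the "highest sequence number wins" rule are assumptions chosen for the example; `heapq.merge` stands in for the min-heap.

```python
import heapq
from typing import Iterable, Iterator, List, Tuple

# (key, sequence_number, kind, value); kind is "+I" for an upsert, "-D" for a delete
KeyValue = Tuple[int, int, str, str]


def merge_sorted_runs(runs: List[Iterable[KeyValue]]) -> Iterator[KeyValue]:
    """Merge key-sorted runs and keep, per key, the record with the highest
    sequence number. heapq.merge uses a min-heap over the run heads."""
    merged = heapq.merge(*runs, key=lambda kv: (kv[0], kv[1]))
    latest = None
    for kv in merged:
        if latest is not None and kv[0] != latest[0]:
            yield latest  # key changed: the previous key is complete
        latest = kv  # within one key, later sequence numbers overwrite earlier ones
    if latest is not None:
        yield latest


def drop_deletes(records: Iterable[KeyValue]) -> Iterator[KeyValue]:
    """Filter out tombstones after merging."""
    return (kv for kv in records if kv[2] != "-D")


# Two hypothetical sorted runs; the newer run deletes key 2 and updates key 3.
run_old = [(1, 1, "+I", "a"), (2, 2, "+I", "b"), (3, 3, "+I", "c")]
run_new = [(2, 4, "-D", ""), (3, 5, "+I", "c2")]
result = [(key, value)
          for key, _, _, value in drop_deletes(merge_sorted_runs([run_old, run_new]))]
```

Dropping deletes only after the merge matters: filtering tombstones per run first would let the stale value for key 2 survive.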

Data Evolution Read

from pypaimon.read.split_read import DataEvolutionSplitRead
from pypaimon.globalindex.indexed_split import IndexedSplit
# NOTE: `Range` must also be imported; its module path is not shown in this example.

# Create reader for data evolution table
indexed_split = IndexedSplit(
    data_split=data_split,
    row_ranges=[Range(100, 500)],
    scores=[0.95, 0.92, 0.88, ...]
)

split_read = DataEvolutionSplitRead(
    table=evolution_table,
    predicate=predicate,
    read_type=evolution_table.fields,
    split=indexed_split,
    row_tracking_enabled=True
)

# Reader handles field merging and row filtering
reader = split_read.create_reader()

for batch in reader:
    # Fields merged from data + blob files
    # Rows filtered by row ranges
    print(f"Read {batch.num_rows} rows")
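
The field-merging and row-range steps can be pictured with the following simplified sketch. It does not reproduce `DataEvolutionMergeReader` or `RowIdFilterRecordBatchReader`. The in-memory file model, the function names, the inclusive range bounds, and the "later file wins on a field conflict" rule are assumptions for illustration only.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical file model: (first_row_id, {column_name: column_values})
FileColumns = Tuple[int, Dict[str, List[object]]]


def group_by_first_row_id(files: List[FileColumns]) -> Dict[int, List[Dict[str, List[object]]]]:
    """Files sharing a first_row_id hold different fields of the same rows."""
    groups = defaultdict(list)
    for first_row_id, columns in files:
        groups[first_row_id].append(columns)
    return dict(groups)


def merge_fields(column_sets: List[Dict[str, List[object]]],
                 read_fields: List[str]) -> List[dict]:
    """Stitch columns from several files into complete rows."""
    merged: Dict[str, List[object]] = {}
    for columns in column_sets:
        merged.update(columns)  # assumption: later files win on conflicts
    num_rows = len(next(iter(merged.values())))
    return [{name: merged[name][i] for name in read_fields} for i in range(num_rows)]


def read_with_row_ranges(files: List[FileColumns], read_fields: List[str],
                         ranges: List[Tuple[int, int]]) -> List[Tuple[int, dict]]:
    """Merge fields per group, then keep rows whose row id is in any range."""
    out = []
    for first_row_id, column_sets in sorted(group_by_first_row_id(files).items()):
        for offset, row in enumerate(merge_fields(column_sets, read_fields)):
            row_id = first_row_id + offset
            if any(lo <= row_id <= hi for lo, hi in ranges):  # inclusive bounds assumed
                out.append((row_id, row))
    return out


files = [
    (100, {"id": [1, 2, 3]}),                 # data file
    (100, {"embedding": ["e1", "e2", "e3"]}),  # separate file for another field
    (103, {"id": [4], "embedding": ["e4"]}),
]
rows = read_with_row_ranges(files, ["id", "embedding"], ranges=[(101, 103)])
```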

With Deletion Vectors

from pypaimon.read.split_read import RawFileSplitRead

# Split with deletion files
data_split.data_deletion_files = [dv_file1, dv_file2, ...]

split_read = RawFileSplitRead(
    table=table,
    predicate=None,
    read_type=table.fields,
    split=data_split,
    row_tracking_enabled=True
)

# Reader automatically applies deletion vectors
reader = split_read.create_reader()
# Deleted rows are filtered out
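
Conceptually, deletion vector filtering drops rows by their position within a file. The sketch below illustrates that idea with a plain Python set of deleted positions. It is not the `ApplyDeletionVectorReader` implementation; real deletion vectors are typically stored as compressed bitmaps, and the function name and list-based batches are assumptions.

```python
from typing import Iterable, Iterator, List, Set


def apply_deletion_vector(batches: Iterable[List[object]],
                          deleted_positions: Set[int]) -> Iterator[List[object]]:
    """Yield batches with rows removed when their file-level position is deleted."""
    position = 0  # file-level position of the first row in the current batch
    for batch in batches:
        kept = [row for i, row in enumerate(batch, start=position)
                if i not in deleted_positions]
        position += len(batch)
        if kept:
            yield kept  # skip batches that became empty


# One hypothetical file read as two batches; rows at positions 1 and 3 are deleted.
batches = [["a", "b", "c"], ["d", "e"]]
surviving = list(apply_deletion_vector(batches, deleted_positions={1, 3}))
```

Positions are counted across batches, so a deleted position keeps pointing at the same row regardless of how the file is split into batches.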
