Implementation:Apache Paimon SplitRead
| Knowledge Sources | |
|---|---|
| Domains | Data Reading, Pipeline Architecture |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
SplitRead implements the split-level read logic that converts a Split into a RecordReader, selecting the appropriate read strategy (raw file read, merge read, or data evolution read) and assembling the complete reader pipeline with format readers, field mapping, partition injection, and filtering.
Description
The `SplitRead` abstract base class handles cross-cutting concerns:
- file format dispatch (Avro, Blob, Lance, Parquet/ORC) via `file_reader_supplier()`
- field/index mapping for schema evolution via `create_index_mapping()`
- partition value injection via `_create_partition_info()`
- predicate push-down filtering

Three concrete implementations provide distinct read strategies:
- `RawFileSplitRead` creates readers for append-only or raw-convertible primary key splits: it uses `ShardBatchReader` for sliced splits (applying start/end row positions), `ConcatBatchReader` to concatenate file readers sequentially, optional `ApplyDeletionVectorReader` for deletion vector filtering, and `FilterRecordReader` for predicate evaluation on primary key tables.
- `MergeFileSplitRead` builds the primary key merge pipeline: `KeyValueWrapReader` wraps each file reader to emit key-value pairs, `ConcatRecordReader` chains readers per sorted run, `SortMergeReaderWithMinHeap` merges sorted runs using a min-heap, sections are concatenated, `DropDeleteRecordReader` filters tombstones, `KeyValueUnwrapRecordReader` extracts values, and optional `FilterRecordReader` applies remaining predicates.
- `DataEvolutionSplitRead` handles multi-file field merging: `_split_by_row_id()` groups files by `first_row_id`, `_split_field_bunches()` creates `DataBunch`/`BlobBunch` objects per field, `DataEvolutionMergeReader` merges fields using row/field offset arrays, and `RowIdFilterRecordBatchReader` applies indexed split row ranges.

Field mapping handles both trimmed keys (for primary key merging) and base field projection. Index mapping uses `NULL_FIELD_INDEX = -1` to mark fields that are missing from a file during schema evolution.
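The merge pipeline described above (min-heap merge of sorted runs, tombstone removal, value unwrapping) can be sketched with the standard library. This is a minimal illustration, not pypaimon's code: `KeyValue`, `sort_merge`, `drop_deletes`, and `unwrap_values` are hypothetical names, `heapq.merge` stands in for `SortMergeReaderWithMinHeap`, and a "keep the newest record per key" merge function is assumed.

```python
import heapq
from typing import Iterable, Iterator, List, NamedTuple


class KeyValue(NamedTuple):
    """Hypothetical key-value record; not pypaimon's KeyValue class."""
    key: int
    sequence: int     # assumption: a higher sequence number means a newer write
    is_delete: bool   # True marks a tombstone
    value: str


def sort_merge(runs: List[Iterable[KeyValue]]) -> Iterator[KeyValue]:
    """Merge sorted runs with a min-heap, keeping the newest record per key."""
    # heapq.merge holds one head element per run in a heap. Ordering by
    # (key, sequence) groups equal keys and places the newest record last.
    merged = heapq.merge(*runs, key=lambda kv: (kv.key, kv.sequence))
    latest = None
    for kv in merged:
        if latest is not None and kv.key != latest.key:
            yield latest
        latest = kv
    if latest is not None:
        yield latest


def drop_deletes(records: Iterable[KeyValue]) -> Iterator[KeyValue]:
    """Filter out tombstones after merging."""
    return (kv for kv in records if not kv.is_delete)


def unwrap_values(records: Iterable[KeyValue]) -> Iterator[str]:
    """Strip the key-value wrapper and emit plain values."""
    return (kv.value for kv in records)


# Two sorted runs: the second one updates key 1 and deletes key 2.
run_old = [KeyValue(1, 1, False, "a1"), KeyValue(2, 2, False, "b1"), KeyValue(4, 3, False, "d1")]
run_new = [KeyValue(1, 4, False, "a2"), KeyValue(2, 5, True, ""), KeyValue(3, 6, False, "c1")]

result = list(unwrap_values(drop_deletes(sort_merge([run_old, run_new]))))
print(result)  # ['a2', 'c1', 'd1']
```

Dropping tombstones only after the merge matters: filtering them per file would let the older value of key 2 survive.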
This makes `SplitRead` the central module of the read path: it assembles the entire reader pipeline, from format readers through to final record output, and bridges split planning (what to read) and data reading (how to read it).
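The idea of assembling a pipeline by wrapping readers can be illustrated with plain generators. The sketch below is an analogy under stated assumptions, not the pypaimon implementation: `concat`, `shard`, `with_partition`, and `filtered` are hypothetical helpers that loosely mirror `ConcatBatchReader`, `ShardBatchReader`, partition injection, and `FilterRecordReader`, and rows are modeled as dictionaries rather than Arrow batches.

```python
from itertools import chain, islice
from typing import Callable, Dict, Iterable, Iterator, List

Row = Dict[str, object]


def concat(readers: List[Iterable[Row]]) -> Iterator[Row]:
    """Read each file reader to exhaustion, one after another."""
    return chain.from_iterable(readers)


def shard(rows: Iterable[Row], start: int, end: int) -> Iterator[Row]:
    """Keep only rows whose position falls in [start, end)."""
    return islice(rows, start, end)


def with_partition(rows: Iterable[Row], partition: Row) -> Iterator[Row]:
    """Inject partition values, which are not stored in the data files."""
    for row in rows:
        yield {**row, **partition}


def filtered(rows: Iterable[Row], predicate: Callable[[Row], bool]) -> Iterator[Row]:
    """Apply the predicate to each row."""
    return (row for row in rows if predicate(row))


file_a = [{"id": 1}, {"id": 2}, {"id": 3}]
file_b = [{"id": 4}, {"id": 5}]

# Each stage wraps the previous one, so nothing is read until iteration starts.
pipeline = filtered(
    with_partition(shard(concat([file_a, file_b]), 1, 4), {"dt": "2026-02-08"}),
    lambda row: row["id"] % 2 == 0,
)
rows = list(pipeline)
print(rows)
```

Because every stage is lazy, the order of wrapping decides the semantics: slicing before filtering selects by row position, while slicing after filtering would select by match count.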
Usage
`SplitRead` implementations are instantiated by the table scanner based on table type and split characteristics; applications typically do not use them directly.
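The selection between the three strategies might look roughly like the sketch below. The exact conditions and their order are assumptions inferred from the descriptions on this page, and `TableInfo`, `SplitInfo`, and `choose_split_read` are hypothetical stand-ins rather than pypaimon classes or functions.

```python
from dataclasses import dataclass


@dataclass
class TableInfo:
    """Hypothetical stand-in for the table metadata that drives the choice."""
    is_primary_key_table: bool
    data_evolution_enabled: bool


@dataclass
class SplitInfo:
    """Hypothetical stand-in for a split."""
    raw_convertible: bool


def choose_split_read(table: TableInfo, split: SplitInfo) -> str:
    """Return the name of the strategy that would handle this split."""
    # Assumption: primary key splits need a merge unless they are raw-convertible.
    if table.is_primary_key_table and not split.raw_convertible:
        return "MergeFileSplitRead"
    # Assumption: data evolution tables merge fields across files.
    if table.data_evolution_enabled:
        return "DataEvolutionSplitRead"
    # Append-only tables and raw-convertible primary key splits read files directly.
    return "RawFileSplitRead"


print(choose_split_read(TableInfo(True, False), SplitInfo(raw_convertible=False)))
print(choose_split_read(TableInfo(True, False), SplitInfo(raw_convertible=True)))
print(choose_split_read(TableInfo(False, True), SplitInfo(raw_convertible=True)))
```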
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/read/split_read.py
Signature
class SplitRead(ABC):
    def __init__(self, table, predicate: Optional[Predicate],
                 read_type: List[DataField], split: Split,
                 row_tracking_enabled: bool): ...

    @abstractmethod
    def create_reader(self) -> RecordReader: ...

    def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
                             read_fields: List[str],
                             row_tracking_enabled: bool) -> RecordBatchReader: ...

    def create_index_mapping(self): ...


class RawFileSplitRead(SplitRead):
    def create_reader(self) -> RecordReader: ...

    def raw_reader_supplier(self, file: DataFileMeta,
                            dv_factory: Optional[Callable] = None) -> Optional[RecordReader]: ...


class MergeFileSplitRead(SplitRead):
    def create_reader(self) -> RecordReader: ...

    def kv_reader_supplier(self, file: DataFileMeta,
                           dv_factory: Optional[Callable] = None) -> RecordReader: ...

    def section_reader_supplier(self, section: List[SortedRun]) -> RecordReader: ...


class DataEvolutionSplitRead(SplitRead):
    def create_reader(self) -> RecordReader: ...

    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]: ...

    def _create_union_reader(self, need_merge_files: List[DataFileMeta]) -> RecordReader: ...

    def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]: ...
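To illustrate what an index mapping with `NULL_FIELD_INDEX` does during schema evolution, here is a minimal standalone sketch. It is not the body of `create_index_mapping()`: `build_index_mapping` and `project_row` are hypothetical helpers, and matching fields by numeric field ID is an assumption.

```python
from typing import List

NULL_FIELD_INDEX = -1  # marks a requested field that the file does not contain


def build_index_mapping(read_field_ids: List[int], file_field_ids: List[int]) -> List[int]:
    """For each requested field, find its column position in the file."""
    position = {field_id: i for i, field_id in enumerate(file_field_ids)}
    return [position.get(field_id, NULL_FIELD_INDEX) for field_id in read_field_ids]


def project_row(file_row: list, mapping: List[int]) -> list:
    """Reorder a file row into the requested layout, filling gaps with None."""
    return [None if idx == NULL_FIELD_INDEX else file_row[idx] for idx in mapping]


# The file was written with fields 0, 1, 2. The reader asks for fields 0, 2, 3,
# where field 3 was added to the schema after the file was written.
mapping = build_index_mapping(read_field_ids=[0, 2, 3], file_field_ids=[0, 1, 2])
projected = project_row(["k", "dropped", "v"], mapping)
print(mapping)    # [0, 2, -1]
print(projected)  # ['k', 'v', None]
```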
Import
from pypaimon.read.split_read import RawFileSplitRead, MergeFileSplitRead, DataEvolutionSplitRead
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| table | FileStoreTable | yes | Table being read |
| split | Split | yes | Split to read (DataSplit, IndexedSplit, or SlicedSplit) |
| predicate | Predicate | no | Filter predicate |
| read_type | List[DataField] | yes | Fields to read |
| row_tracking_enabled | bool | yes | Whether row tracking is enabled |
Outputs
| Name | Type | Description |
|---|---|---|
| reader | RecordReader | Fully assembled reader pipeline |
Usage Examples
Append-Only Table Read
from pypaimon.read.split_read import RawFileSplitRead

# Create reader for append-only table
split_read = RawFileSplitRead(
    table=table,
    predicate=predicate,
    read_type=table.fields,
    split=data_split,
    row_tracking_enabled=False
)

# Create reader pipeline
reader = split_read.create_reader()

# Read data
for batch in reader:
    print(f"Read {batch.num_rows} rows")
Primary Key Table Merge Read
from pypaimon.read.split_read import MergeFileSplitRead

# Create merge reader for primary key table
split_read = MergeFileSplitRead(
    table=pk_table,
    predicate=predicate,
    read_type=pk_table.fields,
    split=data_split,
    row_tracking_enabled=False
)

# Reader pipeline includes merge logic
reader = split_read.create_reader()

# Read merged data (latest version per key)
for batch in reader:
    # Only latest versions are returned
    print(f"Read {batch.num_rows} unique keys")
Data Evolution Read
from pypaimon.read.split_read import DataEvolutionSplitRead
from pypaimon.globalindex.indexed_split import IndexedSplit
# Range must also be imported; its module path is not given in this page.

# Wrap the data split with the row ranges selected by the index
indexed_split = IndexedSplit(
    data_split=data_split,
    row_ranges=[Range(100, 500)],
    scores=[0.95, 0.92, 0.88, ...]
)

# Create reader for data evolution table
split_read = DataEvolutionSplitRead(
    table=evolution_table,
    predicate=predicate,
    read_type=evolution_table.fields,
    split=indexed_split,
    row_tracking_enabled=True
)

# Reader handles field merging and row filtering
reader = split_read.create_reader()
for batch in reader:
    # Fields merged from data + blob files
    # Rows filtered by row ranges
    print(f"Read {batch.num_rows} rows")
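The field-merging step can be pictured as stitching columns from several files that cover the same rows. The sketch below is one possible reading of "row/field offset arrays" and is labeled as an assumption: `row_offsets[i]` is taken to name the reader that supplies output field `i`, and `field_offsets[i]` the column position inside that reader. `merge_fields` is a hypothetical helper, and columns are plain lists rather than Arrow arrays.

```python
from typing import List


def merge_fields(batches: List[List[list]],
                 row_offsets: List[int],
                 field_offsets: List[int]) -> List[list]:
    """Assemble output columns by picking (reader, column) pairs.

    batches[r][c] is column c of the batch produced by reader r. All readers
    are assumed to cover the same rows in the same order.
    """
    return [
        batches[reader_idx][column_idx]
        for reader_idx, column_idx in zip(row_offsets, field_offsets)
    ]


data_file_columns = [[1, 2, 3], ["a", "b", "c"]]  # columns: id, name
blob_file_columns = [[b"x", b"y", b"z"]]          # column: payload

# Requested output order: id, payload, name
row_offsets = [0, 1, 0]    # which reader supplies each output field
field_offsets = [0, 0, 1]  # which column within that reader

merged = merge_fields([data_file_columns, blob_file_columns], row_offsets, field_offsets)
print(merged)
```

Keeping two parallel offset arrays means the merge itself does no schema lookups per batch; the mapping is computed once when the reader is built.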
With Deletion Vectors
from pypaimon.read.split_read import RawFileSplitRead

# Split with deletion files
data_split.data_deletion_files = [dv_file1, dv_file2, ...]

split_read = RawFileSplitRead(
    table=table,
    predicate=None,
    read_type=table.fields,
    split=data_split,
    row_tracking_enabled=True
)

# Reader automatically applies deletion vectors
reader = split_read.create_reader()
# Deleted rows are filtered out
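Conceptually, deletion vector filtering drops rows by their position within a data file. The following is a minimal sketch of that idea only: `apply_deletion_vector` is a hypothetical helper, and a Python `set` of positions stands in for the real deletion vector structure, which this page does not describe.

```python
from typing import Iterable, Iterator, Set


def apply_deletion_vector(rows: Iterable[str], deleted_positions: Set[int]) -> Iterator[str]:
    """Yield only rows whose position in the file is not marked as deleted."""
    for position, row in enumerate(rows):
        if position not in deleted_positions:
            yield row


file_rows = ["r0", "r1", "r2", "r3", "r4"]
deleted = {1, 3}  # positions of deleted rows within this file

surviving = list(apply_deletion_vector(file_rows, deleted))
print(surviving)  # ['r0', 'r2', 'r4']
```

Positions are relative to a single file, so a filter like this has to wrap each file reader individually, before readers are concatenated.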