Implementation:Apache Paimon SlicedSplit
| Knowledge Sources | |
|---|---|
| Domains | Split Generation, Parallel Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
SlicedSplit wraps a Split with file-level row range information to support fine-grained parallel processing through slicing.
Description
SlicedSplit is a wrapper around a regular Split that adds file-level slicing information through a shard_file_idx_map. This map specifies which row ranges should be read from each file in the split, enabling data distribution at a finer granularity than bucket-level sharding. Each entry in the map associates a file name with a tuple (start_row, end_row) indicating the inclusive start and exclusive end row indices to read from that file.
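Because end_row is exclusive, each (start_row, end_row) pair behaves like a Python slice. The minimal sketch below shows which rows a map entry selects and how (-1, -1) skips a file. It uses plain lists of integers in place of real data files. The helper `select_rows` and the file names are hypothetical. The rule that files absent from the map are read in full is an assumption of this sketch and is not stated by the source.

```python
from typing import Dict, List, Tuple

# Sentinel that marks a file to be skipped entirely.
SKIP: Tuple[int, int] = (-1, -1)


def select_rows(
    file_rows: Dict[str, List[int]],
    shard_file_idx_map: Dict[str, Tuple[int, int]],
) -> Dict[str, List[int]]:
    """Return the rows each file contributes under [start_row, end_row) ranges.

    Hypothetical helper: file_rows stands in for the contents of real data files.
    """
    selected: Dict[str, List[int]] = {}
    for file_name, rows in file_rows.items():
        # Assumption of this sketch: files absent from the map are read in full.
        start, end = shard_file_idx_map.get(file_name, (0, len(rows)))
        if (start, end) == SKIP:
            continue  # (-1, -1): the file contributes nothing
        selected[file_name] = rows[start:end]  # start inclusive, end exclusive
    return selected


files = {"a.parquet": list(range(10)), "b.parquet": list(range(10, 20))}
print(select_rows(files, {"a.parquet": (2, 5), "b.parquet": (-1, -1)}))
# {'a.parquet': [2, 3, 4]}
```

The range (2, 5) yields three rows, indices 2, 3 and 4. This matches `end - start`, which is the quantity a sliced row count sums.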
The class implements the Split interface by delegating most operations to the underlying data_split, but overrides row_count and merged_row_count to account for the sliced ranges. When calculating row counts, it sums only the rows within specified ranges rather than using the full file row counts. Files with (-1, -1) ranges are skipped entirely.
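The row_count rule described above can be sketched as follows. Each sliced file contributes `end_row - start_row`, each (-1, -1) file contributes 0, and the results are summed. `FileMeta` is a hypothetical stand-in for DataFileMeta that carries only the two fields this sketch needs. As in any sketch here, treating unmapped files as fully read is an assumption and has not been confirmed against pypaimon.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class FileMeta:
    """Hypothetical stand-in for DataFileMeta with only the fields used here."""
    file_name: str
    row_count: int


def sliced_file_row_count(file: FileMeta, idx_map: Dict[str, Tuple[int, int]]) -> int:
    """Rows read from one file: end - start if sliced, 0 if skipped."""
    if file.file_name not in idx_map:
        return file.row_count  # assumption: unmapped files are read in full
    start, end = idx_map[file.file_name]
    if (start, end) == (-1, -1):
        return 0  # skipped file contributes no rows
    return end - start


def sliced_row_count(files: List[FileMeta], idx_map: Dict[str, Tuple[int, int]]) -> int:
    """Sum sliced row counts instead of full file row counts."""
    return sum(sliced_file_row_count(f, idx_map) for f in files)


files = [FileMeta("file1.parquet", 1000), FileMeta("file2.parquet", 1000),
         FileMeta("file3.parquet", 1000)]
worker1_map = {"file1.parquet": (-1, -1), "file2.parquet": (0, 1000),
               "file3.parquet": (0, 500)}
print(sliced_row_count(files, worker1_map))  # 1500
```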
The merged_row_count calculation is more involved because it must take row IDs and deletion vectors into account. It groups files whose row ID ranges overlap and takes the maximum row count within each group. It then scales the result proportionally to account for rows removed by deletion vectors. The outcome is an estimate of the rows that remain after deduplication and deletion in primary key tables.
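The paragraph above describes the merged_row_count steps only at a high level. The following is therefore one possible reading of them and does not reproduce Paimon's actual formula. Files whose half-open ranges `[first_row_id, first_row_id + row_count)` overlap are merged into one group, and each group contributes its largest row count. The sum is then scaled by the fraction of rows not marked deleted. `FileStats`, its fields, and the choice to treat files without row IDs as their own groups are all hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FileStats:
    """Hypothetical per-file inputs for this sketch (not pypaimon's DataFileMeta)."""
    first_row_id: Optional[int]  # None when the file carries no row IDs
    row_count: int               # rows read from the file after slicing
    deleted_rows: int = 0        # rows marked deleted by a deletion vector


def group_by_overlapping_row_ids(files: List[FileStats]) -> List[List[FileStats]]:
    """Merge files whose [first_row_id, first_row_id + row_count) ranges overlap."""
    with_ids = sorted((f for f in files if f.first_row_id is not None),
                      key=lambda f: f.first_row_id)
    groups: List[List[FileStats]] = []
    group_end: Optional[int] = None
    for f in with_ids:
        start = f.first_row_id
        end = start + f.row_count
        if groups and start < group_end:
            groups[-1].append(f)  # overlaps the current group
            group_end = max(group_end, end)
        else:
            groups.append([f])    # starts a new group
            group_end = end
    # Assumption: files without row IDs each form a group of their own.
    groups.extend([f] for f in files if f.first_row_id is None)
    return groups


def estimate_merged_row_count(files: List[FileStats]) -> int:
    """Sum of per-group maxima, scaled by the share of rows not deleted."""
    groups = group_by_overlapping_row_ids(files)
    merged = sum(max(f.row_count for f in g) for g in groups)
    total = sum(f.row_count for f in files)
    deleted = sum(f.deleted_rows for f in files)
    if total == 0:
        return 0
    return int(merged * (total - deleted) / total)


files = [FileStats(0, 100, deleted_rows=10), FileStats(0, 100), FileStats(100, 50)]
print(estimate_merged_row_count(files))  # 144
```

In the example, the first two files cover the same row IDs and count once (100 rows). The third file adds 50 rows, for 150 in total. Ten of the 250 rows read are deleted, so the result is 150 × 240 / 250 = 144.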
Usage
SlicedSplit is created by AppendTableSplitGenerator when slice-based parallel processing is used. Readers consume it through the ordinary Split interface. When a file index map is present, they pass its row ranges down to the underlying file readers. This enables fine-grained parallelism in append-only tables, where data can be divided by row position rather than by bucket.
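To illustrate how per-worker maps for dividing data by row position could be produced, the sketch below splits the concatenated rows of a split's files into contiguous, near-equal ranges. It then converts each range into file-local (start_row, end_row) pairs, using (-1, -1) for untouched files. The planner `plan_shards` is hypothetical and is not AppendTableSplitGenerator's actual algorithm.

```python
from typing import Dict, List, Tuple


def plan_shards(
    file_row_counts: List[Tuple[str, int]], num_workers: int
) -> List[Dict[str, Tuple[int, int]]]:
    """Hypothetical planner: one shard_file_idx_map per worker, contiguous rows."""
    if num_workers <= 0:
        raise ValueError("num_workers must be positive")
    total = sum(count for _, count in file_row_counts)
    base, extra = divmod(total, num_workers)
    maps: List[Dict[str, Tuple[int, int]]] = []
    shard_start = 0
    for worker in range(num_workers):
        # The first `extra` workers each take one additional row.
        shard_end = shard_start + base + (1 if worker < extra else 0)
        idx_map: Dict[str, Tuple[int, int]] = {}
        file_offset = 0  # split-wide index of the current file's first row
        for name, count in file_row_counts:
            lo = max(shard_start, file_offset)
            hi = min(shard_end, file_offset + count)
            # Overlap expressed as file-local, half-open indices; else skip.
            idx_map[name] = (lo - file_offset, hi - file_offset) if lo < hi else (-1, -1)
            file_offset += count
        maps.append(idx_map)
        shard_start = shard_end
    return maps


for m in plan_shards([("file1.parquet", 1000), ("file2.parquet", 1000),
                      ("file3.parquet", 1000)], 2):
    print(m)
```

With three 1000-row files and two workers, each worker receives 1500 rows. The first reads all of file1 plus rows 0 to 500 of file2. The second reads the rest of file2 plus all of file3.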
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/read/sliced_split.py
Signature
```python
class SlicedSplit(Split):
    """
    Wrapper for Split that adds file-level slicing information.
    Maps file_name -> (start_row, end_row) where:
    - start_row: starting row index within the file (inclusive)
    - end_row: ending row index within the file (exclusive)
    - (-1, -1): file should be skipped entirely
    """

    def __init__(
        self,
        data_split: 'Split',
        shard_file_idx_map: Dict[str, Tuple[int, int]]
    ):
        ...

    def data_split(self) -> 'Split':
        """Get the underlying data split."""
        ...

    def shard_file_idx_map(self) -> Dict[str, Tuple[int, int]]:
        """Get the file index map with row ranges."""
        ...

    @property
    def row_count(self) -> int:
        """Calculate row count considering sliced ranges."""
        ...

    def merged_row_count(self):
        """Calculate merged row count after deduplication and deletion."""
        ...

    def _get_sliced_file_row_count(self, file: 'DataFileMeta') -> int:
        """Get row count for a file considering slicing."""
        ...
```
Import
```python
from pypaimon.read.sliced_split import SlicedSplit
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| data_split | Split | Yes | The underlying split being sliced |
| shard_file_idx_map | Dict[str, Tuple[int, int]] | Yes | Map from file name to (start_row, end_row) ranges |
Outputs
| Name | Type | Description |
|---|---|---|
| Split | SlicedSplit | A split that implements the Split interface with slicing awareness |
Usage Examples
```python
from pypaimon.read.sliced_split import SlicedSplit

# Original split with 3 files of 1000 rows each. DataSplit, file1..file3 and
# partition are placeholders; file1.file_name is assumed to be "file1.parquet", etc.
original_split = DataSplit(
    files=[file1, file2, file3],
    partition=partition,
    bucket=0,
    row_count=3000
)

# Worker 0 reads rows 0-1000 of the split (all of file1)
shard_file_idx_map = {
    "file1.parquet": (0, 1000),   # Read all rows from file1
    "file2.parquet": (-1, -1),    # Skip file2
    "file3.parquet": (-1, -1)     # Skip file3
}
sliced_split = SlicedSplit(original_split, shard_file_idx_map)

# Sliced properties
print(f"Row count: {sliced_split.row_count}")  # 1000 (only file1)
print(f"Files: {len(sliced_split.files)}")     # 3 (all files still present)

# Worker 1 reads rows 1000-2500 of the split
worker1_map = {
    "file1.parquet": (-1, -1),    # Skip file1
    "file2.parquet": (0, 1000),   # Read all rows from file2
    "file3.parquet": (0, 500)     # Read the first 500 rows of file3
}
worker1_split = SlicedSplit(original_split, worker1_map)
print(f"Worker 1 row count: {worker1_split.row_count}")  # 1500

# Reading honors the slicing: only the rows listed in shard_file_idx_map are
# returned. Assumes `table` is a pypaimon table obtained from a catalog.
read_builder = table.new_read_builder()
table_read = read_builder.new_read()
arrow_table = table_read.to_arrow([sliced_split])
process(arrow_table)  # placeholder for application logic
```