Implementation:Apache Paimon SlicedSplit

From Leeroopedia


Knowledge Sources
Domains: Split Generation, Parallel Processing
Last Updated: 2026-02-08 00:00 GMT

Overview

SlicedSplit wraps a Split with file-level row range information to support fine-grained parallel processing through slicing.

Description

SlicedSplit is a wrapper around a regular Split that adds file-level slicing information through a shard_file_idx_map. This map specifies which row ranges should be read from each file in the split, enabling data distribution at a finer granularity than bucket-level sharding. Each entry in the map associates a file name with a tuple (start_row, end_row) indicating the inclusive start and exclusive end row indices to read from that file.
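As a minimal sketch of how such a map could be derived, the snippet below projects a global half-open row range onto per-file ranges. The helper name build_shard_file_idx_map and the (file name, row count) input list are hypothetical and are not part of pypaimon; only the (start_row, end_row) convention and the (-1, -1) sentinel come from the description above.

```python
from typing import Dict, List, Tuple

SKIP = (-1, -1)  # sentinel described above: the file is skipped entirely


def build_shard_file_idx_map(
    file_row_counts: List[Tuple[str, int]],
    shard_start: int,
    shard_end: int,
) -> Dict[str, Tuple[int, int]]:
    """Hypothetical helper: project the global half-open row range
    [shard_start, shard_end) onto per-file (start_row, end_row) ranges."""
    result: Dict[str, Tuple[int, int]] = {}
    offset = 0  # global index of the current file's first row
    for name, count in file_row_counts:
        lo = max(shard_start, offset)
        hi = min(shard_end, offset + count)
        # Convert back to file-local indices; an empty overlap means "skip".
        result[name] = (lo - offset, hi - offset) if lo < hi else SKIP
        offset += count
    return result


files = [("file1.parquet", 1000), ("file2.parquet", 1000), ("file3.parquet", 1000)]
worker0 = build_shard_file_idx_map(files, 0, 1000)
worker1 = build_shard_file_idx_map(files, 1000, 2500)
```

Under these assumptions, worker0 reads all of file1.parquet and skips the other two files, while worker1 skips file1.parquet, reads all of file2.parquet, and reads the first 500 rows of file3.parquet.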

The class implements the Split interface by delegating most operations to the underlying data_split, but overrides row_count and merged_row_count to account for the sliced ranges. When calculating row counts, it sums only the rows within specified ranges rather than using the full file row counts. Files with (-1, -1) ranges are skipped entirely.
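The row count rule can be sketched roughly as follows. FileMeta is a hypothetical stand-in for DataFileMeta, and treating a file that is absent from the map as fully read is an assumption rather than something this page states.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class FileMeta:
    """Hypothetical stand-in for DataFileMeta with only the fields used here."""
    file_name: str
    row_count: int


def sliced_file_row_count(file: FileMeta, idx_map: Dict[str, Tuple[int, int]]) -> int:
    if file.file_name not in idx_map:
        # Assumption: a file without an entry is read in full.
        return file.row_count
    start, end = idx_map[file.file_name]
    if (start, end) == (-1, -1):
        return 0  # skipped files contribute no rows
    return end - start  # start inclusive, end exclusive


def sliced_row_count(files: List[FileMeta], idx_map: Dict[str, Tuple[int, int]]) -> int:
    return sum(sliced_file_row_count(f, idx_map) for f in files)


files = [FileMeta("file1.parquet", 1000),
         FileMeta("file2.parquet", 1000),
         FileMeta("file3.parquet", 1000)]
idx_map = {"file1.parquet": (-1, -1),
           "file2.parquet": (0, 1000),
           "file3.parquet": (0, 500)}
total = sliced_row_count(files, idx_map)
```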

The merged_row_count calculation is more involved because it must account for row IDs and deletion vectors. It groups files whose row ID ranges overlap, takes the maximum row count within each group, and then adjusts for deletion vectors proportionally. The result is an estimate of the rows that remain after deduplication and deletion in primary key tables.
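The grouping step can be illustrated with a simplified sketch. This is one plausible reading of the description, not the pypaimon implementation: the FileMeta fields first_row_id and deleted_rows, the helper name, and the choice to scale each file's sliced count by its surviving fraction before taking the group maximum are all assumptions.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class FileMeta:
    """Hypothetical stand-in for DataFileMeta."""
    file_name: str
    row_count: int
    first_row_id: int
    deleted_rows: int = 0  # assumed size of the file's deletion vector


def estimate_merged_row_count(files: List[FileMeta],
                              idx_map: Dict[str, Tuple[int, int]]) -> int:
    def effective(f: FileMeta) -> int:
        start, end = idx_map.get(f.file_name, (0, f.row_count))
        if (start, end) == (-1, -1) or f.row_count == 0:
            return 0
        sliced = end - start
        # Proportional adjustment: assume deletions are spread evenly over the file.
        return sliced - (f.deleted_rows * sliced) // f.row_count

    total = 0
    group_end: Optional[int] = None  # exclusive end of the current row ID group
    group_max = 0
    for f in sorted(files, key=lambda x: x.first_row_id):
        f_end = f.first_row_id + f.row_count
        if group_end is not None and f.first_row_id < group_end:
            # Overlaps the current group: the same logical rows, so keep the max.
            group_max = max(group_max, effective(f))
            group_end = max(group_end, f_end)
        else:
            # Starts a new group: flush the previous one.
            total += group_max
            group_max = effective(f)
            group_end = f_end
    return total + group_max


files = [
    FileMeta("a.parquet", row_count=100, first_row_id=0),
    FileMeta("b.parquet", row_count=100, first_row_id=0),   # same row IDs as a
    FileMeta("c.parquet", row_count=50, first_row_id=100, deleted_rows=10),
]
idx_map = {"a.parquet": (0, 40), "b.parquet": (0, 40), "c.parquet": (0, 25)}
merged = estimate_merged_row_count(files, idx_map)
```

In this toy input, a.parquet and b.parquet cover the same row IDs and count once (40 rows), while c.parquet contributes 25 sliced rows scaled down by its 20% deletion ratio.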

Usage

SlicedSplit is created by AppendTableSplitGenerator when slice-based parallel processing is used. Calling code handles it like any other Split; readers check for a file index map and pass each file's row range to the underlying file reader. This enables fine-grained parallelism in append-only tables, where data can be divided by row position rather than by bucket.
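How a reader might honor a row range while consuming a file in batches can be sketched as below. The function clip_batches is hypothetical, and plain Python lists stand in for whatever batch type the real file readers produce.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def clip_batches(batches: Iterable[List[T]], start: int, end: int) -> Iterator[List[T]]:
    """Yield only the rows whose file-local index lies in [start, end)."""
    if (start, end) == (-1, -1):
        return  # the file is skipped entirely
    offset = 0  # file-local index of the current batch's first row
    for batch in batches:
        lo = max(start, offset) - offset
        hi = min(end, offset + len(batch)) - offset
        if lo < hi:
            yield batch[lo:hi]
        offset += len(batch)
        if offset >= end:
            break  # nothing past the exclusive end is needed


batches = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
clipped = list(clip_batches(batches, 3, 9))
```

Stopping once the offset reaches the exclusive end avoids touching batches that cannot contribute rows to the slice.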

Code Reference

Source Location

Signature

class SlicedSplit(Split):
    """
    Wrapper for Split that adds file-level slicing information.

    Maps file_name -> (start_row, end_row) where:
    - start_row: starting row index within the file (inclusive)
    - end_row: ending row index within the file (exclusive)
    - (-1, -1): file should be skipped entirely
    """

    def __init__(
        self,
        data_split: 'Split',
        shard_file_idx_map: Dict[str, Tuple[int, int]]
    ):
        ...

    def data_split(self) -> 'Split':
        """Get the underlying data split."""
        ...

    def shard_file_idx_map(self) -> Dict[str, Tuple[int, int]]:
        """Get the file index map with row ranges."""
        ...

    @property
    def row_count(self) -> int:
        """Calculate row count considering sliced ranges."""
        ...

    def merged_row_count(self):
        """Calculate merged row count after deduplication and deletion."""
        ...

    def _get_sliced_file_row_count(self, file: 'DataFileMeta') -> int:
        """Get row count for a file considering slicing."""
        ...

Import

from pypaimon.read.sliced_split import SlicedSplit

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| data_split | Split | Yes | The underlying split being sliced |
| shard_file_idx_map | Dict[str, Tuple[int, int]] | Yes | Map from file name to (start_row, end_row) ranges |

Outputs

| Name | Type | Description |
|------|------|-------------|
| Split | SlicedSplit | A split that implements the Split interface with slicing awareness |

Usage Examples

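from pypaimon.read.sliced_split import SlicedSplit
# Assumed module path for DataSplit; adjust to your pypaimon version.
from pypaimon.read.split import DataSplit

# Original split with 3 files.
# file1..file3, partition, and table are placeholders for existing objects.
original_split = DataSplit(
    files=[file1, file2, file3],  # 1000 rows each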
    partition=partition,
    bucket=0,
    row_count=3000
)

# Create sliced split for worker 0, which processes global rows [0, 1000)
shard_file_idx_map = {
    "file1.parquet": (0, 1000),    # Read all rows from file1
    "file2.parquet": (-1, -1),     # Skip file2
    "file3.parquet": (-1, -1)      # Skip file3
}

sliced_split = SlicedSplit(original_split, shard_file_idx_map)

# Sliced properties
print(f"Row count: {sliced_split.row_count}")  # 1000 (only file1)
print(f"Files: {len(sliced_split.files)}")      # 3 (all files present)

# Another worker processes global rows [1000, 2500)
worker1_map = {
    "file1.parquet": (-1, -1),     # Skip file1
    "file2.parquet": (0, 1000),    # Read all rows from file2
    "file3.parquet": (0, 500)      # Read first 500 rows from file3
}

worker1_split = SlicedSplit(original_split, worker1_map)
print(f"Worker 1 row count: {worker1_split.row_count}")  # 1500

# The reader respects the slicing.
# create_reader is shown schematically; obtain the reader however your table API provides it.
reader = table.create_reader(sliced_split)
for batch in reader:
    # Only rows specified in shard_file_idx_map
    process(batch)
