Implementation:Apache Paimon PrimaryKeyTableSplitGenerator


Knowledge Sources
Domains: Split Generation, Primary Key Tables
Last Updated: 2026-02-08 00:00 GMT

Overview

PrimaryKeyTableSplitGenerator creates data splits for primary key tables with LSM tree level awareness and merge optimization.

Description

PrimaryKeyTableSplitGenerator extends AbstractSplitGenerator to create splits specifically for primary key tables. Unlike append-only tables, primary key tables store data in an LSM tree. Files at different levels may contain the same keys, so reads must merge them. The generator inspects each bucket's files (their levels and deleted-row counts) together with the table options. From these it decides whether the files can take an optimized direct-read path or require full merge-on-read processing.
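
To see why overlapping keys force a merge, here is a toy model of merge-on-read. It assumes a deduplicate-style (last write wins) merge, where a higher sequence number marks a newer write. The `Record` layout and the `merge_on_read` function are hypothetical and are not pypaimon APIs.

```python
import heapq
from typing import List, Tuple

# A record in this toy model: (primary_key, sequence_number, value)
Record = Tuple[int, int, str]


def merge_on_read(sorted_runs: List[List[Record]]) -> List[Tuple[int, str]]:
    """Merge key-sorted runs, keeping the newest value (highest sequence) per key.

    Within one run keys are unique and sorted, but different runs
    (for example, different LSM levels) may contain the same key.
    """
    # Order all records by (key, sequence) so newer versions of a key come last
    merged = heapq.merge(*sorted_runs, key=lambda r: (r[0], r[1]))
    result: List[Tuple[int, str]] = []
    for key, _seq, value in merged:
        if result and result[-1][0] == key:
            result[-1] = (key, value)  # same key again: the newer version wins
        else:
            result.append((key, value))
    return result


newer_run = [(1, 5, "a-new"), (3, 6, "c-new")]
older_run = [(1, 1, "a-old"), (2, 2, "b"), (3, 3, "c-old")]
print(merge_on_read([newer_run, older_run]))
```

Reading either run alone would return stale or incomplete rows for keys 1 and 3. This is why files with overlapping keys must be read together.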

The generator supports two read strategies. Files above level 0 that contain no deleted rows can be read directly, without merging, when deletion vectors are enabled or the merge engine is FIRST_ROW. Level-0 files do not qualify because they may overlap one another. Files that cannot use this fast path are grouped with IntervalPartition into sections whose key ranges do not overlap each other. Each section consists of one or more sorted runs. These sections are then packed into splits so that files which must be merged together always land in the same split, while split sizes stay close to the target.
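
The fast-path check and the interval grouping can be sketched as follows. `FileMeta`, `can_read_raw`, and `interval_partition` are hypothetical simplifications, not pypaimon's classes. Merge engines are modeled as plain strings such as "first-row". In this sketch, level-0 files always take the merge path, because level-0 files can overlap one another.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FileMeta:
    """Hypothetical, simplified stand-in for a data file's metadata."""
    name: str
    level: int
    min_key: int
    max_key: int
    delete_row_count: int = 0


def can_read_raw(files: List[FileMeta], deletion_vectors_enabled: bool,
                 merge_engine: str) -> bool:
    """Fast-path check: no level-0 files, no deleted rows, and a table
    configuration (deletion vectors or first-row) under which no merge is needed."""
    files_ok = all(f.level != 0 and f.delete_row_count == 0 for f in files)
    return files_ok and (deletion_vectors_enabled or merge_engine == "first-row")


def interval_partition(files: List[FileMeta]) -> List[List[List[FileMeta]]]:
    """Group files into sections whose key ranges do not overlap each other,
    then split each section into sorted runs of non-overlapping files."""
    sections: List[List[FileMeta]] = []
    bound: Optional[int] = None  # largest max_key seen in the current section
    for f in sorted(files, key=lambda x: (x.min_key, x.max_key)):
        if bound is None or f.min_key > bound:
            sections.append([])  # no overlap with the previous section
            bound = f.max_key
        else:
            bound = max(bound, f.max_key)
        sections[-1].append(f)

    result: List[List[List[FileMeta]]] = []
    for section in sections:
        runs: List[List[FileMeta]] = []
        for f in section:  # already ordered by min_key
            # Append to the first run whose last file ends before this file starts
            for run in runs:
                if run[-1].max_key < f.min_key:
                    run.append(f)
                    break
            else:
                runs.append([f])  # overlaps every existing run: open a new one
        result.append(runs)
    return result


files = [FileMeta("a", 1, 0, 10), FileMeta("b", 1, 11, 20),
         FileMeta("c", 0, 5, 15), FileMeta("d", 1, 30, 40)]
print(can_read_raw(files, deletion_vectors_enabled=True, merge_engine="deduplicate"))
print([[[f.name for f in run] for run in sec] for sec in interval_partition(files)])
```

Here the level-0 file "c" rules out the fast path. The partition yields one section with runs ["a", "b"] and ["c"], plus a separate section ["d"]. A split generator would keep each section intact when packing, so "c" is always merged alongside "a" and "b".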

Primary key tables support only bucket-based sharding, through the with_shard method. Append-only tables can also be sharded by slice, but primary key tables cannot: calling with_slice raises NotImplementedError. A slice could divide one bucket's files between workers, whereas merge-on-read must process all overlapping files of a bucket together to produce correct results.
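
Bucket-based sharding can be sketched as a filter that keeps whole buckets together. This sketch assumes a simple modulo rule (bucket % number_of_para_subtasks == idx_of_this_subtask); the assignment rule pypaimon actually uses may differ. `Entry` and `filter_by_shard` are hypothetical names.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Entry:
    """Hypothetical stand-in for a manifest entry."""
    partition: Tuple[str, ...]
    bucket: int
    file_name: str


def filter_by_shard(entries: List[Entry], idx_of_this_subtask: int,
                    number_of_para_subtasks: int) -> List[Entry]:
    """Assign whole buckets to subtasks with an assumed modulo rule, so all
    files of one bucket (and thus all overlapping files) reach the same worker."""
    return [e for e in entries
            if e.bucket % number_of_para_subtasks == idx_of_this_subtask]


# Five buckets with two files each, spread over two subtasks
entries = [Entry(("p0",), b, f"bucket{b}-file{i}") for b in range(5) for i in range(2)]
for idx in range(2):
    shard = filter_by_shard(entries, idx, 2)
    print(idx, sorted({e.bucket for e in shard}))
```

Contrast this with slicing by file position. A cut at an arbitrary position could place bucket 2's two files on different workers, and neither worker could then merge them correctly.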

Usage

Use PrimaryKeyTableSplitGenerator when scanning primary key tables. Configure parallel processing with with_shard, which distributes buckets across workers. The generator chooses the read strategy automatically, based on file levels, deleted rows, and table configuration.

Code Reference

Source Location

Signature

class PrimaryKeyTableSplitGenerator(AbstractSplitGenerator):
    """
    Split generator for primary key tables.
    """

    def __init__(
            self,
            table,
            target_split_size: int,
            open_file_cost: int,
            deletion_files_map=None
    ):
        super().__init__(table, target_split_size, open_file_cost, deletion_files_map)
        self.deletion_vectors_enabled = table.options.deletion_vectors_enabled()
        self.merge_engine = table.options.merge_engine()

    def with_slice(self, start_pos: int, end_pos: int):
        """Primary key tables do not support slice-based sharding."""
        raise NotImplementedError(...)

    def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]:
        """Create splits with LSM-aware packing and merge optimization."""
        ...

    def _filter_by_shard(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]:
        """Filter file entries by bucket-based sharding."""
        ...

Import

from pypaimon.read.scanner.primary_key_table_split_generator import PrimaryKeyTableSplitGenerator

I/O Contract

Inputs

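Name | Type | Required | Description
table | Table | Yes | Primary key table instance (constructor argument)
target_split_size | int | Yes | Target size of each split, in bytes (constructor argument)
open_file_cost | int | Yes | Cost, in bytes, charged for opening a file when packing splits (constructor argument)
deletion_files_map | dict | No | Map of deletion vector files (constructor argument)
file_entries | List[ManifestEntry] | Yes | Manifest entries for the files to split (argument to create_splits)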
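
One plausible way target_split_size and open_file_cost interact is a greedy, order-preserving bin packing in which each item weighs at least open_file_cost. The sketch below assumes that scheme. `pack_for_ordered` and `file_weight` are illustrative names, not confirmed pypaimon functions.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")
MB = 1024 * 1024


def pack_for_ordered(items: List[T], weight: Callable[[T], int],
                     target: int) -> List[List[T]]:
    """Fill bins in input order; start a new bin when adding the next item
    would exceed the target weight (an oversized item gets a bin of its own)."""
    bins: List[List[T]] = []
    current: List[T] = []
    current_weight = 0
    for item in items:
        w = weight(item)
        if current and current_weight + w > target:
            bins.append(current)
            current, current_weight = [], 0
        current.append(item)
        current_weight += w
    if current:
        bins.append(current)
    return bins


def file_weight(size: int, open_file_cost: int = 4 * MB) -> int:
    # Small files are charged at least open_file_cost, so many tiny files
    # cannot all pile into a single split.
    return max(size, open_file_cost)


sizes = [s * MB for s in [100, 1, 1, 60, 200, 30]]
splits = pack_for_ordered(sizes, file_weight, target=128 * MB)
print([[s // MB for s in b] for b in splits])
```

With a 128 MB target this prints [[100, 1, 1], [60], [200], [30]]: each 1 MB file counts as 4 MB, and the 200 MB file exceeds the target on its own. Packing in input order keeps neighbouring items together, which suits items that are key-ordered sections.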

Outputs

Name | Type | Description
splits | List[Split] | Split objects. Each carries a raw_convertible flag: True if the split can be read directly, False if it needs merge-on-read

Usage Examples

from pypaimon.read.scanner.primary_key_table_split_generator import PrimaryKeyTableSplitGenerator

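# Create a split generator for a primary key table.
# Assumes `table` is an already-loaded primary key table and `deletion_vectors`
# is the map of deletion vector files (pass None if deletion vectors are not used).
generator = PrimaryKeyTableSplitGenerator(
    table=table,
    target_split_size=128 * 1024 * 1024,  # 128 MB
    open_file_cost=4 * 1024 * 1024,  # 4 MB
    deletion_files_map=deletion_vectors
)

# Configure bucket-based sharding (the only supported mode):
# this worker is subtask 0 of 4 parallel subtasks
generator.with_shard(idx_of_this_subtask=0, number_of_para_subtasks=4)

# Slice-based sharding is not supported
# generator.with_slice(0, 1000)  # raises NotImplementedError

# Get manifest entries from the scan
# (illustrative call: how manifest entries are obtained depends on the pypaimon version)
manifest_entries = table.scan().files()

# Create splits with LSM-aware packing
splits = generator.create_splits(manifest_entries)

# Read splits. create_raw_reader and create_merge_reader are illustrative
# names for the two read paths, not confirmed pypaimon APIs.
for split in splits:
    if split.raw_convertible:
        # Fast path: read the files directly, no merge needed
        reader = table.create_raw_reader(split)
    else:
        # Slow path: merge-on-read across overlapping files
        reader = table.create_merge_reader(split)

    # Process data (`process` is a user-supplied function)
    for batch in reader:
        process(batch)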
