Implementation:Apache Paimon PrimaryKeyTableSplitGenerator
| Knowledge Sources | |
|---|---|
| Domains | Split Generation, Primary Key Tables |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
PrimaryKeyTableSplitGenerator creates data splits for primary key tables with LSM tree level awareness and merge optimization.
Description
PrimaryKeyTableSplitGenerator extends AbstractSplitGenerator to create splits specifically for primary key tables. Unlike append-only tables, primary key tables store data in an LSM tree with multiple levels. Files in different levels, and level-0 files among themselves, may contain overlapping keys, so reads may have to merge records. The generator inspects file characteristics, such as level and deleted-row count, to decide whether files can use an optimized read path or require full merge-on-read processing.
For each partition and bucket, the generator chooses between two read strategies. Files above level 0 that contain no deleted rows can be read directly, without merging, when deletion vectors are enabled or the merge engine is FIRST_ROW; the resulting splits are marked raw_convertible. When this fast path does not apply, the generator uses IntervalPartition to group files into sections whose key ranges overlap, each organized as non-overlapping sorted runs, and then packs whole sections into splits so that files which must be merged together always land in the same split.
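The fast-path decision can be sketched as a predicate over one bucket's files. This is a minimal illustration, not the pypaimon code: `FileMeta`, `MergeEngine` and `is_raw_convertible` are hypothetical names, the rule assumed is that only files above level 0 with no deleted rows qualify, and a file whose deleted-row count is unknown (`None`) is conservatively treated as possibly containing deletes.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional


class MergeEngine(Enum):
    # Hypothetical subset of merge engines, for illustration only
    DEDUPLICATE = "deduplicate"
    FIRST_ROW = "first-row"


@dataclass
class FileMeta:
    # Hypothetical stand-in for a data file's metadata
    name: str
    level: int
    delete_row_count: Optional[int] = 0


def is_raw_convertible(
    files: List[FileMeta],
    deletion_vectors_enabled: bool,
    merge_engine: MergeEngine,
) -> bool:
    """Return True if the files can be read directly, without merging."""
    # Every file must be above level 0 and known to contain no deleted
    # rows; an unknown count (None) fails the `== 0` check.
    files_ok = all(f.level != 0 and f.delete_row_count == 0 for f in files)
    # The fast path additionally requires deletion vectors or FIRST_ROW.
    return files_ok and (
        deletion_vectors_enabled or merge_engine == MergeEngine.FIRST_ROW
    )
```

In this sketch a single level-0 file, or a single file with deletes, is enough to send the whole bucket down the merge-on-read path.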
Unlike append-only tables, primary key tables support only bucket-based sharding, through the with_shard method; calling with_slice raises NotImplementedError. Slice-based distribution is not supported because merge-on-read must process all overlapping files in a bucket together to produce correct merged results.
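Bucket-based sharding can be sketched as a filter over manifest entries. The assignment rule here (bucket `b` goes to subtask `b % number_of_para_subtasks`) is an assumption for illustration, as are the `Entry` record and the `filter_by_shard` function; the property that matters is that all files of one bucket are read by the same worker.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Entry:
    # Hypothetical stand-in for a manifest entry
    file_name: str
    bucket: int


def filter_by_shard(
    entries: List[Entry], idx_of_this_subtask: int, number_of_para_subtasks: int
) -> List[Entry]:
    """Keep only the entries whose bucket is assigned to this subtask."""
    if not 0 <= idx_of_this_subtask < number_of_para_subtasks:
        raise ValueError("subtask index out of range")
    # Assumed rule: bucket b belongs to subtask b % number_of_para_subtasks,
    # so every file of a bucket is read by exactly one worker.
    return [
        e for e in entries
        if e.bucket % number_of_para_subtasks == idx_of_this_subtask
    ]
```

With 8 buckets and 4 subtasks, subtask 1 would read buckets 1 and 5, and the four subtasks together cover every entry exactly once.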
Usage
Use PrimaryKeyTableSplitGenerator when scanning primary key tables. To read in parallel, call with_shard so that buckets are distributed across workers. The generator chooses the read strategy for each bucket automatically, based on file levels, deleted rows, and table configuration (deletion vectors and merge engine).
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/read/scanner/primary_key_table_split_generator.py
Signature
```python
from typing import List

# ManifestEntry, Split and AbstractSplitGenerator are pypaimon types
# (their imports are omitted from this excerpt).


class PrimaryKeyTableSplitGenerator(AbstractSplitGenerator):
    """
    Split generator for primary key tables.
    """

    def __init__(
        self,
        table,
        target_split_size: int,
        open_file_cost: int,
        deletion_files_map=None
    ):
        super().__init__(table, target_split_size, open_file_cost, deletion_files_map)
        self.deletion_vectors_enabled = table.options.deletion_vectors_enabled()
        self.merge_engine = table.options.merge_engine()

    def with_slice(self, start_pos: int, end_pos: int):
        """Primary key tables do not support slice-based sharding."""
        raise NotImplementedError(...)

    def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]:
        """Create splits with LSM-aware packing and merge optimization."""
        ...

    def _filter_by_shard(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]:
        """Filter file entries by bucket-based sharding."""
        ...
```
Import
```python
from pypaimon.read.scanner.primary_key_table_split_generator import PrimaryKeyTableSplitGenerator
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
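| table | Table | Yes | Primary key table to scan (constructor argument) |
| target_split_size | int | Yes | Target size of each split, in bytes (constructor argument) |
| open_file_cost | int | Yes | Minimum weight, in bytes, charged per file when packing splits, to account for file-open overhead (constructor argument) |
| deletion_files_map | dict | No | Map of deletion vector files; defaults to None (constructor argument) |
| file_entries | List[ManifestEntry] | Yes | Manifest entries for the data files to split (argument to create_splits) |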
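Packing into splits can be sketched as ordered greedy bin packing against target_split_size. Assumptions: each file's weight is max(file size, open_file_cost), so small files still count as at least one file open, and `pack_for_ordered` is a hypothetical name for this sketch. On the merge path the same packing would apply to whole sections rather than individual files.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


def pack_for_ordered(
    items: List[T], weight: Callable[[T], int], target: int
) -> List[List[T]]:
    """Greedily pack items, keeping their order, into bins of about `target` weight."""
    bins: List[List[T]] = []
    current: List[T] = []
    current_weight = 0
    for item in items:
        w = weight(item)
        # Close the current bin if this item would push it past the target;
        # an oversized item still gets a bin of its own.
        if current and current_weight + w > target:
            bins.append(current)
            current, current_weight = [], 0
        current.append(item)
        current_weight += w
    if current:
        bins.append(current)
    return bins
```

With a 128 MB target and a 4 MB open cost, file sizes of 100, 1, 1, 50, 200 and 10 MB pack into [100, 1, 1], [50], [200] and [10]: each 1 MB file weighs 4 MB, and the 200 MB file forms its own split.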
Outputs
| Name | Type | Description |
|---|---|---|
| splits | List[Split] | List of Split objects; each carries a raw_convertible flag that is True when its files can be read directly without merging |
Usage Examples
```python
from pypaimon.read.scanner.primary_key_table_split_generator import PrimaryKeyTableSplitGenerator

# Assumes `table` is an already-loaded primary key table and
# `deletion_vectors` is its deletion-file map (or None).
generator = PrimaryKeyTableSplitGenerator(
    table=table,
    target_split_size=128 * 1024 * 1024,  # 128 MB
    open_file_cost=4 * 1024 * 1024,  # 4 MB
    deletion_files_map=deletion_vectors,
)

# Configure bucket-based sharding (the only supported mode):
# this worker is subtask 0 of 4.
generator.with_shard(idx_of_this_subtask=0, number_of_para_subtasks=4)

# Slice-based sharding raises NotImplementedError:
# generator.with_slice(0, 1000)

# Get manifest entries from the scan (illustrative call)
manifest_entries = table.scan().files()

# Create splits with LSM-aware packing
splits = generator.create_splits(manifest_entries)

# Read splits. create_raw_reader, create_merge_reader and process are
# illustrative placeholders, not confirmed pypaimon APIs.
for split in splits:
    if split.raw_convertible:
        # Fast path: read files directly, without merging
        reader = table.create_raw_reader(split)
    else:
        # Slow path: merge-on-read across overlapping files
        reader = table.create_merge_reader(split)
    for batch in reader:
        process(batch)
```