Implementation:Apache Paimon SplitGenerator
| Knowledge Sources | |
|---|---|
| Domains | Split Generation, Query Planning |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
AbstractSplitGenerator is the base class for generating data splits from manifest entries with support for parallel processing.
Description
AbstractSplitGenerator provides the foundational logic for converting manifest entries (file metadata) into Split objects that can be read by workers. It implements common functionality for file packing, deletion vector handling, and parallel processing configuration that is shared across different table types. The abstract class defines the contract through the create_splits method that subclasses must implement.
The generator supports two modes of parallel processing. The with_shard method enables bucket-based distribution, where splits are assigned by bucket ID; this suits primary key tables. The with_slice method enables row-range-based distribution, where data is divided by row position; this suits append-only tables. The two modes are mutually exclusive: a generator can be configured with one or the other, not both.
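To make the two distribution modes concrete, here is a minimal sketch. It is not the pypaimon implementation: the helper names `owns_bucket` and `even_row_range` are hypothetical, and both the modulo rule and the even-division rule are assumptions about how such modes are commonly realized.

```python
from typing import Tuple


def owns_bucket(bucket: int, subtask_idx: int, num_subtasks: int) -> bool:
    """Bucket-based mode (assumed rule): a subtask owns the buckets that map to it by modulo."""
    return bucket % num_subtasks == subtask_idx


def even_row_range(total_rows: int, subtask_idx: int, num_subtasks: int) -> Tuple[int, int]:
    """Row-range mode (assumed rule): half-open [start, end), remainder spread over the first subtasks."""
    base, extra = divmod(total_rows, num_subtasks)
    start = subtask_idx * base + min(subtask_idx, extra)
    end = start + base + (1 if subtask_idx < extra else 0)
    return start, end


# Subtask 0 of 4 looking at 8 buckets.
buckets_for_subtask_0 = [b for b in range(8) if owns_bucket(b, 0, 4)]

# 10 rows divided across 4 subtasks.
ranges = [even_row_range(10, i, 4) for i in range(4)]
```

In this sketch the bucket rule keeps all files of a bucket on one subtask, while the row-range rule ignores buckets and produces contiguous, non-overlapping ranges that together cover every row.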
The implementation includes bin-packing logic that groups files, in order, into splits approaching a target size. Each file is weighted by a function that combines its size with the open-file cost, so that many small files do not end up in a single oversized split. Deletion vectors are taken into account when splits are built. The generator also provides utilities for computing shard ranges and per-file row ranges, and for handling blob files in data evolution scenarios.
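The packing idea can be illustrated with a small order-preserving packer. This is a sketch under stated assumptions, not the body of `_pack_for_ordered`: the function name `pack_in_order` is hypothetical, the weight is taken as file size plus a fixed open cost (one way to combine the two; the actual weight function may differ), and a bin is closed as soon as the next item would push it past the target.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


def pack_in_order(
    items: List[T],
    weight_func: Callable[[T], int],
    target_weight: int,
) -> List[List[T]]:
    """Group consecutive items into bins whose total weight approaches target_weight."""
    packed: List[List[T]] = []
    current: List[T] = []
    current_weight = 0
    for item in items:
        weight = weight_func(item)
        # Close the current bin if adding this item would exceed the target.
        if current and current_weight + weight > target_weight:
            packed.append(current)
            current = []
            current_weight = 0
        # An item heavier than the target still gets a bin of its own.
        current.append(item)
        current_weight += weight
    if current:
        packed.append(current)
    return packed


OPEN_FILE_COST = 4  # hypothetical units, for illustration only
file_sizes = [10, 3, 1, 20, 2, 2]
bins = pack_in_order(
    file_sizes,
    weight_func=lambda size: size + OPEN_FILE_COST,  # assumed weight function
    target_weight=16,
)
```

Because items are never reordered, the resulting splits preserve the input order of the files, which matters when the order carries meaning such as sequence numbers or row positions.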
Usage
AbstractSplitGenerator is not used directly but serves as the parent class for AppendTableSplitGenerator and PrimaryKeyTableSplitGenerator. Users interact with concrete implementations through the table scan API, which automatically selects the appropriate generator based on table type.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/read/scanner/split_generator.py
Signature
class AbstractSplitGenerator(ABC):
    """
    Abstract base class for generating splits.
    """

    NEXT_POS_KEY = '_next_pos'

    def __init__(
        self,
        table,
        target_split_size: int,
        open_file_cost: int,
        deletion_files_map: Optional[Dict] = None
    ):
        ...

    def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: int):
        """Configure bucket-based sharding for parallel processing."""
        ...

    def with_slice(self, start_pos: int, end_pos: int):
        """Configure row-range slicing for parallel processing."""
        ...

    @abstractmethod
    def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]:
        """Create splits from manifest entries."""
        pass

    def _build_split_from_pack(
        self,
        packed_files: List[List[DataFileMeta]],
        file_entries: List[ManifestEntry],
        for_primary_key_split: bool,
        use_optimized_path: bool = False
    ) -> List[Split]:
        """Build splits from packed files."""
        ...

    @staticmethod
    def _pack_for_ordered(
        items: List,
        weight_func: Callable,
        target_weight: int
    ) -> List[List]:
        """Pack items into bins approaching target weight."""
        ...

    def _compute_shard_range(self, total_row: int) -> Tuple[int, int]:
        """Calculate start and end positions for this shard."""
        ...

    @staticmethod
    def _compute_file_range(
        plan_start_pos: int,
        plan_end_pos: int,
        file_begin_pos: int,
        file_row_count: int
    ) -> Optional[Tuple[int, int]]:
        """Compute the row range to read from a file."""
        ...
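The `_compute_file_range` signature suggests an interval intersection between the planned row range and the rows held by one file. The following is a sketch of that idea only: the function name `file_range` is hypothetical, and the half-open `[start, end)` convention and the choice to return offsets relative to the start of the file are assumptions, not confirmed behavior of the library.

```python
from typing import Optional, Tuple


def file_range(
    plan_start_pos: int,
    plan_end_pos: int,
    file_begin_pos: int,
    file_row_count: int,
) -> Optional[Tuple[int, int]]:
    """Intersect the planned range with a file's rows; return file-local offsets or None."""
    file_end_pos = file_begin_pos + file_row_count
    start = max(plan_start_pos, file_begin_pos)
    end = min(plan_end_pos, file_end_pos)
    if start >= end:
        # The file lies entirely outside the planned range.
        return None
    # Translate global row positions into offsets within this file (assumed convention).
    return start - file_begin_pos, end - file_begin_pos


# Planned range [100, 250) checked against three files of 100 rows each.
before = file_range(100, 250, 0, 100)     # file covers rows [0, 100)
partial = file_range(100, 250, 200, 100)  # file covers rows [200, 300)
inside = file_range(100, 250, 100, 100)   # file covers rows [100, 200)
```

Returning `None` for a non-overlapping file lets a caller skip that file without opening it.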
Import
from pypaimon.read.scanner.split_generator import AbstractSplitGenerator
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| table | Table | Yes | Table instance |
| target_split_size | int | Yes | Target size for splits in bytes |
| open_file_cost | int | Yes | Cost penalty for opening a file, expressed in bytes |
| deletion_files_map | Optional[Dict] | No | Map of deletion vector files |
| file_entries | List[ManifestEntry] | Yes (create_splits) | Files to split |
Outputs
| Name | Type | Description |
|---|---|---|
| splits | List[Split] | Generated splits ready for reading |
Usage Examples
from pypaimon.read.scanner.append_table_split_generator import AppendTableSplitGenerator
# Module path assumed to mirror the append-table generator's; adjust if it differs.
from pypaimon.read.scanner.primary_key_table_split_generator import PrimaryKeyTableSplitGenerator

# Concrete generator is selected based on table type
if table.is_append_only():
    generator = AppendTableSplitGenerator(
        table=table,
        target_split_size=128 * 1024 * 1024,
        open_file_cost=4 * 1024 * 1024
    )
else:
    generator = PrimaryKeyTableSplitGenerator(
        table=table,
        target_split_size=128 * 1024 * 1024,
        open_file_cost=4 * 1024 * 1024,
        deletion_files_map=table.get_deletion_vectors()
    )

# Configure parallel processing: this subtask is the first of four
generator.with_shard(idx_of_this_subtask=0, number_of_para_subtasks=4)

# Generate splits
manifest_entries = table.scan().files()
splits = generator.create_splits(manifest_entries)

print(f"Generated {len(splits)} splits")
for split in splits:
    print(f"  Split: {len(split.files)} files, {split.row_count} rows")