Implementation:Apache Paimon DataEvolutionSplitGenerator
| Knowledge Sources | |
|---|---|
| Domains | Query Planning, Schema Evolution |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
DataEvolutionSplitGenerator generates read splits for tables with data evolution enabled, handling row-ID-based file grouping, blob file filtering, shard/slice parallelism via row ID ranges, and global index integration with optional relevance scores.
Description
This generator extends `AbstractSplitGenerator` to handle tables where schema evolution has created multiple file versions with potentially partial column sets stored as blob files. The core algorithm sorts files by `(first_row_id, is_blob, -max_sequence_number)` and partitions them by (partition, bucket). The remaining steps are:
- Shard mode: `_filter_by_shard()` divides the total row ID space evenly across subtasks by computing merged row ID ranges and assigning contiguous range slices to each subtask.
- Slice mode (used with limit/offset): `_calculate_slice_row_ranges()` computes row ID ranges from position offsets by merging overlapping file ranges and dividing by offset positions.
- Grouping and blob filtering: `_split_by_row_id()` groups files sharing the same `first_row_id` (a data file with its associated blob files), while `_filter_blob()` excludes blob files that fall outside the data file's row range.
- Packing and wrapping: splits are packed toward the target size via `_pack_for_ordered()`, and the result is wrapped in `IndexedSplit` objects when row ranges are present (from global index filtering or slice mode). These `IndexedSplit` objects carry the computed row ID ranges and optional relevance scores (from vector search) via the `score_getter` callback.
- Deletion files: the generator maps deletion files to their corresponding data files.
- File-level split maps: `_compute_slice_split_file_idx_map()` determines which rows to read from each file, with special logic for blob files, whose ranges are computed relative to their associated data file.
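The sort order and the grouping by `first_row_id` described above can be illustrated with a minimal sketch. `FileMeta` is a hypothetical stand-in for `DataFileMeta` that carries only the fields the sort key needs, and `split_by_row_id` here is an assumption about the behavior of `_split_by_row_id()` based on this description, not the actual pypaimon code.

```python
from dataclasses import dataclass
from itertools import groupby
from typing import List, Tuple


@dataclass
class FileMeta:
    """Hypothetical stand-in for DataFileMeta (illustration only)."""
    name: str
    first_row_id: int
    row_count: int
    max_sequence_number: int
    is_blob: bool = False


def sort_key(f: FileMeta) -> Tuple[int, bool, int]:
    # Within one first_row_id: data files before blob files,
    # and higher sequence numbers (newer files) first.
    return (f.first_row_id, f.is_blob, -f.max_sequence_number)


def split_by_row_id(files: List[FileMeta]) -> List[List[FileMeta]]:
    """Group files that share the same first_row_id (assumed behavior)."""
    ordered = sorted(files, key=sort_key)
    return [list(group) for _, group in groupby(ordered, key=lambda f: f.first_row_id)]


files = [
    FileMeta("blob-a", first_row_id=0, row_count=100, max_sequence_number=3, is_blob=True),
    FileMeta("data-b", first_row_id=100, row_count=50, max_sequence_number=2),
    FileMeta("data-a", first_row_id=0, row_count=100, max_sequence_number=1),
    FileMeta("data-a2", first_row_id=0, row_count=100, max_sequence_number=4),
]
group_names = [[f.name for f in group] for group in split_by_row_id(files)]
print(group_names)
```

Sorting before grouping matters because `itertools.groupby` only merges adjacent items with equal keys.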
Together, these steps let Paimon's data evolution feature coordinate schema evolution (partial-column blob files), global index filtering, vector search scoring, and distributed parallel reads through row ID range arithmetic.
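The row ID range arithmetic used in slice mode can be sketched as follows. This is an illustration under stated assumptions: ranges are modeled as inclusive `(start, end)` tuples rather than Paimon's `Range` class, adjacent ranges are merged along with overlapping ones, and `merge_ranges` and `slice_ranges` are hypothetical helper names, not pypaimon functions.

```python
from typing import List, Tuple

RowRange = Tuple[int, int]  # inclusive (start, end); hypothetical stand-in for Range


def merge_ranges(ranges: List[RowRange]) -> List[RowRange]:
    """Merge overlapping or directly adjacent row ID ranges."""
    merged: List[RowRange] = []
    for start, end in sorted(ranges):
        if merged and start <= merged[-1][1] + 1:
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


def slice_ranges(merged: List[RowRange], offset: int, limit: int) -> List[RowRange]:
    """Map row positions [offset, offset + limit) onto merged row ID ranges."""
    result: List[RowRange] = []
    position = 0  # number of rows covered by the ranges already visited
    stop = offset + limit
    for start, end in merged:
        length = end - start + 1
        lo = max(offset, position)
        hi = min(stop, position + length)
        if lo < hi:
            result.append((start + lo - position, start + hi - position - 1))
        position += length
    return result


file_ranges = [(0, 99), (50, 149), (300, 399)]
merged = merge_ranges(file_ranges)
sliced = slice_ranges(merged, offset=100, limit=100)
print(merged, sliced)
```

Positions count rows in merged order, so a gap in the row ID space (here 150 to 299) does not consume any of the offset or limit.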
Usage
DataEvolutionSplitGenerator is instantiated by the scan planner when reading tables with data evolution enabled (rowTrackingEnabled=true).
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
Signature
class DataEvolutionSplitGenerator(AbstractSplitGenerator):
    def __init__(self, table, target_split_size: int, open_file_cost: int,
                 deletion_files_map=None, row_ranges: Optional[List] = None,
                 score_getter=None): ...
    def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]: ...
    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]: ...
    def _filter_blob(self, files: List[DataFileMeta]) -> List[DataFileMeta]: ...
    def _calculate_slice_row_ranges(self, partitioned_files: defaultdict) -> List[Range]: ...
    def _filter_files_by_row_ranges(self, partitioned_files: defaultdict,
                                    row_ranges: List[Range]) -> defaultdict: ...
    def _filter_by_shard(self, partitioned_files: defaultdict,
                         sub_task_id: int, total_tasks: int) -> defaultdict: ...
    def _wrap_to_indexed_splits(self, splits: List[Split],
                                row_ranges: List[Range]) -> List[Split]: ...
Import
from pypaimon.read.scanner.data_evolution_split_generator import DataEvolutionSplitGenerator
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| table | FileStoreTable | yes | Table being scanned |
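| target_split_size | int | yes | Target size for each split in bytes |
| open_file_cost | int | yes | Estimated cost of opening a file, in bytes |
| deletion_files_map | mapping | no | Deletion files mapped to their corresponding data files |
| file_entries | List[ManifestEntry] | yes | Files to split (passed to `create_splits()`) |
| row_ranges | List[Range] | no | Row ID ranges from global index filtering |
| score_getter | Callable | no | Function to get relevance scores by row ID |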
Outputs
| Name | Type | Description |
|---|---|---|
| splits | List[Split] | DataSplit or IndexedSplit objects ready for reading |
Usage Examples
Basic Split Generation
from pypaimon.read.scanner.data_evolution_split_generator import DataEvolutionSplitGenerator
# Create generator
generator = DataEvolutionSplitGenerator(
    table=table,
    target_split_size=128 * 1024 * 1024,  # 128 MB
    open_file_cost=4 * 1024 * 1024  # 4 MB
)
# Generate splits from manifest entries
splits = generator.create_splits(manifest_entries)
print(f"Generated {len(splits)} splits")
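To show how `target_split_size` and `open_file_cost` might interact during packing, here is a minimal sketch of ordered greedy packing. It assumes that each file is weighted as `max(file_size, open_file_cost)` and that a new split starts whenever adding the next file would exceed the target. Both are assumptions about what `_pack_for_ordered()` does, and `pack_for_ordered` below is a hypothetical standalone function operating on plain file sizes.

```python
from typing import List

MB = 1024 * 1024


def pack_for_ordered(sizes: List[int], target_split_size: int,
                     open_file_cost: int) -> List[List[int]]:
    """Greedily pack file sizes into bins while preserving input order."""
    packed: List[List[int]] = []
    current: List[int] = []
    current_weight = 0
    for size in sizes:
        # Small files still count for at least the cost of opening them.
        weight = max(size, open_file_cost)
        if current and current_weight + weight > target_split_size:
            packed.append(current)
            current, current_weight = [], 0
        current.append(size)
        current_weight += weight
    if current:
        packed.append(current)
    return packed


sizes = [100 * MB, 20 * MB, 1 * MB, 1 * MB, 120 * MB, 10 * MB]
bins = pack_for_ordered(sizes, target_split_size=128 * MB, open_file_cost=4 * MB)
print([[s // MB for s in b] for b in bins])
```

Under these assumptions, the open-file cost keeps a split from accumulating an unbounded number of tiny files, because each one is charged at least 4 MB toward the 128 MB target.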
With Global Index Filtering
from pypaimon.globalindex.range import Range
# IndexedSplit must be imported as well; its module path is not given here
# Create generator with row ranges from global index
row_ranges = [
    Range(100, 500),
    Range(1000, 1500),
    Range(2000, 2500)
]
generator = DataEvolutionSplitGenerator(
    table=table,
    target_split_size=128 * 1024 * 1024,
    open_file_cost=4 * 1024 * 1024,
    row_ranges=row_ranges
)
# Splits will be wrapped in IndexedSplit with row ranges
splits = generator.create_splits(manifest_entries)
for split in splits:
    if isinstance(split, IndexedSplit):
        print(f"Split covers row ranges: {split.row_ranges()}")
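The effect of global index row ranges on an individual file can be pictured as an interval intersection. The sketch below is a simplified illustration, not the logic of `_filter_files_by_row_ranges()` itself: it models ranges as inclusive `(start, end)` tuples, assumes a file covers row IDs `first_row_id` through `first_row_id + row_count - 1`, and uses the hypothetical helpers `intersect` and `ranges_for_file`.

```python
from typing import List, Optional, Tuple

RowRange = Tuple[int, int]  # inclusive (start, end); hypothetical stand-in for Range


def intersect(a: RowRange, b: RowRange) -> Optional[RowRange]:
    """Return the overlap of two inclusive ranges, or None if they are disjoint."""
    start, end = max(a[0], b[0]), min(a[1], b[1])
    return (start, end) if start <= end else None


def ranges_for_file(first_row_id: int, row_count: int,
                    row_ranges: List[RowRange]) -> List[RowRange]:
    """Row ID ranges of one file that survive the index filter."""
    file_range = (first_row_id, first_row_id + row_count - 1)
    hits = (intersect(file_range, r) for r in row_ranges)
    return [h for h in hits if h is not None]


index_ranges = [(100, 500), (1000, 1500), (2000, 2500)]
print(ranges_for_file(0, 300, index_ranges))     # file covering rows 0..299
print(ranges_for_file(400, 800, index_ranges))   # file covering rows 400..1199
print(ranges_for_file(1600, 100, index_ranges))  # file covering rows 1600..1699
```

A file whose result is empty can be skipped entirely; a file with a non-empty result only needs the listed rows read.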
With Vector Search Scores
# Score getter function (from FAISS index)
def get_score(row_id: int) -> float:
    # Lookup score from FAISS index
    return faiss_scores.get(row_id, 0.0)
generator = DataEvolutionSplitGenerator(
    table=table,
    target_split_size=128 * 1024 * 1024,
    open_file_cost=4 * 1024 * 1024,
    row_ranges=row_ranges,
    score_getter=get_score
)
# Splits will include relevance scores
splits = generator.create_splits(manifest_entries)
for split in splits:
    if isinstance(split, IndexedSplit) and split.scores:
        print(f"Split has {len(split.scores)} scored rows")
Shard Mode
# Distribute splits across 4 parallel tasks
generator = DataEvolutionSplitGenerator(
    table=table,
    target_split_size=128 * 1024 * 1024,
    open_file_cost=4 * 1024 * 1024
)
# Set shard parameters
generator.idx_of_this_subtask = 2 # This is task 2
generator.number_of_para_subtasks = 4 # Out of 4 total tasks
splits = generator.create_splits(manifest_entries)
# Returns only splits for task 2 based on row ID range sharding
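The row ID range sharding mentioned in the comment above can be sketched as follows. This is a sketch under assumptions: subtask IDs are zero-based, any remainder rows go to the lowest-numbered subtasks, ranges are inclusive `(start, end)` tuples, and `shard_positions` and `shard_row_ranges` are hypothetical helpers rather than pypaimon functions. How `_filter_by_shard()` actually handles remainders is not stated in this document.

```python
from typing import List, Tuple

RowRange = Tuple[int, int]  # inclusive (start, end); hypothetical stand-in for Range


def shard_positions(total_rows: int, sub_task_id: int, total_tasks: int) -> Tuple[int, int]:
    """Half-open position interval [start, end) owned by one subtask."""
    base, extra = divmod(total_rows, total_tasks)
    start = sub_task_id * base + min(sub_task_id, extra)
    end = start + base + (1 if sub_task_id < extra else 0)
    return start, end


def shard_row_ranges(merged: List[RowRange], sub_task_id: int,
                     total_tasks: int) -> List[RowRange]:
    """Contiguous slice of the merged row ID ranges assigned to one subtask."""
    total_rows = sum(end - start + 1 for start, end in merged)
    lo, hi = shard_positions(total_rows, sub_task_id, total_tasks)
    result: List[RowRange] = []
    position = 0  # rows covered by the ranges already visited
    for start, end in merged:
        length = end - start + 1
        a, b = max(lo, position), min(hi, position + length)
        if a < b:
            result.append((start + a - position, start + b - position - 1))
        position += length
    return result


merged = [(0, 149), (300, 399)]  # 250 rows in total
shards = [shard_row_ranges(merged, task, 4) for task in range(4)]
print(shards[2])
```

Because every subtask derives its slice from the same merged ranges and the same arithmetic, the shards are disjoint and together cover all rows without any coordination between subtasks.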