

Implementation:Apache Paimon DataEvolutionSplitGenerator

From Leeroopedia


Knowledge Sources
Domains: Query Planning, Schema Evolution
Last Updated: 2026-02-08 00:00 GMT

Overview

DataEvolutionSplitGenerator generates read splits for tables with data evolution enabled, handling row-ID-based file grouping, blob file filtering, shard/slice parallelism via row ID ranges, and global index integration with optional relevance scores.

Description

This generator extends `AbstractSplitGenerator` for tables where schema evolution has produced multiple files for the same rows, some of which hold only a subset of the columns and are stored as blob files. The core algorithm sorts files by `(first_row_id, is_blob, -max_sequence_number)` and partitions them by (partition, bucket).

Two modes narrow the rows a reader sees. In shard mode, `_filter_by_shard()` divides the total row ID space evenly across subtasks: it merges the file row ID ranges and assigns each subtask a contiguous slice of the merged ranges. In slice mode (used with limit/offset), `_calculate_slice_row_ranges()` merges overlapping file ranges in the same way and translates the requested position offsets into row ID ranges.

`_split_by_row_id()` groups files that share the same `first_row_id` (a data file together with its associated blob files), and `_filter_blob()` drops blob files that fall outside the data file's row range. `_pack_for_ordered()` then packs the results into splits close to the target split size. When row ranges are present (from global index filtering or slice mode), the result is wrapped in `IndexedSplit` objects, which carry the computed row ID ranges and, via the `score_getter` callback, optional relevance scores from vector search.

The generator also maps deletion files to their corresponding data files. `_compute_slice_split_file_idx_map()` builds per-file split maps that determine which rows to read from each file; for blob files, these ranges are computed relative to the associated data file.
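The sort order and grouping described above can be illustrated with a small, self-contained sketch. It is not the pypaimon code: `FileMeta` is a hypothetical stand-in for `DataFileMeta`, row IDs are assumed to be contiguous within a file, and `filter_blob` here keeps a blob file only if its rows lie inside the range covered by the group's data files, which is one plausible reading of "falls outside the data file's row range".

```python
from dataclasses import dataclass
from itertools import groupby
from typing import List


@dataclass(frozen=True)
class FileMeta:
    """Hypothetical stand-in for DataFileMeta, reduced to the fields used here."""
    name: str
    first_row_id: int
    row_count: int
    is_blob: bool
    max_sequence_number: int

    @property
    def last_row_id(self) -> int:
        # Assumption: a file covers the contiguous rows [first_row_id, last_row_id].
        return self.first_row_id + self.row_count - 1


def sort_key(f: FileMeta):
    # Lower first_row_id first; data files before blob files (False < True);
    # among ties, higher max_sequence_number (newer) first.
    return (f.first_row_id, f.is_blob, -f.max_sequence_number)


def split_by_row_id(files: List[FileMeta]) -> List[List[FileMeta]]:
    """Group files sharing first_row_id; groupby relies on the sorted order."""
    ordered = sorted(files, key=sort_key)
    return [list(g) for _, g in groupby(ordered, key=lambda f: f.first_row_id)]


def filter_blob(group: List[FileMeta]) -> List[FileMeta]:
    """Drop blob files whose rows extend beyond the group's data files."""
    data_files = [f for f in group if not f.is_blob]
    if not data_files:
        return list(group)  # no data file to compare against; leave as-is
    lo = min(f.first_row_id for f in data_files)
    hi = max(f.last_row_id for f in data_files)
    return [f for f in group
            if not f.is_blob or (lo <= f.first_row_id and f.last_row_id <= hi)]


files = [
    FileMeta("d1", 0, 100, False, 1),
    FileMeta("b1", 0, 100, True, 1),
    FileMeta("d2", 0, 100, False, 2),  # newer file for rows 0..99
    FileMeta("d3", 100, 50, False, 1),
    FileMeta("b3", 100, 80, True, 1),  # extends past row 149, so it is dropped
]
groups = [filter_blob(g) for g in split_by_row_id(files)]
print([[f.name for f in g] for g in groups])  # [['d2', 'd1', 'b1'], ['d3']]
```

Putting data files ahead of blob files in the sort key means each group starts with the file whose row range the blob files are checked against.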

This split generation logic underpins Paimon's data evolution feature: it uses row ID range arithmetic to coordinate schema evolution (partial-column blob files), global index filtering, vector search scoring, and distributed parallel reads.
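Both shard mode and slice mode start by merging file row ID ranges and then map row positions onto the merged ranges. The sketch below shows that arithmetic under stated assumptions: ranges are inclusive `(start, end)` tuples, adjacent ranges are merged as well as overlapping ones, and `merge_ranges` and `positions_to_row_ranges` are hypothetical helpers rather than pypaimon functions.

```python
from typing import List, Tuple

# Inclusive (start, end) row ID range; inclusivity is an assumption of this sketch.
RowRange = Tuple[int, int]


def merge_ranges(ranges: List[RowRange]) -> List[RowRange]:
    """Merge overlapping or adjacent ranges into sorted, disjoint ones."""
    merged: List[RowRange] = []
    for start, end in sorted(ranges):
        if merged and start <= merged[-1][1] + 1:
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


def positions_to_row_ranges(merged: List[RowRange],
                            start_pos: int, end_pos: int) -> List[RowRange]:
    """Map the half-open position window [start_pos, end_pos), counted over the
    concatenated merged ranges, onto row ID ranges (e.g. an offset/limit slice)."""
    result: List[RowRange] = []
    pos = 0  # position of the first row of the current merged range
    for start, end in merged:
        length = end - start + 1
        lo, hi = max(start_pos, pos), min(end_pos, pos + length)
        if lo < hi:
            result.append((start + (lo - pos), start + (hi - pos) - 1))
        pos += length
        if pos >= end_pos:
            break
    return result


file_ranges = [(0, 99), (50, 149), (300, 399)]
merged = merge_ranges(file_ranges)                # [(0, 149), (300, 399)]
print(positions_to_row_ranges(merged, 140, 160))  # [(140, 149), (300, 309)]
```

Working in positions first and converting to row IDs last lets a slice span a gap in the row ID space (rows 150 to 299 here) without counting rows that do not exist.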

Usage

DataEvolutionSplitGenerator is instantiated by the scan planner when reading tables with data evolution enabled (rowTrackingEnabled=true).

Code Reference

Source Location

Signature

class DataEvolutionSplitGenerator(AbstractSplitGenerator):
    def __init__(self, table, target_split_size: int, open_file_cost: int,
                 deletion_files_map=None, row_ranges: Optional[List] = None,
                 score_getter=None): ...

    def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]: ...

    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]: ...
    def _filter_blob(self, files: List[DataFileMeta]) -> List[DataFileMeta]: ...
    def _calculate_slice_row_ranges(self, partitioned_files: defaultdict) -> List[Range]: ...
    def _filter_files_by_row_ranges(self, partitioned_files: defaultdict,
                                     row_ranges: List[Range]) -> defaultdict: ...
    def _filter_by_shard(self, partitioned_files: defaultdict,
                         sub_task_id: int, total_tasks: int) -> defaultdict: ...
    def _wrap_to_indexed_splits(self, splits: List[Split],
                                 row_ranges: List[Range]) -> List[Split]: ...

Import

from pypaimon.read.scanner.data_evolution_split_generator import DataEvolutionSplitGenerator

I/O Contract

Inputs

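Name | Type | Required | Description
table | FileStoreTable | yes | Table being scanned
target_split_size | int | yes | Target size of each split, in bytes
open_file_cost | int | yes | Estimated cost of opening a file, in bytes; counted when packing files into splits
file_entries | List[ManifestEntry] | yes | Files to split (passed to create_splits())
row_ranges | List[Range] | no | Row ID ranges from global index filtering
score_getter | Callable | no | Function that returns the relevance score for a row ID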

Outputs

Name | Type | Description
splits | List[Split] | IndexedSplit objects when row ranges are present (global index filtering or slice mode), otherwise DataSplit objects; ready for reading

Usage Examples

Basic Split Generation

from pypaimon.read.scanner.data_evolution_split_generator import DataEvolutionSplitGenerator

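# `table` (a FileStoreTable) and `manifest_entries` (a List[ManifestEntry])
# are assumed to come from the scan planner; they are not constructed here.
generator = DataEvolutionSplitGenerator(
    table=table,
    target_split_size=128 * 1024 * 1024,  # 128 MB target per split
    open_file_cost=4 * 1024 * 1024,  # 4 MB estimated cost per opened file
)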

# Generate splits from manifest entries
splits = generator.create_splits(manifest_entries)

print(f"Generated {len(splits)} splits")
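To see how `target_split_size` and `open_file_cost` interact, here is a Python sketch of ordered bin packing in the style of Paimon's split packing. `pack_for_ordered` and `group_weight` are hypothetical names, and charging each file at least `open_file_cost` is an assumption about how the packing weight is computed, not a statement about pypaimon's `_pack_for_ordered()`.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


def pack_for_ordered(items: List[T], weight: Callable[[T], int],
                     target: int) -> List[List[T]]:
    """Pack items into bins in their original order, closing a bin when the
    next item would push it past `target` (a bin is never left empty)."""
    bins: List[List[T]] = []
    current: List[T] = []
    current_weight = 0
    for item in items:
        w = weight(item)
        if current and current_weight + w > target:
            bins.append(current)
            current, current_weight = [], 0
        current.append(item)
        current_weight += w
    if current:
        bins.append(current)
    return bins


MB = 1024 * 1024
TARGET_SPLIT_SIZE = 128 * MB
OPEN_FILE_COST = 4 * MB


def group_weight(file_sizes: List[int]) -> int:
    # Assumption: every file costs at least OPEN_FILE_COST, so many tiny
    # files still fill up a split.
    return sum(max(size, OPEN_FILE_COST) for size in file_sizes)


# Each group is a list of file sizes (a data file plus any blob files).
groups = [[100 * MB], [20 * MB, 1 * MB], [1 * MB], [60 * MB]]
splits = pack_for_ordered(groups, group_weight, TARGET_SPLIT_SIZE)
print([len(b) for b in splits])  # [3, 1]
```

Because packing is ordered, groups are never rearranged to fill gaps: the first split reaches exactly 128 MB of weight (100 + 24 + 4), and the 60 MB group starts a new one.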

With Global Index Filtering

from pypaimon.globalindex.range import Range

# Create generator with row ranges from global index
row_ranges = [
    Range(100, 500),
    Range(1000, 1500),
    Range(2000, 2500)
]

generator = DataEvolutionSplitGenerator(
    table=table,
    target_split_size=128 * 1024 * 1024,
    open_file_cost=4 * 1024 * 1024,
    row_ranges=row_ranges
)

# Splits are wrapped in IndexedSplit when row ranges are present.
# IndexedSplit must also be imported from pypaimon; its module path is not given on this page.
splits = generator.create_splits(manifest_entries)

for split in splits:
    if isinstance(split, IndexedSplit):
        print(f"Split covers row ranges: {split.row_ranges()}")
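The row-range filtering behind `_filter_files_by_row_ranges()` can be pictured as interval intersection. This is a minimal sketch, not the pypaimon code: files are modeled as hypothetical `(first_row_id, row_count)` tuples keyed by name, ranges are assumed inclusive on both ends, and `filter_files_by_row_ranges` is an illustrative name.

```python
from typing import Dict, List, Optional, Tuple

RowRange = Tuple[int, int]  # inclusive (start, end); inclusivity is an assumption


def intersect(a: RowRange, b: RowRange) -> Optional[RowRange]:
    """Return the overlap of two inclusive ranges, or None if they are disjoint."""
    start, end = max(a[0], b[0]), min(a[1], b[1])
    return (start, end) if start <= end else None


def filter_files_by_row_ranges(
    files: Dict[str, Tuple[int, int]], row_ranges: List[RowRange]
) -> Dict[str, List[RowRange]]:
    """`files` maps a file name to (first_row_id, row_count).

    Keeps only files that overlap at least one requested range and records
    the overlapping pieces, i.e. the rows that actually need to be read.
    """
    result: Dict[str, List[RowRange]] = {}
    for name, (first_row_id, row_count) in files.items():
        file_range = (first_row_id, first_row_id + row_count - 1)
        hits = [r for r in (intersect(file_range, q) for q in row_ranges)
                if r is not None]
        if hits:
            result[name] = hits
    return result


row_ranges = [(100, 500), (1000, 1500), (2000, 2500)]
files = {"f0": (0, 1000), "f1": (1000, 1000), "f2": (3000, 500)}
print(filter_files_by_row_ranges(files, row_ranges))
# {'f0': [(100, 500)], 'f1': [(1000, 1500)]}
```

File `f2` (rows 3000 to 3499) overlaps none of the ranges, so it is skipped entirely rather than opened and filtered row by row.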

With Vector Search Scores

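# Score getter backed by vector search results (e.g. from a FAISS index).
# `faiss_scores` is a hypothetical dict mapping row ID -> relevance score;
# populate it from your own vector search before creating the generator.
faiss_scores = {}

def get_score(row_id: int) -> float:
    # Rows without a vector-search hit get a score of 0.0
    return faiss_scores.get(row_id, 0.0)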

generator = DataEvolutionSplitGenerator(
    table=table,
    target_split_size=128 * 1024 * 1024,
    open_file_cost=4 * 1024 * 1024,
    row_ranges=row_ranges,
    score_getter=get_score
)

# Splits will include relevance scores
splits = generator.create_splits(manifest_entries)

for split in splits:
    if isinstance(split, IndexedSplit) and split.scores:
        print(f"Split has {len(split.scores)} scored rows")
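A rough sketch of how a `score_getter` callback could be consulted for the rows of one split. `collect_scores` and `vector_hits` are hypothetical, ranges are assumed inclusive, and, unlike `get_score` above, the getter here returns `None` for rows without a hit so they can be skipped; that treatment of unscored rows is an assumption.

```python
from typing import Callable, Dict, List, Optional, Tuple

RowRange = Tuple[int, int]  # inclusive (start, end); inclusivity is an assumption


def collect_scores(
    row_ranges: List[RowRange], score_getter: Callable[[int], Optional[float]]
) -> Dict[int, float]:
    """Query score_getter for every row ID in the split's ranges and keep
    only rows that have a score."""
    scores: Dict[int, float] = {}
    for start, end in row_ranges:
        for row_id in range(start, end + 1):
            score = score_getter(row_id)
            if score is not None:
                scores[row_id] = score
    return scores


# Hypothetical vector-search hits: row ID -> similarity score.
vector_hits = {101: 0.92, 105: 0.87, 1200: 0.55}
split_scores = collect_scores([(100, 110), (1000, 1010)], vector_hits.get)
print(split_scores)  # {101: 0.92, 105: 0.87}
```

Row 1200 has a hit but lies outside this split's ranges, so it belongs to whichever split covers it.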

Shard Mode

# Distribute splits across 4 parallel tasks
generator = DataEvolutionSplitGenerator(
    table=table,
    target_split_size=128 * 1024 * 1024,
    open_file_cost=4 * 1024 * 1024
)

# Set shard parameters
generator.idx_of_this_subtask = 2  # Index of this subtask (typically 0-based, i.e. the third)
generator.number_of_para_subtasks = 4  # Total number of parallel subtasks

splits = generator.create_splits(manifest_entries)
# Returns only the splits whose row ID ranges are assigned to subtask 2
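The even division performed by `_filter_by_shard()` can be sketched as follows. This illustrates the idea under stated assumptions and is not the pypaimon implementation: subtask IDs are 0-based, ranges are inclusive and already merged, and when the row count does not divide evenly the first subtasks each receive one extra row. `shard_window` and `shard_row_ranges` are hypothetical names.

```python
from typing import List, Tuple

RowRange = Tuple[int, int]  # inclusive (start, end); inclusivity is an assumption


def shard_window(total_rows: int, sub_task_id: int,
                 total_tasks: int) -> Tuple[int, int]:
    """Half-open position window [start, end) for one subtask; the first
    total_rows % total_tasks subtasks take one extra row each."""
    base, remainder = divmod(total_rows, total_tasks)
    start = sub_task_id * base + min(sub_task_id, remainder)
    end = start + base + (1 if sub_task_id < remainder else 0)
    return start, end


def shard_row_ranges(merged: List[RowRange], sub_task_id: int,
                     total_tasks: int) -> List[RowRange]:
    """Translate the subtask's position window into row ID ranges over
    `merged` (sorted, disjoint ranges)."""
    total = sum(end - start + 1 for start, end in merged)
    lo_pos, hi_pos = shard_window(total, sub_task_id, total_tasks)
    result: List[RowRange] = []
    pos = 0  # position of the first row of the current merged range
    for start, end in merged:
        length = end - start + 1
        lo, hi = max(lo_pos, pos), min(hi_pos, pos + length)
        if lo < hi:
            result.append((start + lo - pos, start + hi - pos - 1))
        pos += length
    return result


merged = [(0, 149), (300, 399)]  # 250 rows in total
print(shard_row_ranges(merged, 2, 4))  # [(126, 149), (300, 337)]
```

Splitting by row count rather than by file keeps the subtasks balanced even when file sizes vary; here subtask 2 gets 62 rows that straddle the gap between the two merged ranges.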
