Implementation:Apache Paimon AppendTableSplitGenerator
| Knowledge Sources | |
|---|---|
| Domains | Split Generation, Append Tables |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
AppendTableSplitGenerator creates data splits for append-only tables with support for sharding and slicing.
Description
AppendTableSplitGenerator extends AbstractSplitGenerator to create splits specifically for append-only tables. Unlike primary key tables that require merge operations, append-only tables can be read directly in file order. The generator partitions files by their partition keys and buckets, then packs them into splits of approximately target size while preserving file order.
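The grouping step described above can be pictured with a minimal sketch. `FileEntry` and `group_by_partition_and_bucket` are hypothetical names introduced for illustration only; they are not part of pypaimon, and the real generator works on `ManifestEntry` objects.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class FileEntry:
    """Hypothetical stand-in for a manifest entry (not the pypaimon class)."""
    partition: Tuple[str, ...]
    bucket: int
    file_name: str


GroupKey = Tuple[Tuple[str, ...], int]


def group_by_partition_and_bucket(entries: List[FileEntry]) -> Dict[GroupKey, List[FileEntry]]:
    """Group entries by (partition, bucket), keeping the input order inside each group."""
    groups: Dict[GroupKey, List[FileEntry]] = defaultdict(list)
    for entry in entries:
        groups[(entry.partition, entry.bucket)].append(entry)
    return dict(groups)
```

Because entries are appended in the order they arrive, each group keeps the original file order, which is the property the packing step relies on.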
The generator supports two modes of parallel processing: sharding (bucket-based distribution) and slicing (row-range-based distribution). In sharding mode, files are distributed across workers based on their bucket numbers. In slicing mode, the generator calculates row ranges for each worker and creates SlicedSplit objects that contain file-level index maps specifying which rows to read from each file.
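The two modes can be contrasted with a small sketch. Everything here is an assumption made for illustration: the function names are hypothetical, the modulo rule for assigning buckets to workers is one common choice rather than a rule the source confirms, and the index map is modeled as half-open `(start, end)` row ranges local to each file.

```python
from typing import Dict, List, Tuple


def files_for_shard(files_by_bucket: Dict[int, List[str]], idx: int, total: int) -> Dict[int, List[str]]:
    """Sharding sketch: keep only buckets assigned to worker `idx` (assumed modulo rule)."""
    return {b: files for b, files in files_by_bucket.items() if b % total == idx}


def file_index_map(file_row_counts: List[Tuple[str, int]], start_pos: int, end_pos: int) -> Dict[str, Tuple[int, int]]:
    """Slicing sketch: map a global row range [start_pos, end_pos) to per-file local ranges."""
    index_map: Dict[str, Tuple[int, int]] = {}
    offset = 0  # global row position of the first row of the current file
    for name, row_count in file_row_counts:
        file_start, file_end = offset, offset + row_count
        lo = max(start_pos, file_start)
        hi = min(end_pos, file_end)
        if lo < hi:  # the file overlaps the requested range
            index_map[name] = (lo - file_start, hi - file_start)
        offset = file_end
    return index_map
```

With three files of 100 rows each, the range 50 to 250 covers the second half of the first file, all of the second, and the first half of the third. Files outside the range do not appear in the map at all.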
The implementation uses order-preserving bin-packing to group files into splits close to the target split size, weighing each file by both its size and the open file cost. It also handles files that span multiple worker ranges, keeping row counts correct when a file is only partially assigned to a worker.
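As a rough illustration of packing files into splits while preserving file order, the sketch below closes the current split whenever adding the next file would exceed the target. The function name is hypothetical, and the weight rule `max(file_size, open_file_cost)` is an assumption about how the two quantities are combined; the source only says both are considered.

```python
from typing import List


def pack_in_order(file_sizes: List[int], target_split_size: int, open_file_cost: int) -> List[List[int]]:
    """Pack files into consecutive splits without reordering them."""
    splits: List[List[int]] = []
    current: List[int] = []
    current_weight = 0
    for size in file_sizes:
        # Assumed weight rule: small files still count at least open_file_cost.
        weight = max(size, open_file_cost)
        if current and current_weight + weight > target_split_size:
            splits.append(current)
            current, current_weight = [], 0
        current.append(size)
        current_weight += weight
    if current:
        splits.append(current)
    return splits
```

A file larger than the target still gets a split of its own, and the open file cost keeps a long run of tiny files from collapsing into a single split with too many files to open.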
Usage
Use AppendTableSplitGenerator when scanning append-only tables. It automatically handles parallel processing through either sharding (for distributed systems) or slicing (for fine-grained parallelism) based on the configuration. The generator is instantiated by the table scan and called with manifest entries to produce splits for reading.
Code Reference
Source Location
Signature
```python
class AppendTableSplitGenerator(AbstractSplitGenerator):
    """
    Split generator for append-only tables.
    """

    def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]:
        """Create splits from manifest entries with optional sharding or slicing."""
        ...

    def _wrap_to_sliced_splits(self, splits: List[Split], plan_start_pos: int,
                               plan_end_pos: int) -> List[Split]:
        """Wrap regular splits into SlicedSplit objects with file index maps."""
        ...

    def _filter_by_shard(self, partitioned_files: defaultdict) -> tuple:
        """Filter file entries by shard for parallel processing."""
        ...
```
Import
```python
from pypaimon.read.scanner.append_table_split_generator import AppendTableSplitGenerator
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| file_entries | List[ManifestEntry] | Yes | Manifest entries for the files to split (argument to create_splits) |
| table | Table | Yes | Table instance (constructor argument) |
| target_split_size | int | Yes | Target size of each split, in bytes (constructor argument) |
| open_file_cost | int | Yes | Cost charged for opening a file, in bytes (constructor argument) |
Outputs
| Name | Type | Description |
|---|---|---|
| splits | List[Split] | List of Split or SlicedSplit objects ready for reading |
Usage Examples
```python
from pypaimon.read.scanner.append_table_split_generator import AppendTableSplitGenerator

# `table` is assumed to be an existing append-only table instance, and
# `process` a user-defined function that consumes one batch.

# Create split generator for append table
generator = AppendTableSplitGenerator(
    table=table,
    target_split_size=128 * 1024 * 1024,  # 128 MB
    open_file_cost=4 * 1024 * 1024        # 4 MB penalty per file
)

# Option 1: Shard-based parallel processing (bucket distribution)
generator.with_shard(idx_of_this_subtask=0, number_of_para_subtasks=4)

# Option 2: Slice-based parallel processing (row range distribution)
# generator.with_slice(start_pos=0, end_pos=1000000)

# Get manifest entries from scan
manifest_entries = table.scan().files()

# Create splits
splits = generator.create_splits(manifest_entries)

# Read splits
for split in splits:
    reader = table.read_split(split)
    # Process data
    for batch in reader:
        process(batch)
```