Implementation:Apache Paimon DataFileBatchReader
| Knowledge Sources | |
|---|---|
| Domains | Data Reading, File Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
DataFileBatchReader reads record batches from data files and applies partition information, field mapping, and row tracking transformations.
Description
DataFileBatchReader is a RecordBatchReader implementation that wraps a format-specific reader and transforms each batch it returns. It handles partition column injection, field index mapping, schema enforcement, and row tracking metadata assignment. This reader serves as an adapter between format-specific readers (such as Parquet, ORC, and Avro) and the table-level reading logic.
The reader supports partition columns by injecting constant values for partitioned fields. It handles schema evolution through index mapping, ensuring that fields are correctly ordered even when the file schema differs from the table schema. For primary key tables, it can prefix key fields with "_KEY_" markers.
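The partition injection and index mapping described above can be sketched in plain Python. This is a simplified model, not the pypaimon implementation: columns are plain lists rather than Arrow arrays, `to_table_columns` is a hypothetical helper, and the convention that a negative index marks a field missing from the file is an assumption made for illustration.

```python
from typing import Any, Dict, List


def to_table_columns(
    file_columns: List[List[Any]],
    table_fields: List[str],
    index_mapping: List[int],
    partition_values: Dict[str, Any],
    num_rows: int,
) -> Dict[str, List[Any]]:
    """Arrange file columns in table order and fill in partition constants.

    index_mapping[i] is the position in the file of table field i.
    A negative index (assumed convention) means the file has no such column.
    """
    table_columns: Dict[str, List[Any]] = {}
    for name, file_idx in zip(table_fields, index_mapping):
        if name in partition_values:
            # Partition fields are not read from the file: repeat the constant.
            table_columns[name] = [partition_values[name]] * num_rows
        elif file_idx < 0:
            # Field added after the file was written: fill with nulls.
            table_columns[name] = [None] * num_rows
        else:
            table_columns[name] = file_columns[file_idx]
    return table_columns


# The file stores (name, id); the table schema is (id, name, dt),
# where dt is a partition column.
file_columns = [["a", "b"], [1, 2]]
table_batch = to_table_columns(
    file_columns,
    table_fields=["id", "name", "dt"],
    index_mapping=[1, 0, -1],
    partition_values={"dt": "2024-01-01"},
    num_rows=2,
)
```

The output keeps the table's field order regardless of how the file laid out its columns, which is the property the index mapping exists to guarantee.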
When row tracking is enabled, the reader assigns system metadata fields like _ROW_ID and _SEQUENCE_NUMBER to each batch. This enables change data capture and row-level tracking capabilities.
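A minimal sketch of how row tracking fields might be filled for a single batch follows. It assumes that _ROW_ID counts upward from `first_row_id` and that _SEQUENCE_NUMBER repeats `max_sequence_number` for every row; both rules are inferred from the constructor parameter names rather than confirmed against the source. The helper `assign_row_tracking` is hypothetical and uses plain lists in place of Arrow arrays.

```python
from typing import Any, Dict, List


def assign_row_tracking(
    columns: List[List[Any]],
    num_rows: int,
    first_row_id: int,
    max_sequence_number: int,
    system_fields: Dict[str, int],
) -> List[List[Any]]:
    """Return a copy of columns with the system field positions overwritten."""
    result = list(columns)
    if "_ROW_ID" in system_fields:
        # Assumed rule: consecutive IDs starting at first_row_id.
        result[system_fields["_ROW_ID"]] = list(
            range(first_row_id, first_row_id + num_rows)
        )
    if "_SEQUENCE_NUMBER" in system_fields:
        # Assumed rule: every row carries the file's max sequence number.
        result[system_fields["_SEQUENCE_NUMBER"]] = [max_sequence_number] * num_rows
    return result


# Two data columns followed by placeholder slots for the system fields.
columns = [[1, 2, 3], ["a", "b", "c"], [None] * 3, [None] * 3]
tracked = assign_row_tracking(
    columns,
    num_rows=3,
    first_row_id=100,
    max_sequence_number=7,
    system_fields={"_ROW_ID": 2, "_SEQUENCE_NUMBER": 3},
)
```

The sketch covers one batch only; how row IDs continue across successive batches of the same file is not addressed here.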
Usage
Use DataFileBatchReader when reading data files that require partition information injection, field mapping, or row tracking metadata. It is the primary reader used by table scans to transform file-level data into table-level records with proper schema and metadata.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/read/reader/data_file_batch_reader.py
Signature
class DataFileBatchReader(RecordBatchReader):
    """
    Reads record batch from files of different formats
    """

    def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int],
                 partition_info: PartitionInfo,
                 system_primary_key: Optional[List[str]], fields: List[DataField],
                 max_sequence_number: int,
                 first_row_id: int,
                 row_tracking_enabled: bool,
                 system_fields: dict):
        ...

    def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]:
        """Read arrow batch with transformations applied."""
        ...

    def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch:
        """Assign row tracking meta fields (_ROW_ID and _SEQUENCE_NUMBER)."""
        ...

    def close(self) -> None:
        """Close the format reader."""
        ...
Import
from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| format_reader | RecordBatchReader | Yes | Underlying format-specific reader |
| index_mapping | List[int] | No (may be None) | Mapping from table fields to file fields |
| partition_info | PartitionInfo | No (may be None) | Partition column information |
| system_primary_key | Optional[List[str]] | No | Primary key field names for key prefixing |
| fields | List[DataField] | Yes | Table schema fields |
| max_sequence_number | int | Yes | Maximum sequence number for this file |
| first_row_id | int | Yes | First row ID for row tracking |
| row_tracking_enabled | bool | Yes | Whether to assign row tracking metadata |
| system_fields | dict | Yes | Map of system field names to indices |
Outputs
| Name | Type | Description |
|---|---|---|
| read_arrow_batch() return value | Optional[RecordBatch] | Transformed PyArrow RecordBatch with partition values, field mapping, and row tracking applied; None when no batches remain |
Usage Examples
from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader

# file_io, file_path, read_fields, predicate, partition_info, table_fields,
# and process_batch are assumed to be supplied by the calling code.

# Create format reader for a Parquet file
format_reader = FormatPyArrowReader(file_io, "parquet", file_path, read_fields, predicate)

# Create data file reader with transformations
reader = DataFileBatchReader(
    format_reader=format_reader,
    index_mapping=[0, 1, 2],
    partition_info=partition_info,
    system_primary_key=None,
    fields=table_fields,
    max_sequence_number=100,
    first_row_id=0,
    row_tracking_enabled=True,
    system_fields={"_ROW_ID": 3, "_SEQUENCE_NUMBER": 4}
)

# Read batches until the reader returns None, closing it even on error
try:
    while True:
        batch = reader.read_arrow_batch()
        if batch is None:
            break
        # Process batch with partition columns and row tracking
        process_batch(batch)
finally:
    reader.close()
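The read loop above can also be wrapped in a generator so that callers iterate over batches directly. The sketch below relies only on the two methods shown in the signature, `read_arrow_batch()` and `close()`. Both `iter_batches` and `FakeReader` are hypothetical names introduced for illustration and are not part of pypaimon; `FakeReader` stands in for a real reader so the example runs without any data files.

```python
from typing import Any, Iterator, List


def iter_batches(reader: Any) -> Iterator[Any]:
    """Yield batches until read_arrow_batch() returns None, then close the reader."""
    try:
        while True:
            batch = reader.read_arrow_batch()
            if batch is None:
                return
            yield batch
    finally:
        # Runs on exhaustion, on error, and when the generator is closed early.
        reader.close()


class FakeReader:
    """Hypothetical stand-in exposing the same two methods as the real reader."""

    def __init__(self, batches: List[Any]) -> None:
        self._batches = list(batches)
        self.closed = False

    def read_arrow_batch(self, start_idx=None, end_idx=None):
        # Hand out queued batches in order, then signal exhaustion with None.
        return self._batches.pop(0) if self._batches else None

    def close(self) -> None:
        self.closed = True


fake = FakeReader([["batch-1"], ["batch-2"]])
collected = list(iter_batches(fake))
```

Placing `close()` in a `finally` clause means the underlying format reader is released even if processing a batch raises.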