

Implementation:Apache Paimon DataFileBatchReader

From Leeroopedia


Knowledge Sources
Domains: Data Reading, File Processing
Last Updated: 2026-02-08 00:00 GMT

Overview

DataFileBatchReader reads record batches from data files and applies partition information, field mapping, and row tracking transformations.

Description

DataFileBatchReader is a RecordBatchReader implementation that wraps a format-specific reader and transforms each batch that reader returns. It handles partition column injection, field index mapping, schema enforcement, and row tracking metadata assignment. In this role it acts as an adapter between format-specific readers (such as Parquet, ORC, and Avro) and the table-level reading logic.

The reader supports partition columns by injecting constant values for partitioned fields. It handles schema evolution through index mapping, ensuring that fields are correctly ordered even when the file schema differs from the table schema. For primary key tables, it can prefix key fields with "_KEY_" markers.
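The three transformations described above can be sketched in plain Python, with lists standing in for Arrow columns. This is a minimal illustration, not the pypaimon implementation: the function name `map_file_batch`, the `partition_values` dictionary, and the convention that a negative index marks a field missing from the file are all assumptions made for the example.

```python
from typing import Any, Dict, List, Optional

KEY_PREFIX = "_KEY_"  # marker named in the description above


def map_file_batch(
    file_columns: List[List[Any]],
    index_mapping: List[int],
    table_field_names: List[str],
    partition_values: Dict[str, Any],
    primary_keys: Optional[List[str]] = None,
) -> Dict[str, List[Any]]:
    """Build table-ordered columns from file-ordered columns (illustrative only)."""
    num_rows = len(file_columns[0]) if file_columns else 0
    keys = set(primary_keys or [])
    result: Dict[str, List[Any]] = {}
    for position, name in enumerate(table_field_names):
        if name in partition_values:
            # Assumption: partition values arrive as constants, so the column
            # is the same value repeated once per row.
            column = [partition_values[name]] * num_rows
        else:
            file_index = index_mapping[position]
            # Assumption: a negative index means the field is absent from the
            # file (e.g. added by schema evolution) and is filled with nulls.
            column = file_columns[file_index] if file_index >= 0 else [None] * num_rows
        # Key fields of a primary key table get the "_KEY_" prefix.
        out_name = KEY_PREFIX + name if name in keys else name
        result[out_name] = column
    return result


# The file stores (name, id); the table expects (id, name, dt, score).
file_columns = [["a", "b"], [1, 2]]
mapped = map_file_batch(
    file_columns,
    index_mapping=[1, 0, -1, -1],
    table_field_names=["id", "name", "dt", "score"],
    partition_values={"dt": "2024-01-01"},
    primary_keys=["id"],
)
print(mapped)
```

The output column order follows the table schema rather than the file layout, which is the point of the index mapping: the caller never needs to know how a particular file ordered its fields.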

When row tracking is enabled, the reader assigns system metadata fields like _ROW_ID and _SEQUENCE_NUMBER to each batch. This enables change data capture and row-level tracking capabilities.
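A rough sketch of row tracking assignment follows, again with plain lists in place of Arrow arrays. It assumes that _ROW_ID values count upward from `first_row_id` and that every row in the file receives `max_sequence_number`; the helper name `assign_row_tracking` and the returned "next row ID" are hypothetical additions for the example, not part of the documented API.

```python
from typing import Any, Dict, List, Tuple


def assign_row_tracking(
    columns: List[List[Any]],
    system_fields: Dict[str, int],
    first_row_id: int,
    max_sequence_number: int,
) -> Tuple[List[List[Any]], int]:
    """Fill the system columns of one batch and return the next free row ID."""
    num_rows = len(columns[0]) if columns else 0
    result = list(columns)  # shallow copy so the input batch is left untouched

    row_id_index = system_fields.get("_ROW_ID")
    if row_id_index is not None and row_id_index < len(result):
        # Assumption: row IDs are consecutive, starting at first_row_id.
        result[row_id_index] = list(range(first_row_id, first_row_id + num_rows))

    seq_index = system_fields.get("_SEQUENCE_NUMBER")
    if seq_index is not None and seq_index < len(result):
        # Assumption: all rows of the file share the same sequence number.
        result[seq_index] = [max_sequence_number] * num_rows

    return result, first_row_id + num_rows


# Two data columns followed by two placeholder system columns.
batch = [[10, 20, 30], ["a", "b", "c"], [None] * 3, [None] * 3]
tracked, next_row_id = assign_row_tracking(
    batch,
    system_fields={"_ROW_ID": 2, "_SEQUENCE_NUMBER": 3},
    first_row_id=100,
    max_sequence_number=7,
)
print(tracked[2], tracked[3], next_row_id)
```

Returning the next row ID is a design choice of this sketch: it lets a caller keep numbering continuous across consecutive batches of the same file.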

Usage

Use DataFileBatchReader when reading data files that require partition information injection, field mapping, or row tracking metadata. It is the primary reader used by table scans to transform file-level data into table-level records with proper schema and metadata.

Code Reference

Source Location

Signature

class DataFileBatchReader(RecordBatchReader):
    """
    Reads record batch from files of different formats
    """

    def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int],
                 partition_info: PartitionInfo,
                 system_primary_key: Optional[List[str]], fields: List[DataField],
                 max_sequence_number: int,
                 first_row_id: int,
                 row_tracking_enabled: bool,
                 system_fields: dict):
        ...

    def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]:
        """Read arrow batch with transformations applied."""
        ...

    def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch:
        """Assign row tracking meta fields (_ROW_ID and _SEQUENCE_NUMBER)."""
        ...

    def close(self) -> None:
        """Close the format reader."""
        ...

Import

from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader

I/O Contract

Inputs

Name | Type | Required | Description
format_reader | RecordBatchReader | Yes | Underlying format-specific reader
index_mapping | List[int] | No | Mapping from table fields to file fields
partition_info | PartitionInfo | No | Partition column information
system_primary_key | Optional[List[str]] | No | Primary key field names for key prefixing
fields | List[DataField] | Yes | Table schema fields
max_sequence_number | int | Yes | Maximum sequence number for this file
first_row_id | int | Yes | First row ID for row tracking
row_tracking_enabled | bool | Yes | Whether to assign row tracking metadata
system_fields | dict | Yes | Map of system field names to indices

Outputs

Name | Type | Description
RecordBatch | Optional[RecordBatch] | Transformed PyArrow RecordBatch with partition, mapping, and tracking applied; None once the reader is exhausted

Usage Examples

from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader

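# Create format reader for a Parquet file.
# file_io, file_path, read_fields and predicate are assumed to be supplied by the
# surrounding read context; they are not defined in this snippet.
format_reader = FormatPyArrowReader(file_io, "parquet", file_path, read_fields, predicate)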

# Create data file reader with transformations
reader = DataFileBatchReader(
    format_reader=format_reader,
    index_mapping=[0, 1, 2],
    partition_info=partition_info,
    system_primary_key=None,
    fields=table_fields,
    max_sequence_number=100,
    first_row_id=0,
    row_tracking_enabled=True,
    system_fields={"_ROW_ID": 3, "_SEQUENCE_NUMBER": 4}
)

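# Read batches until the reader is exhausted (read_arrow_batch returns None)
try:
    while True:
        batch = reader.read_arrow_batch()
        if batch is None:
            break
        # Each batch already carries partition columns and row tracking fields
        process_batch(batch)
finally:
    # Release the underlying format reader even if processing fails
    reader.close()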
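The read loop above can also be wrapped in a generator so that callers iterate over batches directly and the reader is closed automatically. The following is one possible sketch: `iter_batches` is a hypothetical helper, and `FakeReader` is a stand-in that only mimics the `read_arrow_batch()` and `close()` methods so the example runs without pypaimon installed.

```python
from contextlib import closing
from typing import Any, Iterator, List, Optional


def iter_batches(reader: Any) -> Iterator[Any]:
    """Yield batches until read_arrow_batch() returns None, then close the reader."""
    with closing(reader):  # calls reader.close() on exit, including on errors
        while True:
            batch = reader.read_arrow_batch()
            if batch is None:
                return
            yield batch


class FakeReader:
    """Hypothetical stand-in exposing the same two methods as the real reader."""

    def __init__(self, batches: List[Any]) -> None:
        self._batches = list(batches)
        self.closed = False

    def read_arrow_batch(self) -> Optional[Any]:
        # Return the next queued batch, or None once everything has been read.
        return self._batches.pop(0) if self._batches else None

    def close(self) -> None:
        self.closed = True


fake = FakeReader(["batch-1", "batch-2"])
collected = list(iter_batches(fake))
print(collected, fake.closed)
```

With a real DataFileBatchReader the call site would reduce to `for batch in iter_batches(reader): process_batch(batch)`, assuming the reader behaves as described in the I/O contract.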
