Implementation:Apache Paimon FormatAvroReader
| Knowledge Sources | |
|---|---|
| Domains | Data Reading, File Formats |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
FormatAvroReader reads Avro files and converts them to PyArrow RecordBatch format with optional predicate filtering.
Description
FormatAvroReader is a RecordBatchReader implementation specialized for reading Avro format files. It uses the fastavro library to efficiently read Avro records and converts them to PyArrow RecordBatch objects. The reader supports field projection by reading only specified columns and can apply predicate pushdown for filtering rows during the read process.
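The row-to-column conversion with projection described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the reader's actual code: `next_column_batch` is a hypothetical helper, and the iterator of dicts stands in for the record iterator that fastavro yields.

```python
from typing import Any, Dict, Iterator, List, Optional


def next_column_batch(
    records: Iterator[Dict[str, Any]],
    read_fields: List[str],
    batch_size: int = 1024,
) -> Optional[Dict[str, List[Any]]]:
    """Pivot up to batch_size row dicts into per-column lists (hypothetical helper)."""
    columns: Dict[str, List[Any]] = {name: [] for name in read_fields}
    rows_read = 0
    for record in records:
        # Projection: copy only the requested fields; a missing field becomes None.
        for name in read_fields:
            columns[name].append(record.get(name))
        rows_read += 1
        if rows_read >= batch_size:
            break
    # An exhausted iterator yields no rows, which signals end of file.
    return columns if rows_read > 0 else None


# Stand-in for the dict records produced by an Avro record iterator.
rows = iter([
    {"id": 1, "name": "a", "age": 30},
    {"id": 2, "name": "b", "age": 12},
    {"id": 3, "name": "c", "age": 45},
])
first = next_column_batch(rows, ["id", "name"], batch_size=2)
second = next_column_batch(rows, ["id", "name"], batch_size=2)
third = next_column_batch(rows, ["id", "name"], batch_size=2)
```

A column dict of this shape can be passed to `pyarrow.RecordBatch.from_pydict` together with a schema to obtain a RecordBatch.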
The reader processes data in configurable batches (1024 rows by default). When a predicate is provided, the records of a batch are loaded into a PyArrow Table, filtered with PyArrow's dataset scanner, and converted back to a RecordBatch. Because the filter is applied after the rows have been read, a returned batch may contain fewer rows than the configured batch size.
This implementation converts Avro's row-oriented records into PyArrow's columnar format, so Avro-backed files can be consumed through the same RecordBatch interface as other readers in Paimon.
Usage
Use FormatAvroReader when reading data from Avro format files in Paimon tables. It is automatically selected by the table scan when the file format is Avro, providing transparent support for Avro storage with predicate pushdown and field projection capabilities.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/read/reader/format_avro_reader.py
Signature
class FormatAvroReader(RecordBatchReader):
    """
    An ArrowBatchReader for reading Avro files using fastavro, filters records based on the
    provided predicate and projection, and converts Avro records to RecordBatch format.
    """
    def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
                 full_fields: List[DataField],
                 push_down_predicate: Any, batch_size: int = 1024):
        ...
    def read_arrow_batch(self) -> Optional[RecordBatch]:
        """Read next batch of records from Avro file."""
        ...
    def close(self):
        """Close the file handle."""
        ...
Import
from pypaimon.read.reader.format_avro_reader import FormatAvroReader
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| file_io | FileIO | Yes | File I/O abstraction for accessing the file |
| file_path | str | Yes | Path to the Avro file |
| read_fields | List[str] | Yes | List of field names to read (projection) |
| full_fields | List[DataField] | Yes | Complete table schema fields |
| push_down_predicate | Any | Yes (no default; may be None) | PyArrow predicate for row filtering; None is assumed to disable filtering |
| batch_size | int | No | Number of rows per batch (default 1024) |
Outputs
| Name | Type | Description |
|---|---|---|
| RecordBatch | Optional[RecordBatch] | PyArrow RecordBatch with filtered and projected data, or None at EOF |
Usage Examples
from pypaimon.read.reader.format_avro_reader import FormatAvroReader
from pypaimon.common.file_io import FileIO
import pyarrow.compute as pc
# NOTE: DataField, IntType, and StringType must also be imported from pypaimon;
# the module path is not given here.
# Create file I/O
file_io = FileIO.get("file:///data/table")
# Define schema and projection
full_fields = [
    DataField("id", IntType()),
    DataField("name", StringType()),
    DataField("age", IntType())
]
# "age" is projected as well, on the assumption that the predicate is
# evaluated against the projected columns.
read_fields = ["id", "name", "age"]
# Create predicate: age > 18
predicate = pc.field("age") > 18
# Create Avro reader
reader = FormatAvroReader(
    file_io=file_io,
    file_path="/data/table/file.avro",
    read_fields=read_fields,
    full_fields=full_fields,
    push_down_predicate=predicate,
    batch_size=1024
)
# Read batches until the reader signals EOF with None
try:
    while True:
        batch = reader.read_arrow_batch()
        if batch is None:
            break
        # Process batch
        print(f"Read {batch.num_rows} rows")
finally:
    # Release the file handle even if processing fails
    reader.close()
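The read loop above can also be wrapped in a generator so that callers iterate over batches and the reader is always closed. This is a sketch that relies only on the `read_arrow_batch()` and `close()` methods from the signature; `iter_batches` and `FakeReader` are hypothetical names introduced for illustration.

```python
from typing import Any, Iterator


def iter_batches(reader: Any) -> Iterator[Any]:
    """Yield batches until read_arrow_batch() returns None, then close the reader."""
    try:
        while True:
            batch = reader.read_arrow_batch()
            if batch is None:
                return
            yield batch
    finally:
        # Runs on normal exhaustion, on an exception, and when the generator is closed early.
        reader.close()


class FakeReader:
    """Hypothetical stand-in exposing the same two methods as the reader."""

    def __init__(self, batches):
        self._batches = list(batches)
        self.closed = False

    def read_arrow_batch(self):
        # Return the next queued batch, or None once everything has been served.
        return self._batches.pop(0) if self._batches else None

    def close(self):
        self.closed = True


fake = FakeReader([[1, 2], [3]])
collected = list(iter_batches(fake))
```

With a real reader the call site becomes `for batch in iter_batches(reader): ...`, with no explicit `close()` needed.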