Implementation:Apache Paimon ConcatBatchReader
| Knowledge Sources | |
|---|---|
| Domains | Data Reading, Batch Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
ConcatBatchReader sequentially concatenates batches from multiple RecordBatchReader suppliers into a single stream.
Description
ConcatBatchReader is a RecordBatchReader implementation that accepts a list of reader suppliers (callables that each return a reader) and reads from them sequentially. It keeps the suppliers in a queue and works with one reader at a time. When the current reader is exhausted, it moves on to the next supplier in the queue until all readers have been consumed.
This reader is useful for reading data from multiple sources in sequence, such as processing multiple data files or combining results from different partitions. The reader automatically manages the lifecycle of individual readers, closing each one when it is exhausted before moving to the next.
The implementation uses a deque for efficient queue operations and ensures proper resource management by closing readers when they are no longer needed or when the entire reader is closed.
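The queue-and-current-reader behaviour described above can be sketched in a few lines. This is an illustrative sketch under stated assumptions, not the pypaimon source: `ListReader` and `SketchConcatReader` are hypothetical stand-ins, and plain Python lists stand in for Arrow record batches.

```python
import collections
from typing import Callable, Deque, List, Optional


class ListReader:
    """Hypothetical stand-in for a RecordBatchReader; serves batches from a list."""

    def __init__(self, batches: List[list]):
        self._batches = collections.deque(batches)
        self.closed = False

    def read_arrow_batch(self) -> Optional[list]:
        return self._batches.popleft() if self._batches else None

    def close(self) -> None:
        self.closed = True


class SketchConcatReader:
    """Illustrative only: reads from each supplied reader in turn."""

    def __init__(self, reader_suppliers: List[Callable[[], ListReader]]):
        self.queue: Deque[Callable[[], ListReader]] = collections.deque(reader_suppliers)
        self.current_reader: Optional[ListReader] = None

    def read_arrow_batch(self) -> Optional[list]:
        while True:
            if self.current_reader is not None:
                batch = self.current_reader.read_arrow_batch()
                if batch is not None:
                    return batch
                # Current reader is exhausted: close it before moving on.
                self.current_reader.close()
                self.current_reader = None
            elif self.queue:
                # Call the next supplier only when its reader is needed.
                self.current_reader = self.queue.popleft()()
            else:
                return None  # every reader has been consumed

    def close(self) -> None:
        if self.current_reader is not None:
            self.current_reader.close()
            self.current_reader = None
        self.queue.clear()


first, second = ListReader([[1, 2], [3]]), ListReader([[4]])
reader = SketchConcatReader([lambda: first, lambda: second])
batches = []
while (batch := reader.read_arrow_batch()) is not None:
    batches.append(batch)
reader.close()
```

In this sketch each supplier is called only when the previous reader has returned `None`, so at most one underlying reader is held at any moment.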
Usage
Use ConcatBatchReader when you need to read from multiple data sources sequentially, treating them as a single continuous stream of record batches. It is ideal for scenarios where data is split across multiple files or partitions and needs to be processed in order.
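When the suppliers are built in a loop over file paths, binding each path with `functools.partial` avoids Python's late-binding closure pitfall, where every `lambda` created in the loop would see the last path. The sketch below assumes a hypothetical `open_reader(path)` factory that is not part of pypaimon; it returns a placeholder string so the snippet runs standalone, and the file names are made up.

```python
from functools import partial
from typing import Callable, List

opened: List[str] = []  # records which paths were opened, for illustration


def open_reader(path: str) -> str:
    """Hypothetical factory; a real one would return a RecordBatchReader."""
    opened.append(path)
    return f"reader for {path}"


paths = ["part-0.parquet", "part-1.parquet", "part-2.parquet"]

# partial binds each path when the supplier is created; nothing is opened yet.
reader_suppliers: List[Callable[[], str]] = [partial(open_reader, p) for p in paths]
opened_before_use = list(opened)

# Calling a supplier is what actually opens the corresponding reader.
first_reader = reader_suppliers[0]()
```

A list built this way could then be passed to ConcatBatchReader in place of hand-written lambdas.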
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/read/reader/concat_batch_reader.py
Signature
class ConcatBatchReader(RecordBatchReader):
    def __init__(self, reader_suppliers: List[Callable]):
        self.queue: collections.deque[Callable] = collections.deque(reader_suppliers)
        self.current_reader: Optional[RecordBatchReader] = None
    def read_arrow_batch(self) -> Optional[RecordBatch]:
        """Read the next arrow batch from current or next reader."""
        ...
    def close(self) -> None:
        """Close the current reader and clear the queue."""
        ...
Import
from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| reader_suppliers | List[Callable] | Yes | List of zero-argument callables, each returning a RecordBatchReader instance |
Outputs
| Name | Type | Description |
|---|---|---|
| RecordBatch | Optional[RecordBatch] | Return value of read_arrow_batch(): the next PyArrow RecordBatch from the current reader, or None once all readers are exhausted |
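Because `None` marks the end of the stream, the read loop can be wrapped in a generator that also makes sure `close()` is called. A minimal sketch: `iter_batches` and `FakeReader` are hypothetical helpers, not pypaimon APIs, and the helper assumes only the `read_arrow_batch()` and `close()` methods listed in the signature above.

```python
from typing import Any, Iterator, List, Optional


class FakeReader:
    """Hypothetical reader used only to make the sketch runnable."""

    def __init__(self, batches: List[Any]):
        self._batches = list(batches)
        self.closed = False

    def read_arrow_batch(self) -> Optional[Any]:
        return self._batches.pop(0) if self._batches else None

    def close(self) -> None:
        self.closed = True


def iter_batches(reader) -> Iterator[Any]:
    """Yield batches until the reader returns None, then close it."""
    try:
        while True:
            batch = reader.read_arrow_batch()
            if batch is None:  # end-of-stream marker
                return
            yield batch
    finally:
        # Runs when the stream is exhausted or the generator is closed early.
        reader.close()


fake = FakeReader(["batch-a", "batch-b"])
collected = list(iter_batches(fake))
```

With a helper like this, a caller could write `for batch in iter_batches(concat_reader): ...` instead of checking for `None` by hand.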
Usage Examples
from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader
# create_reader_for_file1/2/3 and process_batch are placeholders for your own code
# Create reader suppliers for multiple data files
reader_suppliers = [
    lambda: create_reader_for_file1(),
    lambda: create_reader_for_file2(),
    lambda: create_reader_for_file3(),
]
# Create concat reader
concat_reader = ConcatBatchReader(reader_suppliers)
try:
    # Read all batches sequentially
    while True:
        batch = concat_reader.read_arrow_batch()
        if batch is None:
            break
        # Process batch
        process_batch(batch)
finally:
    # Clean up, even if processing raises
    concat_reader.close()