Implementation:Apache Paimon ConcatBatchReader

From Leeroopedia


Knowledge Sources
Domains: Data Reading, Batch Processing
Last Updated: 2026-02-08 00:00 GMT

Overview

ConcatBatchReader sequentially concatenates batches from multiple RecordBatchReader suppliers into a single stream.

Description

ConcatBatchReader implements a RecordBatchReader that accepts a list of reader suppliers (callables) and reads from them sequentially. It maintains a queue of reader suppliers and processes one reader at a time. When the current reader is exhausted, it moves to the next reader in the queue until all readers have been consumed.

This reader is useful for reading data from multiple sources in sequence, such as processing multiple data files or combining results from different partitions. The reader automatically manages the lifecycle of individual readers, closing each one when it is exhausted before moving to the next.

The implementation uses a deque for efficient queue operations and ensures proper resource management by closing readers when they are no longer needed or when the entire reader is closed.
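The behavior described above can be sketched in a few lines. This is an illustrative re-creation under stated assumptions, not the pypaimon source: `ListReader` and `SketchConcatReader` are hypothetical names, and plain Python lists stand in for PyArrow record batches so the example runs without extra dependencies.

```python
from collections import deque
from typing import Callable, Deque, List, Optional


class ListReader:
    """Hypothetical stand-in for a RecordBatchReader; serves batches from a list."""

    def __init__(self, batches: List[list]):
        self._batches = deque(batches)
        self.closed = False

    def read_arrow_batch(self) -> Optional[list]:
        return self._batches.popleft() if self._batches else None

    def close(self) -> None:
        self.closed = True


class SketchConcatReader:
    """Illustrative sketch of sequential concatenation, not the real class."""

    def __init__(self, reader_suppliers: List[Callable[[], ListReader]]):
        self.queue: Deque[Callable[[], ListReader]] = deque(reader_suppliers)
        self.current_reader: Optional[ListReader] = None

    def read_arrow_batch(self) -> Optional[list]:
        while True:
            if self.current_reader is None:
                if not self.queue:
                    return None  # every supplier has been consumed
                # Invoke the next supplier only when its reader is needed.
                self.current_reader = self.queue.popleft()()
            batch = self.current_reader.read_arrow_batch()
            if batch is not None:
                return batch
            # The current reader is exhausted: close it before moving on.
            self.current_reader.close()
            self.current_reader = None

    def close(self) -> None:
        if self.current_reader is not None:
            self.current_reader.close()
            self.current_reader = None
        self.queue.clear()


first = ListReader([[1, 2], [3]])
second = ListReader([])  # an empty source is skipped transparently
third = ListReader([[4, 5]])
reader = SketchConcatReader([lambda: first, lambda: second, lambda: third])

batches = []
while (batch := reader.read_arrow_batch()) is not None:
    batches.append(batch)
reader.close()
```

Popping from the left of a deque is O(1), which is why a deque suits this pattern better than repeatedly removing the first element of a list.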

Usage

Use ConcatBatchReader when you need to read from multiple data sources sequentially, treating them as a single continuous stream of record batches. It is ideal for scenarios where data is split across multiple files or partitions and needs to be processed in order.

Code Reference

Source Location

Signature

class ConcatBatchReader(RecordBatchReader):

    def __init__(self, reader_suppliers: List[Callable]):
        self.queue: collections.deque[Callable] = collections.deque(reader_suppliers)
        self.current_reader: Optional[RecordBatchReader] = None

    def read_arrow_batch(self) -> Optional[RecordBatch]:
        """Read the next arrow batch from current or next reader."""
        ...

    def close(self) -> None:
        """Close the current reader and clear the queue."""
        ...

Import

from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader

I/O Contract

Inputs

Name | Type | Required | Description
reader_suppliers | List[Callable] | Yes | List of callable functions that return RecordBatchReader instances

Outputs

Name | Type | Description
RecordBatch | Optional[RecordBatch] | PyArrow RecordBatch from the current reader, or None when all readers are exhausted
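Because the end of the stream is signaled by None rather than by an exception, callers may find it convenient to wrap the contract in a generator. The helper below is a minimal sketch, not part of pypaimon: `iter_batches` and `CountingReader` are hypothetical names, and the helper assumes only that the reader exposes `read_arrow_batch()` and `close()` as listed above.

```python
from typing import Any, Iterator, Optional


def iter_batches(reader: Any) -> Iterator[Any]:
    """Yield batches until read_arrow_batch() returns None, then close the reader."""
    try:
        while True:
            batch = reader.read_arrow_batch()
            if batch is None:
                return
            yield batch
    finally:
        # Runs on normal exhaustion and when the consumer stops early
        # and the generator is closed.
        reader.close()


class CountingReader:
    """Hypothetical reader used only to exercise the helper."""

    def __init__(self, count: int):
        self._next = 0
        self._count = count
        self.closed = False

    def read_arrow_batch(self) -> Optional[int]:
        if self._next >= self._count:
            return None
        value = self._next
        self._next += 1
        return value

    def close(self) -> None:
        self.closed = True


counting_reader = CountingReader(3)
collected = list(iter_batches(counting_reader))
```

With such a wrapper, a consumer can write `for batch in iter_batches(reader): ...` and rely on the `finally` clause for cleanup.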

Usage Examples

from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader

# Each supplier is a zero-argument callable that returns a RecordBatchReader.
# create_reader_for_file1/2/3 are placeholders for your own reader factories.
reader_suppliers = [
    create_reader_for_file1,
    create_reader_for_file2,
    create_reader_for_file3,
]

# Create concat reader
concat_reader = ConcatBatchReader(reader_suppliers)

try:
    # Read all batches sequentially; None signals that every reader is exhausted
    while True:
        batch = concat_reader.read_arrow_batch()
        if batch is None:
            break
        process_batch(batch)  # placeholder for your own processing logic
finally:
    # Clean up even if processing raises
    concat_reader.close()
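When suppliers are built in a loop over file paths, a common Python pitfall is that lambdas bind the loop variable late, so every supplier ends up opening the last file. The sketch below illustrates the problem and one way around it with `functools.partial`. `open_reader` and the file names are hypothetical; a real factory would return a RecordBatchReader rather than a string.

```python
from functools import partial
from typing import Callable, List


def open_reader(path: str) -> str:
    """Hypothetical factory; a real one would return a RecordBatchReader for `path`."""
    return f"reader({path})"


paths = ["part-0.parquet", "part-1.parquet", "part-2.parquet"]

# Pitfall: every lambda closes over the same loop variable, so all of them
# see its final value by the time they are called.
late_bound = [lambda: open_reader(path) for path in paths]

# partial() captures the argument at the moment the supplier is created.
suppliers: List[Callable[[], str]] = [partial(open_reader, path) for path in paths]

late_results = [supplier() for supplier in late_bound]
results = [supplier() for supplier in suppliers]
```

A list built like `suppliers` has the shape that the `reader_suppliers` argument expects: zero-argument callables, each producing the reader for one specific source.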
