Implementation:Apache Paimon DataWriter

Knowledge Sources
Domains: Data Writing, File Management
Last Updated: 2026-02-08 00:00 GMT

Overview

DataWriter is the abstract base class for all data writers in the Paimon Python SDK, providing the core write-roll-commit lifecycle for producing data files with automatic file rolling, statistics collection, and metadata generation.

Description

DataWriter establishes the foundational write pattern used throughout the write subsystem. It accepts `RecordBatch` data through the `write()` method and delegates processing to two subclass hooks: `_process_data()` (for adding system fields, sorting, etc.) and `_merge_data()` (for merging new data with already-buffered data). The results accumulate in `pending_data`.

When the accumulated data exceeds `target_file_size`, `_check_and_roll_if_needed()` calls `_find_optimal_split_point()`, which binary-searches for the largest prefix of rows that fits within the target size. That prefix is written as a file, and the remainder is checked again recursively.

`_write_data_to_file()` performs the actual write. It selects the file format (Parquet, ORC, Avro, Blob, or Lance) based on configuration and bucket mode (POSTPONE_BUCKET writes use Avro), generates a unique file name containing a UUID, resolves external paths through `ExternalPathProvider` for object tables, and writes the file via `FileIO`. It then creates a `DataFileMeta` entry that records the min/max keys (taken from the primary-key values of the first and last rows), per-column statistics (min, max, null count), sequence numbers (from `SequenceGenerator`), the schema ID, the creation timestamp, and the external path when one is used.

The `prepare_commit()` method flushes any pending data and returns the metadata of all committed files. The `abort()` method cleans up by deleting every written file, using `external_path` when it is set. The class handles both regular and external storage paths, automatically selecting the underlying filesystem and preserving URL schemes in the recorded metadata.
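
The rolling step can be sketched in plain Python. This is a minimal sketch, not pypaimon's code: it models a batch as a list of per-row byte sizes rather than a `pa.RecordBatch` (the real method measures slices of the batch), assumes the size of a prefix never shrinks as rows are added, and uses a loop where the description above says recursion. `roll_into_files` and its "at least one row per file" rule are hypothetical.

```python
from typing import Callable, List, Tuple


def find_optimal_split_point(num_rows: int,
                             prefix_size: Callable[[int], int],
                             target_size: int) -> int:
    """Return the largest n with prefix_size(n) <= target_size (0 if none fits).

    Assumes prefix_size(n) never decreases as n grows, which holds for
    the byte size of the first n rows.
    """
    lo, hi = 0, num_rows
    while lo < hi:
        mid = (lo + hi + 1) // 2  # round up so `lo = mid` always makes progress
        if prefix_size(mid) <= target_size:
            lo = mid        # the first `mid` rows fit: try a longer prefix
        else:
            hi = mid - 1    # too big: the answer is shorter than `mid`
    return lo


def roll_into_files(row_sizes: List[int],
                    target_size: int) -> Tuple[List[int], List[int]]:
    """Hypothetical helper: return (rows per rolled file, leftover pending rows)."""
    files: List[int] = []
    pending = list(row_sizes)
    while sum(pending) > target_size:
        split = find_optimal_split_point(
            len(pending), lambda n, rows=pending: sum(rows[:n]), target_size)
        # Assumption: a single row larger than the target still gets its own
        # file, so the loop always makes progress.
        split = max(split, 1)
        files.append(split)
        pending = pending[split:]
    return files, pending
```

For example, seven 100-byte rows with a 250-byte target roll into three two-row files, and one row is left pending until more data arrives or `prepare_commit()` flushes it. Binary search keeps the number of size measurements logarithmic in the batch length.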

This architecture centralizes file format selection, rolling logic, statistics collection, and metadata generation, serving as the foundation for concrete writers (AppendOnlyDataWriter, KeyValueDataWriter, BlobWriter, DataBlobWriter) that extend it with specific processing and merging strategies.
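
To make the template-method structure concrete, here is a hypothetical, pyarrow-free model of the lifecycle. It is a sketch under simplifying assumptions: rows are dicts, size is counted in rows instead of bytes, rolling cuts fixed-size chunks instead of binary-searching, and "files" are just lists of rows. `SketchWriter`, `AppendSketch`, and `SortedSketch` are illustrative names, not pypaimon classes.

```python
from abc import ABC, abstractmethod
from typing import Dict, List

Row = Dict[str, object]


class SketchWriter(ABC):
    """Hypothetical, pyarrow-free model of the write/roll/commit lifecycle."""

    def __init__(self, target_rows: int):
        self.target_rows = target_rows      # stands in for target_file_size
        self.pending: List[Row] = []        # stands in for pending_data
        self.files: List[List[Row]] = []    # stands in for committed DataFileMeta

    def write(self, rows: List[Row]) -> None:
        processed = self._process_data(rows)
        if self.pending:
            self.pending = self._merge_data(self.pending, processed)
        else:
            self.pending = processed
        self._check_and_roll_if_needed()

    def _check_and_roll_if_needed(self) -> None:
        # Fixed-size cut for brevity; the real writer searches by byte size.
        while len(self.pending) > self.target_rows:
            self.files.append(self.pending[:self.target_rows])
            self.pending = self.pending[self.target_rows:]

    def prepare_commit(self) -> List[List[Row]]:
        if self.pending:  # flush the remainder even though it is below target
            self.files.append(self.pending)
            self.pending = []
        return list(self.files)

    @abstractmethod
    def _process_data(self, rows: List[Row]) -> List[Row]: ...

    @abstractmethod
    def _merge_data(self, existing: List[Row], new: List[Row]) -> List[Row]: ...


class AppendSketch(SketchWriter):
    def _process_data(self, rows):
        return list(rows)          # no system fields in this sketch

    def _merge_data(self, existing, new):
        return existing + new      # plain concatenation


class SortedSketch(SketchWriter):
    def _process_data(self, rows):
        return sorted(rows, key=lambda r: r["id"])

    def _merge_data(self, existing, new):
        return sorted(existing + new, key=lambda r: r["id"])  # keep key order
```

The base class owns buffering, rolling, and flushing; subclasses only decide how a batch is prepared and how it joins the buffer. In this sketch, `SortedSketch` stands in for a writer that must keep rows ordered by key, while `AppendSketch` can simply concatenate.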

Usage

DataWriter is an abstract class extended by concrete writer implementations. Applications use writers through the table API, not directly.

Code Reference

Source Location

Signature

class DataWriter(ABC):
    def __init__(self, table, partition: Tuple, bucket: int,
                 max_seq_number: int, options: CoreOptions = None,
                 write_cols: Optional[List[str]] = None): ...

    def write(self, data: pa.RecordBatch): ...
    def prepare_commit(self) -> List[DataFileMeta]: ...
    def close(self): ...
    def abort(self): ...

    @abstractmethod
    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch: ...

    @abstractmethod
    def _merge_data(self, existing_data: pa.RecordBatch,
                    new_data: pa.RecordBatch) -> pa.RecordBatch: ...

    def _check_and_roll_if_needed(self): ...
    def _write_data_to_file(self, data: pa.Table): ...
    def _generate_file_path(self, file_name: str) -> str: ...
    def _collect_value_stats(self, data: pa.Table, fields: List,
                             column_stats: Optional[Dict] = None) -> SimpleStats: ...

    @staticmethod
    def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int: ...
    @staticmethod
    def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) -> Dict: ...

class SequenceGenerator:
    def __init__(self, start: int = 0): ...
    def next(self) -> int: ...
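
The signature above shows only `SequenceGenerator`'s interface. A minimal sketch of such a counter follows, under one loudly flagged assumption: `next()` increments before returning, so the first value is `start + 1` and `start` acts as the last number already used (the writer's `max_seq_number`). Check pypaimon's source before relying on that off-by-one detail; `SequenceGeneratorSketch` is a hypothetical name.

```python
class SequenceGeneratorSketch:
    """Hands out strictly increasing sequence numbers for one writer.

    Assumption: next() increments before returning, so the first value is
    start + 1 and `start` is treated as the last number already used.
    """

    def __init__(self, start: int = 0):
        self.current = start

    def next(self) -> int:
        self.current += 1
        return self.current
```

With `start=10`, three calls yield 11, 12, and 13, so numbers issued by a new writer never collide with those already recorded up to `max_seq_number`.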

Import

from pypaimon.write.writer.data_writer import DataWriter, SequenceGenerator

I/O Contract

Inputs

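Name             Type                   Required  Description
table            FileStoreTable         yes       Table being written to
partition        Tuple                  yes       Partition values
bucket           int                    yes       Bucket number
max_seq_number   int                    yes       Starting sequence number
options          CoreOptions            no        Writer options (default None)
write_cols       Optional[List[str]]    no        Optional subset of columns to write (default None)
data             pa.RecordBatch         yes       Data to write (argument to write())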

Outputs

Name                            Type                  Description
prepare_commit() return value   List[DataFileMeta]    Metadata for all committed files

Usage Examples

Extend for Append-Only Writer

from pypaimon.write.writer.data_writer import DataWriter
import pyarrow as pa

class AppendOnlyDataWriter(DataWriter):
    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
        # Add one sequence number per row (assumes the base class exposes
        # its SequenceGenerator as self.sequence_generator)
        seq_array = pa.array(
            [self.sequence_generator.next() for _ in range(data.num_rows)],
            type=pa.int64(),
        )
        return data.append_column("_SEQUENCE_NUMBER", seq_array)

    def _merge_data(self, existing_data: pa.RecordBatch,
                    new_data: pa.RecordBatch) -> pa.RecordBatch:
        # Simple concatenation for append-only. combine_chunks() collapses
        # both batches into one chunk, so to_batches()[0] holds every row
        # (without it, [0] would return only existing_data)
        return (pa.Table.from_batches([existing_data, new_data])
                .combine_chunks()
                .to_batches()[0])

Write Data

# Initialize writer (`table` is an existing FileStoreTable)
writer = AppendOnlyDataWriter(
    table=table,
    partition=("2024-01-15",),
    bucket=0,
    max_seq_number=0
)

# Write batches
batch1 = pa.record_batch([
    pa.array([1, 2, 3]),
    pa.array(["Alice", "Bob", "Charlie"])
], names=["id", "name"])

writer.write(batch1)

batch2 = pa.record_batch([
    pa.array([4, 5]),
    pa.array(["Dave", "Eve"])
], names=["id", "name"])

writer.write(batch2)

# Prepare commit (triggers file write if pending data exists)
files = writer.prepare_commit()
print(f"Wrote {len(files)} files")

for file_meta in files:
    print(f"File: {file_meta.file_name}")
    print(f"  Rows: {file_meta.row_count}")
    print(f"  Size: {file_meta.file_size} bytes")
    print(f"  Min key: {file_meta.min_key.values}")
    print(f"  Max key: {file_meta.max_key.values}")

Handle Errors

try:
    writer.write(batch)
    files = writer.prepare_commit()
except Exception as e:
    # Cleanup on error
    writer.abort()
    print(f"Write failed, cleaned up files: {e}")
    raise
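
The cleanup that `abort()` performs can be modeled with in-memory stand-ins. This is a sketch under assumptions: `FileMetaSketch`, `InMemoryFileIO`, and `delete_quietly` are hypothetical names, not pypaimon's `FileIO` API. It shows the one rule stated in the description: delete from `external_path` when it is set, otherwise from the regular path.

```python
from dataclasses import dataclass
from typing import List, Optional, Set


@dataclass
class FileMetaSketch:
    """Hypothetical stand-in for the path fields of DataFileMeta."""
    file_path: str
    external_path: Optional[str] = None


class InMemoryFileIO:
    """Hypothetical FileIO stand-in that tracks existing paths in a set."""

    def __init__(self, paths: Set[str]):
        self.paths = set(paths)

    def delete_quietly(self, path: str) -> None:
        self.paths.discard(path)  # a missing file is not an error during cleanup


def abort(written: List[FileMetaSketch], file_io: InMemoryFileIO) -> None:
    for meta in written:
        # Delete from the external location when the file was written there.
        file_io.delete_quietly(meta.external_path or meta.file_path)
    written.clear()  # this sketch also forgets the deleted files
```

Ignoring missing files matters here: `abort()` typically runs after a partial failure, when some files in the list may never have been fully written.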

Custom File Formats

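from pypaimon.common.options import Options

# Configure ORC format with zstd compression.
# Note: the constructor annotates `options` as CoreOptions; depending on the
# pypaimon version, a plain Options object may need converting first.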
options = Options({
    "file.format": "orc",
    "file.compression": "zstd",
    "file.compression.zstd-level": "3"
})

writer = AppendOnlyDataWriter(
    table=table,
    partition=(),
    bucket=0,
    max_seq_number=0,
    options=options
)

# Files will be written in ORC format with zstd compression
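
The format choice and file naming described for `_write_data_to_file()` can be sketched as two small functions. Assumptions, all hypothetical: `POSTPONE_BUCKET = -2` mirrors the constant in Java Paimon's `BucketMode`, `"parquet"` is the fallback when `file.format` is unset, and the `data-<uuid>-0.<format>` pattern only illustrates UUID-based naming; the supported-format set comes from the description above.

```python
import uuid

# Assumption: -2 mirrors the POSTPONE_BUCKET constant in Java Paimon's BucketMode.
POSTPONE_BUCKET = -2
SUPPORTED_FORMATS = {"parquet", "orc", "avro", "blob", "lance"}


def select_file_format(options: dict, bucket: int) -> str:
    if bucket == POSTPONE_BUCKET:
        return "avro"  # postpone-bucket writes use Avro regardless of file.format
    # Assumption: "parquet" as the fallback when file.format is unset.
    fmt = options.get("file.format", "parquet").lower()
    if fmt not in SUPPORTED_FORMATS:
        raise ValueError(f"unsupported file.format: {fmt}")
    return fmt


def new_data_file_name(file_format: str) -> str:
    # Hypothetical naming scheme: a random UUID keeps names unique across writers.
    return f"data-{uuid.uuid4()}-0.{file_format}"
```

A random UUID lets concurrent writers for the same partition and bucket produce file names without coordinating with each other.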

External Storage

# Table with external path configuration
external_table = ...  # Table with external path provider configured

writer = AppendOnlyDataWriter(
    table=external_table,
    partition=("2024-01-15",),
    bucket=0,
    max_seq_number=0
)

writer.write(batch)
files = writer.prepare_commit()

# Files written to external storage location
for file_meta in files:
    print(f"External path: {file_meta.external_path}")
    print(f"Internal path: {file_meta.file_path}")
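
How an external file path might be assembled while keeping its URL scheme is sketched below. This assumes, without confirmation from pypaimon, that the external root mirrors the warehouse layout of Hive-style partition directories followed by `bucket-N` directories; `partition_dir`, `external_file_path`, and `scheme_of` are hypothetical helpers, not `ExternalPathProvider` methods.

```python
from typing import Sequence, Tuple
from urllib.parse import urlparse


def partition_dir(keys: Sequence[str], values: Tuple) -> str:
    # Hive-style partition directories, e.g. "dt=2024-01-15"
    return "/".join(f"{k}={v}" for k, v in zip(keys, values))


def external_file_path(external_root: str, partition_keys: Sequence[str],
                       partition: Tuple, bucket: int, file_name: str) -> str:
    parts = [external_root.rstrip("/")]  # plain string joins keep "s3://" intact
    part = partition_dir(partition_keys, partition)
    if part:                             # unpartitioned tables skip this level
        parts.append(part)
    parts.append(f"bucket-{bucket}")
    parts.append(file_name)
    return "/".join(parts)


def scheme_of(path: str) -> str:
    # The scheme decides which filesystem handles the path; bare paths are local.
    return urlparse(path).scheme or "file"
```

The sketch joins strings instead of using `pathlib`, because `PurePosixPath` would collapse the `//` after the scheme and lose the `s3://` prefix that the metadata needs to preserve.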

Statistics Collection

# Write data
writer.write(batch)
files = writer.prepare_commit()

# Examine statistics
file_meta = files[0]

print("Key statistics:")
print(f"  Min values: {file_meta.key_stats.min_values.values}")
print(f"  Max values: {file_meta.key_stats.max_values.values}")
print(f"  Null counts: {file_meta.key_stats.null_counts}")

if file_meta.value_stats:
    print("Value statistics:")
    print(f"  Min values: {file_meta.value_stats.min_values.values}")
    print(f"  Max values: {file_meta.value_stats.max_values.values}")
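
The statistics shown above can be modeled with plain dicts. This sketch assumes rows are dicts rather than Arrow columns, and the helper names are hypothetical. It computes per-column min, max, and null count, and takes the min/max keys from the first and last rows as the description states, which is only valid when rows are already sorted by key.

```python
from typing import Any, Dict, List, Optional, Sequence, Tuple

Row = Dict[str, Any]


def column_stats(rows: List[Row], column: str) -> Dict[str, Any]:
    values = [row.get(column) for row in rows]
    non_null = [v for v in values if v is not None]
    return {
        "min": min(non_null) if non_null else None,  # nulls are excluded
        "max": max(non_null) if non_null else None,
        "null_count": len(values) - len(non_null),
    }


def min_max_keys(rows: List[Row], key_columns: Sequence[str]
                 ) -> Tuple[Optional[tuple], Optional[tuple]]:
    # Assumes rows are already sorted by key, so the first and last rows
    # bound the file's key range without scanning every row.
    if not rows:
        return None, None
    first = tuple(rows[0][k] for k in key_columns)
    last = tuple(rows[-1][k] for k in key_columns)
    return first, last
```

For rows with ids 1, 2, 3 and a null `name` in the middle row, `column_stats(rows, "name")` reports min "Alice", max "Charlie", and one null, while `min_max_keys(rows, ["id"])` returns `((1,), (3,))`.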
