Implementation:Apache Paimon DataWriter
| Knowledge Sources | |
|---|---|
| Domains | Data Writing, File Management |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
DataWriter is the abstract base class for all data writers in the Paimon Python SDK, providing the core write-roll-commit lifecycle for producing data files with automatic file rolling, statistics collection, and metadata generation.
Description
DataWriter establishes the foundational write pattern used throughout the write subsystem. It accepts `RecordBatch` data via the `write()` method, delegates processing to the subclass-implemented `_process_data()` (for adding system fields, sorting, and similar steps) and `_merge_data()` (for merging with data that is already buffered), and accumulates the result in `pending_data`.
When the accumulated data exceeds `target_file_size`, `_check_and_roll_if_needed()` calls `_find_optimal_split_point()`, which uses binary search to find the largest prefix that fits within the target size. It then writes that chunk and recursively checks the remainder.
The `_write_data_to_file()` method does the following:

- Selects one of several file formats (Parquet, ORC, Avro, Blob, Lance) based on configuration and bucket mode (POSTPONE_BUCKET uses Avro).
- Generates a unique file name using a UUID.
- Generates external paths via `ExternalPathProvider` for object tables.
- Writes the file via `FileIO`.
- Creates a `DataFileMeta` entry capturing min/max keys (taken from the first and last rows of the primary key columns), column statistics (min, max, null counts), sequence numbers (from `SequenceGenerator`), schema ID, creation timestamp, and the optional external path.

The `prepare_commit()` method flushes pending data and returns the metadata of all committed files. The `abort()` method cleans up by deleting every written file (using `external_path` when available). The class handles both regular and external storage paths, automatically selecting the underlying filesystem and preserving URL schemes in metadata.
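The rolling step described above can be sketched in plain Python. This is only an illustration under stated assumptions, not the SDK's code: rows are modeled as a list of per-row byte sizes, `find_split_point` and `roll` are hypothetical names, and the rolling is written as a loop rather than recursion. The real `_find_optimal_split_point()` operates on a `pa.RecordBatch`, and how it measures size is not shown on this page.

```python
from itertools import accumulate
from typing import List


def find_split_point(row_sizes: List[int], target_size: int) -> int:
    """Return the largest n such that the first n rows fit in target_size.

    Assumption: total size never decreases as rows are added, which is
    what makes binary search valid here.
    """
    prefix = [0] + list(accumulate(row_sizes))  # prefix[n] = size of first n rows
    lo, hi = 0, len(row_sizes)
    while lo < hi:
        mid = (lo + hi + 1) // 2  # round up so the loop always makes progress
        if prefix[mid] <= target_size:
            lo = mid       # the first `mid` rows fit; try a longer prefix
        else:
            hi = mid - 1   # too large; try a shorter prefix
    return lo


def roll(row_sizes: List[int], target_size: int) -> List[List[int]]:
    """Split rows into chunks that fit target_size where possible.

    The last chunk models the data that would stay buffered.
    """
    chunks = []
    pending = row_sizes
    while pending and sum(pending) > target_size:
        # Always emit at least one row, even if that row alone is oversized.
        n = max(find_split_point(pending, target_size), 1)
        chunks.append(pending[:n])
        pending = pending[n:]
    if pending:
        chunks.append(pending)
    return chunks
```

For instance, `roll([10, 20, 30, 40], 60)` yields one chunk holding the first three rows and a second chunk holding the remaining row. Forcing at least one row per chunk is a design choice of this sketch so that a single oversized row cannot stall the loop.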
This architecture centralizes file format selection, rolling logic, statistics collection, and metadata generation, serving as the foundation for concrete writers (AppendOnlyDataWriter, KeyValueDataWriter, BlobWriter, DataBlobWriter) that extend it with specific processing and merging strategies.
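To make this division of labor concrete, the following toy model mirrors the write-roll-commit flow using plain Python lists instead of Arrow data. `ToyWriter`, `ToyAppendWriter`, and the row-count threshold are hypothetical stand-ins; the real class rolls by byte size and writes files through `FileIO`.

```python
from abc import ABC, abstractmethod
from typing import List, Optional


class ToyWriter(ABC):
    """Toy stand-in for the write-roll-commit lifecycle (not the SDK class)."""

    def __init__(self, target_rows: int):
        self.target_rows = target_rows            # stand-in for target_file_size
        self.pending: Optional[List[int]] = None  # stand-in for pending_data
        self.committed: List[List[int]] = []      # stand-in for DataFileMeta entries

    def write(self, data: List[int]) -> None:
        processed = self._process_data(data)
        if self.pending is None:
            self.pending = processed
        else:
            self.pending = self._merge_data(self.pending, processed)
        self._check_and_roll_if_needed()

    def prepare_commit(self) -> List[List[int]]:
        if self.pending:                          # flush whatever is still buffered
            self._write_to_file(self.pending)
            self.pending = None
        return list(self.committed)

    def _check_and_roll_if_needed(self) -> None:
        while self.pending and len(self.pending) > self.target_rows:
            self._write_to_file(self.pending[:self.target_rows])
            self.pending = self.pending[self.target_rows:]

    def _write_to_file(self, rows: List[int]) -> None:
        self.committed.append(rows)               # a real writer would write a file here

    @abstractmethod
    def _process_data(self, data: List[int]) -> List[int]: ...

    @abstractmethod
    def _merge_data(self, existing: List[int], new: List[int]) -> List[int]: ...


class ToyAppendWriter(ToyWriter):
    def _process_data(self, data: List[int]) -> List[int]:
        return list(data)                         # no extra processing

    def _merge_data(self, existing: List[int], new: List[int]) -> List[int]:
        return existing + new                     # plain concatenation


writer = ToyAppendWriter(target_rows=3)
writer.write([1, 2])
writer.write([3, 4, 5])
files = writer.prepare_commit()
```

Only the two abstract hooks differ between subclasses in this model; buffering, rolling, and flushing live in the base class, which is the property the description attributes to DataWriter.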
Usage
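DataWriter is an abstract class extended by concrete writer implementations. Applications normally obtain writers through the table API rather than instantiating them directly; the examples below construct a writer by hand only to illustrate the lifecycle.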
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/write/writer/data_writer.py
Signature
class DataWriter(ABC):
    def __init__(self, table, partition: Tuple, bucket: int,
                 max_seq_number: int, options: CoreOptions = None,
                 write_cols: Optional[List[str]] = None): ...
    def write(self, data: pa.RecordBatch): ...
    def prepare_commit(self) -> List[DataFileMeta]: ...
    def close(self): ...
    def abort(self): ...
    @abstractmethod
    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch: ...
    @abstractmethod
    def _merge_data(self, existing_data: pa.RecordBatch,
                    new_data: pa.RecordBatch) -> pa.RecordBatch: ...
    def _check_and_roll_if_needed(self): ...
    def _write_data_to_file(self, data: pa.Table): ...
    def _generate_file_path(self, file_name: str) -> str: ...
    def _collect_value_stats(self, data: pa.Table, fields: List,
                             column_stats: Optional[Dict] = None) -> SimpleStats: ...
    @staticmethod
    def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int: ...
    @staticmethod
    def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) -> Dict: ...

class SequenceGenerator:
    def __init__(self, start: int = 0): ...
    def next(self) -> int: ...
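The signature suggests that `SequenceGenerator` is a simple monotonic counter. A minimal sketch under that assumption follows. Whether the first value returned is `start` or `start + 1` is not stated on this page; this sketch arbitrarily returns `start + 1`, and the class name `SketchSequenceGenerator` is hypothetical.

```python
class SketchSequenceGenerator:
    """Hypothetical monotonic counter mirroring the signature above."""

    def __init__(self, start: int = 0):
        self.current = start

    def next(self) -> int:
        # Assumption: increment first, then return, so `start` itself is never reused.
        self.current += 1
        return self.current


gen = SketchSequenceGenerator(start=10)
values = [gen.next() for _ in range(3)]
```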
Import
from pypaimon.write.writer.data_writer import DataWriter, SequenceGenerator
I/O Contract
Inputs
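| Name | Type | Required | Description |
|---|---|---|---|
| table | FileStoreTable | yes | Table being written to |
| partition | Tuple | yes | Partition values |
| bucket | int | yes | Bucket number |
| max_seq_number | int | yes | Starting sequence number |
| options | CoreOptions | no | Writer options; defaults to None |
| write_cols | Optional[List[str]] | no | Column names to write; defaults to None |
| data | pa.RecordBatch | yes | Data to write, passed to `write()` rather than the constructor |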
Outputs
| Name | Type | Description |
|---|---|---|
| DataFileMeta | List[DataFileMeta] | Metadata for all committed files, returned by `prepare_commit()` |
Usage Examples
Extend for Append-Only Writer
from pypaimon.write.writer.data_writer import DataWriter
import pyarrow as pa

class AppendOnlyDataWriter(DataWriter):
    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
        # Add one sequence number per row
        seq_array = pa.array(
            [self.sequence_generator.next() for _ in range(data.num_rows)],
            type=pa.int64(),
        )
        return data.append_column("_SEQUENCE_NUMBER", seq_array)

    def _merge_data(self, existing_data: pa.RecordBatch,
                    new_data: pa.RecordBatch) -> pa.RecordBatch:
        # Simple concatenation for append-only. combine_chunks() is needed:
        # without it, to_batches()[0] would return only existing_data.
        combined = pa.Table.from_batches([existing_data, new_data])
        return combined.combine_chunks().to_batches()[0]
Write Data
# Initialize writer
writer = AppendOnlyDataWriter(
    table=table,
    partition=("2024-01-15",),
    bucket=0,
    max_seq_number=0
)
# Write batches
batch1 = pa.record_batch([
    pa.array([1, 2, 3]),
    pa.array(["Alice", "Bob", "Charlie"])
], names=["id", "name"])
writer.write(batch1)
batch2 = pa.record_batch([
    pa.array([4, 5]),
    pa.array(["Dave", "Eve"])
], names=["id", "name"])
writer.write(batch2)
# Prepare commit (triggers file write if pending data exists)
files = writer.prepare_commit()
print(f"Wrote {len(files)} files")
for file_meta in files:
    print(f"File: {file_meta.file_name}")
    print(f"  Rows: {file_meta.row_count}")
    print(f"  Size: {file_meta.file_size} bytes")
    print(f"  Min key: {file_meta.min_key.values}")
    print(f"  Max key: {file_meta.max_key.values}")
Handle Errors
try:
    writer.write(batch)
    files = writer.prepare_commit()
except Exception as e:
    # Cleanup on error: abort() deletes the files written so far
    writer.abort()
    print(f"Write failed, cleaned up files: {e}")
    raise
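What `abort()` is described as doing, deleting each written file and preferring `external_path` when it is set, can be sketched with the standard library. `SketchFileMeta` and `abort_files` are hypothetical names, and ignoring already-missing files is an assumption of this sketch; the real method deletes through the table's `FileIO`.

```python
import os
import tempfile
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SketchFileMeta:
    """Hypothetical stand-in carrying only the two path fields."""
    file_path: str
    external_path: Optional[str] = None


def abort_files(committed: List[SketchFileMeta]) -> int:
    """Delete every written file, preferring external_path when it is set.

    Returns the number of files actually removed.
    """
    removed = 0
    for meta in committed:
        path = meta.external_path or meta.file_path
        try:
            os.remove(path)
            removed += 1
        except FileNotFoundError:
            pass  # assumption: cleanup is best-effort, so a missing file is fine
    committed.clear()
    return removed


# Usage: create two temporary files, then clean them up.
tmp_dir = tempfile.mkdtemp()
paths = [os.path.join(tmp_dir, name) for name in ("a.parquet", "b.parquet")]
for p in paths:
    open(p, "wb").close()
metas = [
    SketchFileMeta(file_path=paths[0]),
    SketchFileMeta(file_path="unused", external_path=paths[1]),
]
removed = abort_files(metas)
```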
Custom File Formats
from pypaimon.common.options import Options
# Configure for ORC format
options = Options({
    "file.format": "orc",
    "file.compression": "zstd",
    "file.compression.zstd-level": "3"
})
# Note: the constructor signature annotates `options` as CoreOptions;
# convert the Options instance if your pypaimon version requires it.
writer = AppendOnlyDataWriter(
    table=table,
    partition=(),
    bucket=0,
    max_seq_number=0,
    options=options
)
# Files will be written in ORC format with zstd compression
External Storage
# Table with external path configuration
external_table = ...  # Table with external path provider configured
writer = AppendOnlyDataWriter(
    table=external_table,
    partition=("2024-01-15",),
    bucket=0,
    max_seq_number=0
)
writer.write(batch)
files = writer.prepare_commit()
# Files written to external storage location
for file_meta in files:
    print(f"External path: {file_meta.external_path}")
    print(f"Internal path: {file_meta.file_path}")
Statistics Collection
# Write data
writer.write(batch)
files = writer.prepare_commit()
# Examine statistics
file_meta = files[0]
print("Key statistics:")
print(f"  Min values: {file_meta.key_stats.min_values.values}")
print(f"  Max values: {file_meta.key_stats.max_values.values}")
print(f"  Null counts: {file_meta.key_stats.null_counts}")
if file_meta.value_stats:
    print("Value statistics:")
    print(f"  Min values: {file_meta.value_stats.min_values.values}")
    print(f"  Max values: {file_meta.value_stats.max_values.values}")
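For intuition about what these statistics contain, here is a rough pure-Python sketch of per-column min, max, and null counts. `column_stats` is a hypothetical helper and the dictionary keys are chosen for illustration; the SDK's `_get_column_stats()` works on a `pa.RecordBatch` and may represent its result differently.

```python
from typing import Any, Dict, List, Optional


def column_stats(values: List[Optional[Any]]) -> Dict[str, Any]:
    """Compute min, max, and null count for one column; None marks a null."""
    non_null = [v for v in values if v is not None]
    return {
        "min_values": min(non_null) if non_null else None,
        "max_values": max(non_null) if non_null else None,
        "null_counts": len(values) - len(non_null),
    }


stats = {
    "id": column_stats([3, 1, None, 2]),
    "name": column_stats(["Bob", "Alice", None, None]),
}
```

Nulls are excluded from the min/max comparison and counted separately, so an all-null column reports `None` for both bounds in this sketch.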