Implementation:Apache Paimon KeyValueDataWriter
| Knowledge Sources | |
|---|---|
| Domains | Primary Key Tables, Data Writing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
KeyValueDataWriter is a data writer for primary key tables that adds system fields and sorts data by primary keys before writing.
Description
The KeyValueDataWriter class extends the DataWriter base class with writing logic specific to Apache Paimon primary key tables. It implements the key-value storage model required by the LSM-tree architecture, in which data is organized and merged by primary key.
The writer enhances input data by adding system fields: _KEY_* columns for each primary key field, _SEQUENCE_NUMBER for operation ordering, and _VALUE_KIND for change type tracking. It then sorts the data by primary keys (and sequence number) to maintain the sorted order required for efficient merging and compaction.
The _process_data method performs the initial transformation: it injects the system fields and sorts the batch. The _merge_data method combines existing and new data and keeps the result sorted. The implementation uses PyArrow Compute functions for efficient sorting and generates monotonically increasing sequence numbers for each write operation.
Usage
Use KeyValueDataWriter when implementing writers for primary key tables, particularly when you need to ensure data is properly formatted with system fields and sorted for LSM-tree storage and merge operations.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/write/writer/key_value_data_writer.py
Signature
class KeyValueDataWriter(DataWriter):
    """Data writer for primary key tables with system fields and sorting."""

    def _process_data(self, data: pa.RecordBatch) -> pa.Table:
        """Process data by adding system fields and sorting."""

    def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
        """Merge existing and new data while maintaining sort order."""

    def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch:
        """Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND."""

    def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch:
        """Sort data by primary key fields and sequence number."""
Import
from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| data | pa.RecordBatch | Yes | Input data to process |
| existing_data | pa.Table | Yes (for merge) | Existing data in buffer |
| new_data | pa.Table | Yes (for merge) | New data to merge |
Outputs
| Name | Type | Description |
|---|---|---|
| processed_data | pa.Table | Data with system fields added and sorted |
| merged_data | pa.Table | Merged and sorted data |
Usage Examples
from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
import pyarrow as pa
# Create writer for primary key table
# Table schema: partition_keys=["date"], primary_keys=["date", "id"]
# `table` and `sequence_gen` are not defined in this example and are
# assumed to have been created beforehand.
writer = KeyValueDataWriter(
    table=table,
    partition=("2024-01-01",),
    bucket=0,
    sequence_generator=sequence_gen,
    options=table.options
)
# Write data
data = pa.record_batch({
    "date": ["2024-01-01", "2024-01-01", "2024-01-01"],
    "id": [3, 1, 2],
    "name": ["Charlie", "Alice", "Bob"],
    "value": [300, 100, 200]
})
writer.write(data)
# Data is automatically:
# 1. Enhanced with system fields. A _KEY_* column is added per trimmed
#    (non-partition) primary key, so the partition key "date" gets none:
#    - _KEY_id: [3, 1, 2]
#    - _SEQUENCE_NUMBER: [1000, 1001, 1002]
#    - _VALUE_KIND: [0, 0, 0] (INSERT)
#
# 2. Sorted by primary key, then sequence number:
#    Final order: id=1, id=2, id=3
# Prepare commit
files = writer.prepare_commit()
for file in files:
    print(f"Written file: {file.file_name}")
    print("Sorted by: date, id")
# Write more data - will be merged with existing
more_data = pa.record_batch({
    "date": ["2024-01-01", "2024-01-01"],
    "id": [2, 4],  # id=2 updates existing, id=4 is new
    "name": ["Bob_Updated", "David"],
    "value": [250, 400]
})
writer.write(more_data)
# Merges with existing buffer and maintains sort order
# System field structure for this table:
# Original columns: date, id, name, value
# With system fields: _KEY_id, _SEQUENCE_NUMBER, _VALUE_KIND,
#                     date, id, name, value
# The system fields enable:
# - Efficient key-based lookups using _KEY_* columns
# - Ordering during merge using _SEQUENCE_NUMBER
# - Change tracking using _VALUE_KIND
# Trimmed primary keys example
# Table: partition_keys=["date"], primary_keys=["date", "region", "id"]
# Trimmed keys (non-partition): ["region", "id"]
# System fields added: _KEY_region, _KEY_id, _SEQUENCE_NUMBER, _VALUE_KIND
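
The trimmed-key rule in the comments above can be expressed in a few lines of plain Python. The function names `trimmed_primary_keys` and `system_field_names` are hypothetical and exist only for this sketch; they are not part of the pypaimon API.

```python
def trimmed_primary_keys(primary_keys, partition_keys):
    """Return the primary keys that are not partition keys, preserving order."""
    partition = set(partition_keys)
    return [key for key in primary_keys if key not in partition]


def system_field_names(primary_keys, partition_keys):
    """List the system fields added for a table with the given keys."""
    keys = trimmed_primary_keys(primary_keys, partition_keys)
    return [f"_KEY_{key}" for key in keys] + ["_SEQUENCE_NUMBER", "_VALUE_KIND"]


fields = system_field_names(["date", "region", "id"], ["date"])
print(fields)
```

For the table in the comments, this yields the `_KEY_region` and `_KEY_id` columns followed by the two fixed system fields.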