Implementation:Apache Paimon KeyValueDataWriter


Domains: Primary Key Tables, Data Writing
Last Updated: 2026-02-08 00:00 GMT

Overview

KeyValueDataWriter is a data writer for primary key tables that adds system fields and sorts data by primary keys before writing.

Description

The KeyValueDataWriter class extends the DataWriter base class to provide specialized writing logic for Apache Paimon primary key tables. It implements the key-value storage model required by the LSM-tree architecture, in which data is organized and merged by primary key.

The writer enhances input data by adding system fields: _KEY_* columns for each primary key field, _SEQUENCE_NUMBER for operation ordering, and _VALUE_KIND for change type tracking. It then sorts the data by primary keys (and sequence number) to maintain the sorted order required for efficient merging and compaction.

The _process_data method transforms each incoming batch by injecting the system fields and sorting the result. The _merge_data method combines buffered data with newly processed data and keeps the combined table sorted. The implementation sorts with PyArrow compute functions and generates monotonically increasing sequence numbers as data is written.

Usage

Use KeyValueDataWriter when implementing writers for primary key tables, particularly when you need to ensure data is properly formatted with system fields and sorted for LSM-tree storage and merge operations.

Code Reference

Signature

class KeyValueDataWriter(DataWriter):
    """Data writer for primary key tables with system fields and sorting."""

    def _process_data(self, data: pa.RecordBatch) -> pa.Table:
        """Process data by adding system fields and sorting."""

    def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
        """Merge existing and new data while maintaining sort order."""

    def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch:
        """Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND."""

    def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch:
        """Sort data by primary key fields and sequence number."""

Import

from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| data | pa.RecordBatch | Yes | Input data to process |
| existing_data | pa.Table | Yes (for merge) | Existing data in buffer |
| new_data | pa.Table | Yes (for merge) | New data to merge |

Outputs

| Name | Type | Description |
|------|------|-------------|
| processed_data | pa.Table | Data with system fields added and sorted |
| merged_data | pa.Table | Merged and sorted data |

Usage Examples

from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
import pyarrow as pa

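# Create writer for primary key table.
# Assumes `table` (a loaded primary key table) and `sequence_gen` (a sequence
# generator) were created earlier; neither is defined in this snippet.
# Table schema: partition_keys=["date"], primary_keys=["date", "id"]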
writer = KeyValueDataWriter(
    table=table,
    partition=("2024-01-01",),
    bucket=0,
    sequence_generator=sequence_gen,
    options=table.options
)

# Write data
data = pa.record_batch({
    "date": ["2024-01-01", "2024-01-01", "2024-01-01"],
    "id": [3, 1, 2],
    "name": ["Charlie", "Alice", "Bob"],
    "value": [300, 100, 200]
})

writer.write(data)

# Data is automatically:
# 1. Enhanced with system fields. Because date is a partition key, it is
#    trimmed from the key columns (see the trimmed primary keys note below):
#    - _KEY_id: [3, 1, 2]
#    - _SEQUENCE_NUMBER: [1000, 1001, 1002] (illustrative values)
#    - _VALUE_KIND: [0, 0, 0] (INSERT)
#
# 2. Sorted by primary key (date is constant within the partition, so by id):
#    Final order: id=1, id=2, id=3

# Prepare commit
files = writer.prepare_commit()
for file in files:
    print(f"Written file: {file.file_name}")
    print("Sorted by: date, id")

# Write more data - will be merged with existing
more_data = pa.record_batch({
    "date": ["2024-01-01", "2024-01-01"],
    "id": [2, 4],  # id=2 updates existing, id=4 is new
    "name": ["Bob_Updated", "David"],
    "value": [250, 400]
})

writer.write(more_data)
# Merges with existing buffer and maintains sort order

# System field structure for this primary key table:
# Original columns: date, id, name, value
# With system fields: _KEY_id, _SEQUENCE_NUMBER, _VALUE_KIND,
#                     date, id, name, value
# (date is a partition key, so it gets no _KEY_ column; see the trimmed keys note below)

# The system fields enable:
# - Efficient key-based lookups using _KEY_* columns
# - Ordering during merge using _SEQUENCE_NUMBER
# - Change tracking using _VALUE_KIND

# Trimmed primary keys example
# Table: partition_keys=["date"], primary_keys=["date", "region", "id"]
# Trimmed keys (non-partition): ["region", "id"]
# System fields added: _KEY_region, _KEY_id, _SEQUENCE_NUMBER, _VALUE_KIND
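The trimming rule in the last comment block can be written out in a few lines. This is a sketch of the rule as stated on this page; the function names trimmed_primary_keys and system_field_names are hypothetical and are not taken from pypaimon.

```python
def trimmed_primary_keys(primary_keys, partition_keys):
    """Drop partition keys from the primary key list, preserving order."""
    partition_set = set(partition_keys)
    return [key for key in primary_keys if key not in partition_set]


def system_field_names(primary_keys, partition_keys):
    """List the system fields implied by the trimmed keys."""
    keys = trimmed_primary_keys(primary_keys, partition_keys)
    return [f"_KEY_{key}" for key in keys] + ["_SEQUENCE_NUMBER", "_VALUE_KIND"]


print(system_field_names(["date", "region", "id"], ["date"]))
```

Within a single partition every row shares the same partition key values, which is presumably why those columns add nothing to the sort key and can be trimmed.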
