

Implementation:Apache Paimon TableUpdateByRowId

From Leeroopedia


Knowledge Sources
Domains: Table Updates, Row Tracking
Last Updated: 2026-02-08 00:00 GMT

Overview

TableUpdateByRowId implements partial column updates keyed by row ID. It writes only the updated columns to new files, so the existing data files do not need to be scanned or rewritten.

Description

The TableUpdateByRowId class provides an update mechanism for Apache Paimon tables that uses row IDs to map each input row directly to the data file that stores it. This makes it well suited to adding or updating specific columns without rewriting entire data files.

The implementation first loads the table's existing file metadata (_load_existing_files_info) and records, for each first_row_id, the partition and row count of the corresponding data file, so the file that holds any given row can be identified directly. It validates that the input contains every row of the table in order, with _ROW_ID values running 0, 1, ..., N-1, and groups rows by the first_row_id of the file that contains them so that updates are written file by file.
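The bookkeeping and validation described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not pypaimon's code: FileInfo, build_file_index, and validate_row_ids are hypothetical names, and each data file is assumed to be described only by its first_row_id, partition, and row count.

```python
from dataclasses import dataclass
from typing import Dict, Sequence, Tuple


@dataclass(frozen=True)
class FileInfo:
    """Hypothetical stand-in for one data file's metadata."""
    first_row_id: int
    partition: Tuple[str, ...]
    row_count: int


def build_file_index(files: Sequence[FileInfo]):
    """Map each first_row_id to its partition and row count (hypothetical helper)."""
    partition_by_id: Dict[int, Tuple[str, ...]] = {}
    row_count_by_id: Dict[int, int] = {}
    for f in files:
        # Files sharing a first_row_id collapse into a single entry.
        partition_by_id[f.first_row_id] = f.partition
        row_count_by_id[f.first_row_id] = f.row_count
    # Sorted, de-duplicated first_row_ids allow a binary search later.
    first_row_ids = sorted(partition_by_id)
    total_row_count = sum(row_count_by_id.values())
    return first_row_ids, partition_by_id, row_count_by_id, total_row_count


def validate_row_ids(row_ids: Sequence[int], total_row_count: int) -> None:
    """Require one input row per table row, with _ROW_ID = 0..N-1 in order."""
    if len(row_ids) != total_row_count:
        raise ValueError(
            f"Input has {len(row_ids)} rows, table has {total_row_count}")
    if list(row_ids) != list(range(total_row_count)):
        raise ValueError("_ROW_ID must run 0, 1, ..., N-1 in increasing order")
```

For example, two files starting at row IDs 0 and 3 with 3 and 2 rows give first_row_ids [0, 3] and a total of 5 rows, so only the input _ROW_ID sequence [0, 1, 2, 3, 4] passes validation.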

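For each input row, a floor binary search over the sorted first_row_id list finds the largest first_row_id that does not exceed the row's _ROW_ID; that value identifies the file containing the row. Rows are grouped by this value, and each group is written to a new file, in the partition recorded for that first_row_id, containing only the specified columns. The new files keep the original first_row_id, and their write_cols metadata records which columns they contain, which lets the updated values be associated with the original rows. This is what enables partial column updates in data evolution scenarios.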
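The floor lookup and grouping step can be sketched with Python's standard bisect module. This is a minimal sketch, assuming the first_row_id list is sorted and each file covers a contiguous range of row IDs starting at its first_row_id; floor_search and group_by_first_row_id are hypothetical helpers, not pypaimon functions.

```python
import bisect
from collections import defaultdict
from typing import Dict, List, Sequence


def floor_search(sorted_ids: Sequence[int], value: int) -> int:
    """Return the largest element of sorted_ids that is <= value."""
    # bisect_right returns the insertion point after any equal elements,
    # so the element just before it is the floor of `value`.
    idx = bisect.bisect_right(sorted_ids, value) - 1
    if idx < 0:
        raise ValueError(f"No first_row_id <= {value}")
    return sorted_ids[idx]


def group_by_first_row_id(row_ids: Sequence[int],
                          first_row_ids: Sequence[int]) -> Dict[int, List[int]]:
    """Group input row positions by the first_row_id of the file holding them."""
    groups: Dict[int, List[int]] = defaultdict(list)
    for pos, row_id in enumerate(row_ids):
        groups[floor_search(first_row_ids, row_id)].append(pos)
    return dict(groups)
```

With files starting at row IDs 0, 3, and 5, row 4 maps to first_row_id 3, and grouping row IDs 0 through 6 yields {0: [0, 1, 2], 3: [3, 4], 5: [5, 6]}. Each group would then be written as one new file for that first_row_id.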

Usage

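Use TableUpdateByRowId to update or populate specific columns, for example to fill a column that was newly added to the table, without rewriting the existing data files. Each call must supply a value for every row in the table; to change only some rows, include the current values for all the others.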

Code Reference

Source Location

Signature

class TableUpdateByRowId:
    """Table update for partial column updates (data evolution).

    This update is designed for adding/updating specific columns in existing tables.
    Input data should contain _ROW_ID column.
    """

    FIRST_ROW_ID_COLUMN = '_FIRST_ROW_ID'

    def __init__(self, table, commit_user: str):
        """Initialize with table and commit user."""

    def update_columns(self, data: pa.Table, column_names: List[str]) -> List:
        """Add or update columns in the table."""

    def _load_existing_files_info(self):
        """Load existing first_row_ids and build partition map for efficient lookup."""

    def _calculate_first_row_id(self, data: pa.Table) -> pa.Table:
        """Calculate _first_row_id for each row based on _ROW_ID."""

    def _floor_binary_search(self, sorted_seq: List[int], value: int) -> int:
        """Binary search to find the floor value in sorted sequence."""

    def _write_by_first_row_id(self, data: pa.Table, column_names: List[str]):
        """Write data grouped by first_row_id."""

    def _find_partition_by_first_row_id(self, first_row_id: int) -> Optional[GenericRow]:
        """Find the partition for a given first_row_id using pre-built partition map."""

    def _write_group(
        self,
        partition: GenericRow,
        first_row_id: int,
        data: pa.Table,
        column_names: List[str]
    ):
        """Write a group of data with the same first_row_id."""

Import

from pypaimon.write.table_update_by_row_id import TableUpdateByRowId

I/O Contract

Inputs

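Name          Type            Required  Description
table         FileStoreTable  Yes       Table to update (constructor argument)
commit_user   str             Yes       User identifier for the commit (constructor argument)
data          pa.Table        Yes       Arrow table with a _ROW_ID column plus the columns to update, one row per table row (update_columns argument)
column_names  List[str]       Yes       Names of the columns to update (update_columns argument)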

Outputs

Name             Type                 Description
commit_messages  List[CommitMessage]  Commit messages for the newly written files, returned by update_columns and passed to a table commit

Usage Examples

from pypaimon.write.table_update_by_row_id import TableUpdateByRowId
import pyarrow as pa

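# Create table update by row ID
# (assumes `table` is a pypaimon FileStoreTable loaded from a catalog)
updater = TableUpdateByRowId(table, commit_user="user1")

# Prepare data with a _ROW_ID column.
# _ROW_ID must cover every row in the table as 0, 1, ..., N-1;
# this example assumes the table holds exactly 1000 rows and that
# new_column and updated_column already exist in the table schema.
data = pa.table({
    "_ROW_ID": list(range(1000)),  # All rows in order
    "new_column": ["value"] * 1000,
    "updated_column": [i * 2 for i in range(1000)]
})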

# Update specific columns
commit_messages = updater.update_columns(
    data=data,
    column_names=["new_column", "updated_column"]
)

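# Commit the updates through a batch write builder's commit
write_builder = table.new_batch_write_builder()
table_commit = write_builder.new_commit()
table_commit.commit(commit_messages)
table_commit.close()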

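# Populate columns newly added to an existing table
# First, read the existing row IDs
# (assumes the read output includes the _ROW_ID column)
read_builder = table.new_read_builder()
scan = read_builder.new_scan()
splits = scan.plan().splits()

reader = read_builder.new_read().to_arrow_batch_reader(splits)
existing_data = reader.read_all()
# Sort so that _ROW_ID values appear in increasing order
existing_data = existing_data.sort_by("_ROW_ID")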

# Create new column data with matching row IDs
new_data = pa.table({
    "_ROW_ID": existing_data["_ROW_ID"],
    "status": ["active"] * existing_data.num_rows,
    "category": ["default"] * existing_data.num_rows
})

# Update with new columns, then commit commit_messages as shown earlier
updater = TableUpdateByRowId(table, "user1")
commit_messages = updater.update_columns(new_data, ["status", "category"])

# Supplying only a subset of rows is rejected
partial_update = pa.table({
    "_ROW_ID": list(range(100)),  # First 100 rows only
    "priority": ["high"] * 100
})

try:
    updater = TableUpdateByRowId(table, "user1")
    commit_messages = updater.update_columns(partial_update, ["priority"])
except ValueError as e:
    # Fails unless the table has exactly 100 rows:
    # the input row count must equal the table's total row count
    print(f"Error: {e}")

# Correct approach: include all rows
total_rows = existing_data.num_rows  # Row count read earlier
all_row_ids = list(range(total_rows))
priority_values = ["high"] * 100 + ["normal"] * (total_rows - 100)

full_update = pa.table({
    "_ROW_ID": all_row_ids,
    "priority": priority_values
})

updater = TableUpdateByRowId(table, "user1")
commit_messages = updater.update_columns(full_update, ["priority"])
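Because update_columns needs a value for every row, changing only some rows means carrying over the existing values for all the others; the example above instead sets every remaining row to "normal". The carry-over step can be sketched as below, assuming the current column values have already been read into a Python list ordered by _ROW_ID (so list index equals _ROW_ID); overlay_updates is a hypothetical helper, not part of pypaimon.

```python
from typing import Dict, List, Sequence, TypeVar

T = TypeVar("T")


def overlay_updates(current: Sequence[T], updates: Dict[int, T]) -> List[T]:
    """Return a full column: existing values with `updates` applied by _ROW_ID.

    Assumes `current` holds the existing values in _ROW_ID order,
    so that current[i] is the value of the row whose _ROW_ID is i.
    """
    out = list(current)
    for row_id, value in updates.items():
        if not 0 <= row_id < len(out):
            raise IndexError(f"_ROW_ID {row_id} out of range")
        out[row_id] = value
    return out
```

For instance, overlay_updates(["low", "normal", "low"], {0: "high"}) returns ["high", "normal", "low"]. The result can be paired with list(range(len(result))) as the _ROW_ID column of a pa.table and passed to update_columns.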
