

Implementation:Apache Paimon TableUpdate

From Leeroopedia


Knowledge Sources
Domains: Table Updates, Data Modification
Last Updated: 2026-02-08 00:00 GMT

Overview

TableUpdate updates data in an existing Paimon table using either a scan-rewrite strategy or a row-ID-based strategy, and can limit an update to selected columns.

Description

The TableUpdate class implements table updates in pypaimon, the Python API for Apache Paimon, and supports both full-row and partial-column updates. It offers two strategies: scan-rewrite updates through ShardTableUpdator, and direct row-ID-based updates through TableUpdateByRowId.

The scan-rewrite approach reads existing data files, applies the updates, and writes new files that replace the old ones. The work is split into shards: each subtask processes a disjoint subset of the data, so several subtasks can run in parallel without touching the same rows. ShardTableUpdator tracks the row range covered by each file and ensures that a rewrite preserves the exact row count of every file it replaces.
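The shard-and-rewrite idea can be sketched in plain Python. This is a hypothetical illustration, not pypaimon's internals: `DataFile`, `files_for_shard`, and `rewrite_file` are invented names, each file is assumed to be described by a first row ID and a row count, and files are assigned to shards round-robin by `index % total_shard_count`, which is an assumed policy that the real implementation may not use. The point is the two invariants from the paragraph above: shards never share a file, and a rewrite must return exactly as many rows as the file held.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

Row = Dict[str, object]


@dataclass(frozen=True)
class DataFile:
    """Hypothetical stand-in for a data file's metadata."""
    name: str
    first_row_id: int
    row_count: int

    def row_range(self) -> range:
        # Half-open range of row IDs stored in this file.
        return range(self.first_row_id, self.first_row_id + self.row_count)


def files_for_shard(
    files: List[DataFile], shard_num: int, total_shard_count: int
) -> List[DataFile]:
    """Pick this shard's files; round-robin keeps shards disjoint (assumed policy)."""
    if not 0 <= shard_num < total_shard_count:
        raise ValueError("shard_num must be in [0, total_shard_count)")
    return [f for i, f in enumerate(files) if i % total_shard_count == shard_num]


def rewrite_file(
    file: DataFile, rows: List[Row], update: Callable[[List[Row]], List[Row]]
) -> List[Row]:
    """Apply `update` to one file's rows and reject any change in row count."""
    if len(rows) != file.row_count:
        raise ValueError(f"{file.name}: expected {file.row_count} rows, got {len(rows)}")
    new_rows = update(rows)
    if len(new_rows) != file.row_count:
        raise RuntimeError(f"{file.name}: rewrite must keep {file.row_count} rows")
    return new_rows
```

With three files and two shards, shard 0 would rewrite the first and third files and shard 1 the second, so the shards can run concurrently without overlapping.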

The row-ID-based approach (update_by_arrow_with_row_id) updates specific rows directly, identified by their _ROW_ID values. Because it does not scan the whole table, it is usually the more efficient choice when only a few rows change. The class also lets you specify which columns to update, so modifications can be limited to those columns without rewriting the unchanged ones.
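The semantics of a column-level update keyed by _ROW_ID can be shown with a small in-memory sketch. It is a conceptual model only, assuming rows are plain dicts that each carry a `_ROW_ID`; pypaimon operates on Arrow data and data files, and `update_by_row_id` is a hypothetical name. Only the selected columns are overwritten, and every other column keeps its existing value.

```python
from typing import Dict, List, Optional, Sequence

ROW_ID = "_ROW_ID"
Row = Dict[str, object]


def update_by_row_id(
    rows: List[Row],
    updates: List[Row],
    update_cols: Optional[Sequence[str]] = None,
) -> List[Row]:
    """Return a copy of `rows` with selected columns overwritten by row ID.

    `rows` holds the existing rows; `updates` holds `_ROW_ID` plus new values.
    When `update_cols` is None, every non-`_ROW_ID` column in an update is applied.
    """
    # Map each row ID to its position so each update is a direct lookup.
    position = {row[ROW_ID]: i for i, row in enumerate(rows)}
    result = [dict(row) for row in rows]  # leave the input untouched
    for upd in updates:
        rid = upd[ROW_ID]
        if rid not in position:
            raise KeyError(f"unknown _ROW_ID {rid}")
        cols = update_cols if update_cols is not None else [c for c in upd if c != ROW_ID]
        target = result[position[rid]]
        for col in cols:
            target[col] = upd[col]  # only the chosen columns change
    return result
```

Passing `["status"]` as `update_cols` changes only `status` on the matched row, even if the update row also carries other columns.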

Usage

Use TableUpdate to implement UPDATE operations on Paimon tables, to build ETL pipelines that modify existing data, or to transform table rows, either through a full table scan or through targeted updates of individual rows.

Code Reference

Source Location

Signature

class TableUpdate:
    """Update operations for Paimon tables."""

    def __init__(self, table, commit_user):
        """Initialize with table and commit user."""

    def with_update_type(self, update_cols: List[str]):
        """Specify columns to update (None means all columns)."""

    def with_read_projection(self, projection: List[str]):
        """Set read projection for scan-rewrite updates."""

    def new_shard_updator(self, shard_num: int, total_shard_count: int):
        """Create a shard updater for scan+rewrite style updates."""

    def update_by_arrow_with_row_id(self, table: pa.Table) -> List[CommitMessage]:
        """Update using row IDs directly."""


class ShardTableUpdator:
    """Shard-based table updater for scan-rewrite operations."""

    def __init__(
        self,
        table,
        projection: Optional[List[str]],
        write_cols: List[str],
        commit_user,
        shard_num: int,
        total_shard_count: int,
    ):
        """Initialize shard updater."""

    def arrow_reader(self) -> pyarrow.ipc.RecordBatchReader:
        """Get Arrow reader for this shard's data."""

    def update_by_arrow_batch(self, data: pa.RecordBatch):
        """Write updated data batch."""

    def prepare_commit(self) -> List[CommitMessage]:
        """Prepare commit messages for written files."""

Import

from pypaimon.write.table_update import TableUpdate
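The signature's docstrings say that `None` for the update columns means "all columns", and the usage examples reassign the result of `with_update_type`, which suggests the builder methods return the updater for chaining. The sketch below models only that configuration behavior. `UpdateConfig` and `write_cols` are hypothetical names, and the rejection of unknown column names is an assumption added for illustration, not documented pypaimon behavior.

```python
from typing import List, Optional, Sequence


class UpdateConfig:
    """Hypothetical stand-in for TableUpdate's column configuration."""

    def __init__(self, table_columns: Sequence[str]):
        self.table_columns = list(table_columns)
        self.update_cols: Optional[List[str]] = None
        self.projection: Optional[List[str]] = None

    def with_update_type(self, update_cols: Optional[Sequence[str]]) -> "UpdateConfig":
        # Assumed validation: refuse columns the table does not have.
        if update_cols is not None:
            unknown = [c for c in update_cols if c not in self.table_columns]
            if unknown:
                raise ValueError(f"unknown columns: {unknown}")
            update_cols = list(update_cols)
        self.update_cols = update_cols
        return self  # returning self allows cfg = cfg.with_update_type([...])

    def with_read_projection(self, projection: Sequence[str]) -> "UpdateConfig":
        self.projection = list(projection)
        return self

    def write_cols(self) -> List[str]:
        # None means "update all columns", as the with_update_type docstring states.
        if self.update_cols is None:
            return list(self.table_columns)
        return list(self.update_cols)
```

Returning the same object keeps the call style used in the examples while leaving the configuration mutable in place.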

I/O Contract

Inputs

Name               Type                         Required               Description
table              FileStoreTable               Yes                    Table to update
commit_user        str                          Yes                    User identifier for the commit
update_cols        List[str]                    No                     Columns to update (None = all columns)
projection         List[str]                    No                     Columns to read for scan-rewrite (set via with_read_projection)
shard_num          int                          Yes (shard updater)    Index of this shard/subtask, from 0 to total_shard_count - 1
total_shard_count  int                          Yes (shard updater)    Total number of shards
data               pa.Table or pa.RecordBatch   Yes                    Data to write: pa.Table for update_by_arrow_with_row_id, pa.RecordBatch for update_by_arrow_batch

Outputs

Name              Type                  Description
commit_messages   List[CommitMessage]   Commit messages for the modified files (from prepare_commit or update_by_arrow_with_row_id)
updator           ShardTableUpdator     Shard-based updater instance (from new_shard_updator)

Usage Examples

from pypaimon.write.table_update import TableUpdate
import pyarrow as pa

# Create a table update; `table` is an existing FileStoreTable
updater = TableUpdate(table, commit_user="user1")

# Update specific columns only
updater = updater.with_update_type(["status", "updated_at"])

# Scan-rewrite approach with single shard
shard_updator = updater.new_shard_updator(shard_num=0, total_shard_count=1)

# Read existing data
reader = shard_updator.arrow_reader()
for batch in reader:
    # Transform the batch; transform_data is a placeholder for your own logic
    updated_batch = transform_data(batch)

    # Write back
    shard_updator.update_by_arrow_batch(updated_batch)

# Commit updates
commit_messages = shard_updator.prepare_commit()
table.commit(commit_messages, "user1")

# Parallel scan-rewrite with multiple shards
import pyarrow.compute as pc
from concurrent.futures import ThreadPoolExecutor

def update_shard(shard_id, total_shards):
    updater = TableUpdate(table, "user1")
    updater = updater.with_update_type(["value"])

    shard_updator = updater.new_shard_updator(shard_id, total_shards)

    reader = shard_updator.arrow_reader()
    for batch in reader:
        # Double the "value" column
        updated = batch.set_column(
            batch.schema.get_field_index("value"),
            "value",
            pc.multiply(batch.column("value"), 2)
        )
        shard_updator.update_by_arrow_batch(updated)

    return shard_updator.prepare_commit()

# Run the four shards in parallel; each returns its own list of commit messages
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(update_shard, i, 4)
        for i in range(4)
    ]
    # Flatten the per-shard lists so all shards are committed together
    all_commit_messages = [msg for f in futures for msg in f.result()]

table.commit(all_commit_messages, "user1")

# Row-ID based update: each row carries the _ROW_ID of the row it replaces,
# and the values must match the table's column types
data_with_row_id = pa.table({
    "_ROW_ID": pa.array([0, 1, 2], type=pa.int64()),
    "status": ["active", "inactive", "active"],
    "updated_at": ["2024-01-01", "2024-01-01", "2024-01-02"]
})

updater = TableUpdate(table, "user1")
updater = updater.with_update_type(["status", "updated_at"])
commit_messages = updater.update_by_arrow_with_row_id(data_with_row_id)
table.commit(commit_messages, "user1")

# Update all columns (full row replacement)
updater = TableUpdate(table, "user1")
# Not calling with_update_type means all columns are updated
shard_updator = updater.new_shard_updator(0, 1)
# Read, transform, write back, and commit as in the single-shard example above
