Implementation:Apache Paimon TableUpdate
| Knowledge Sources | |
|---|---|
| Domains | Table Updates, Data Modification |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
TableUpdate updates table data using either a scan-rewrite strategy or a row-id-based strategy, and supports column-level updates.
Description
The TableUpdate class implements table update operations in Apache Paimon with support for both full-row and partial-column updates. It provides two update strategies: scan-rewrite style updates through ShardTableUpdator and direct row-id-based updates through TableUpdateByRowId.
The scan-rewrite approach reads existing data files, applies updates, and writes new files to replace old ones. This is implemented with shard-based parallelism where multiple subtasks process disjoint subsets of data. The ShardTableUpdator manages row ranges for each file and ensures exact row count preservation during rewrites.
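The sharding idea can be illustrated with a small, self-contained sketch. It is not Paimon's implementation: the round-robin assignment rule, the `(file name, row count)` tuples, and the helper names `assign_files`, `row_ranges`, and `check_rewrite` are all assumptions chosen for illustration. It shows how subtasks can take disjoint subsets of files, and how per-file row ranges allow a check that a rewrite kept the row count.

```python
from typing import Dict, List, Tuple

# Hypothetical file listing entry: (file name, row count). Not Paimon metadata.
FileInfo = Tuple[str, int]


def assign_files(files: List[FileInfo], shard_num: int,
                 total_shard_count: int) -> List[FileInfo]:
    """Pick the files for one shard using round-robin (an assumed rule)."""
    if not 0 <= shard_num < total_shard_count:
        raise ValueError("shard_num must be in [0, total_shard_count)")
    return [f for i, f in enumerate(files) if i % total_shard_count == shard_num]


def row_ranges(files: List[FileInfo]) -> Dict[str, Tuple[int, int]]:
    """Map each file to its half-open [start, end) range in the shard's row stream."""
    ranges: Dict[str, Tuple[int, int]] = {}
    start = 0
    for name, count in files:
        ranges[name] = (start, start + count)
        start += count
    return ranges


def check_rewrite(ranges: Dict[str, Tuple[int, int]], rows_written: int) -> None:
    """Raise if the number of rewritten rows differs from the rows that were read."""
    expected = max((end for _, end in ranges.values()), default=0)
    if rows_written != expected:
        raise ValueError(f"expected {expected} rows, got {rows_written}")
```

Because every file index maps to exactly one value of `i % total_shard_count`, no file is processed by two shards and none is skipped, which is the property the shard-based approach relies on.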
The row-id-based update approach (update_by_arrow_with_row_id) directly updates specific rows using their _ROW_ID values, which is more efficient for small updates. The class supports specifying which columns to update, enabling column-level modifications without rewriting unchanged columns.
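As a rough illustration of a column-level, row-id-based update, the sketch below applies an update to an in-memory table stored as a dict of column lists. It is a conceptual model only, not how pypaimon stores or rewrites data: the helper name `apply_row_id_update` is hypothetical, and it assumes that a `_ROW_ID` value is simply the row's position in the table.

```python
from typing import Any, Dict, List

ROW_ID = "_ROW_ID"


def apply_row_id_update(columns: Dict[str, List[Any]],
                        update: Dict[str, List[Any]],
                        update_cols: List[str]) -> Dict[str, List[Any]]:
    """Return a new table with `update_cols` overwritten at the given row ids.

    Assumption: a row id is the row's position in the in-memory table.
    """
    if ROW_ID not in update:
        raise ValueError(f"update data must contain a {ROW_ID} column")
    missing = [c for c in update_cols if c not in update or c not in columns]
    if missing:
        raise ValueError(f"unknown or missing update columns: {missing}")

    result = dict(columns)  # columns that are not updated are reused as-is
    for col in update_cols:
        new_values = list(columns[col])  # copy only the column being changed
        for row_id, value in zip(update[ROW_ID], update[col]):
            if not 0 <= row_id < len(new_values):
                raise IndexError(f"row id {row_id} is out of range")
            new_values[row_id] = value
        result[col] = new_values
    return result
```

Only the columns named in `update_cols` are copied and modified; every other column object is passed through untouched, which mirrors the idea of not rewriting unchanged columns.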
Usage
Use TableUpdate to implement UPDATE operations on Paimon tables, to build ETL pipelines that modify existing data, or to apply transformations to table rows, either through a full table scan or through targeted row-id updates.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/write/table_update.py
Signature
class TableUpdate:
    """Update operations for Paimon tables."""
    def __init__(self, table, commit_user):
        """Initialize with table and commit user."""
    def with_update_type(self, update_cols: List[str]):
        """Specify columns to update (None means all columns)."""
    def with_read_projection(self, projection: List[str]):
        """Set read projection for scan-rewrite updates."""
    def new_shard_updator(self, shard_num: int, total_shard_count: int):
        """Create a shard updater for scan+rewrite style updates."""
    def update_by_arrow_with_row_id(self, table: pa.Table) -> List[CommitMessage]:
        """Update using row IDs directly."""
class ShardTableUpdator:
    """Shard-based table updater for scan-rewrite operations."""
    def __init__(
        self,
        table,
        projection: Optional[List[str]],
        write_cols: List[str],
        commit_user,
        shard_num: int,
        total_shard_count: int,
    ):
        """Initialize shard updater."""
    def arrow_reader(self) -> pyarrow.ipc.RecordBatchReader:
        """Get Arrow reader for this shard's data."""
    def update_by_arrow_batch(self, data: pa.RecordBatch):
        """Write updated data batch."""
    def prepare_commit(self) -> List[CommitMessage]:
        """Prepare commit messages for written files."""
Import
from pypaimon.write.table_update import TableUpdate
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| table | FileStoreTable | Yes | Table to update |
| commit_user | str | Yes | User identifier for commit |
| update_cols | List[str] | No | Columns to update (None = all columns) |
| projection | List[str] | No | Columns to read for scan-rewrite |
| shard_num | int | Yes (for shard updater) | Index of this shard/subtask |
| total_shard_count | int | Yes (for shard updater) | Total number of shards |
| data | pa.Table or pa.RecordBatch | Yes | Data to write |
Outputs
| Name | Type | Description |
|---|---|---|
| commit_messages | List[CommitMessage] | Commit messages for modified files |
| updator | ShardTableUpdator | Shard-based updater instance |
Usage Examples
from pypaimon.write.table_update import TableUpdate
import pyarrow as pa
# Create table update
updater = TableUpdate(table, commit_user="user1")
# Update specific columns only
updater = updater.with_update_type(["status", "updated_at"])
# Scan-rewrite approach with single shard
shard_updator = updater.new_shard_updator(shard_num=0, total_shard_count=1)
# Read existing data
reader = shard_updator.arrow_reader()
for batch in reader:
    # Transform data (transform_data is a user-defined placeholder, not part of pypaimon)
    updated_batch = transform_data(batch)
    # Write back
    shard_updator.update_by_arrow_batch(updated_batch)
# Commit updates
commit_messages = shard_updator.prepare_commit()
table.commit(commit_messages, "user1")
# Parallel scan-rewrite with multiple shards
from concurrent.futures import ThreadPoolExecutor
import pyarrow.compute as pc  # "import pyarrow" alone does not load pa.compute
def update_shard(shard_id, total_shards):
    updater = TableUpdate(table, "user1")
    updater = updater.with_update_type(["value"])
    shard_updator = updater.new_shard_updator(shard_id, total_shards)
    reader = shard_updator.arrow_reader()
    for batch in reader:
        # Double the "value" column and leave the other columns unchanged
        updated = batch.set_column(
            batch.schema.get_field_index("value"),
            "value",
            pc.multiply(batch.column("value"), 2)
        )
        shard_updator.update_by_arrow_batch(updated)
    return shard_updator.prepare_commit()
# Run the four shards in parallel
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(update_shard, i, 4)
        for i in range(4)
    ]
    # Flatten the per-shard lists into a single list of commit messages
    all_commits = [msg for f in futures for msg in f.result()]
# Row-ID based update
data_with_row_id = pa.table({
    "_ROW_ID": [0, 1, 2],
    "status": ["active", "inactive", "active"],
    "updated_at": ["2024-01-01", "2024-01-01", "2024-01-02"]
})
updater = TableUpdate(table, "user1")
updater = updater.with_update_type(["status", "updated_at"])
commit_messages = updater.update_by_arrow_with_row_id(data_with_row_id)
# Update all columns (full row replacement)
updater = TableUpdate(table, "user1")
# Not calling with_update_type means update all columns
shard_updator = updater.new_shard_updator(0, 1)
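Since the scan-rewrite path is described as preserving the exact row count of each file, a transformation that drops or duplicates rows is a likely source of errors. One way to catch this early on the caller's side is a small guard around the transformation. The helper name `checked_transform` is hypothetical and not part of pypaimon; it relies only on a `num_rows` attribute, which `pa.RecordBatch` provides.

```python
from typing import Any, Callable


def checked_transform(batch: Any, fn: Callable[[Any], Any]) -> Any:
    """Apply `fn` to `batch` and verify the result has the same number of rows."""
    out = fn(batch)
    if out.num_rows != batch.num_rows:
        raise ValueError(
            f"transformation changed row count: {batch.num_rows} -> {out.num_rows}"
        )
    return out
```

In the loops above, this would be used as `shard_updator.update_by_arrow_batch(checked_transform(batch, transform_data))`, assuming `transform_data` is the user-defined function from the first example.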