Principle:Apache Paimon Table Writing
| Knowledge Sources | |
|---|---|
| Domains | Data_Writing, Storage |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Table writing orchestrates the ingestion of data rows into partitioned, bucketed storage through key extraction, routing, system field injection, and coordinated file generation.
Description
The table writing principle addresses the complexity of transforming a stream of data rows into properly organized storage files that respect partitioning schemes, bucketing strategies, and table update semantics. Writers must extract routing information from each row, direct it to the appropriate partition and bucket, inject system-level metadata fields, and coordinate the generation of data files with proper statistics and manifest entries. This multi-stage process ensures data lands in the correct location while maintaining consistency guarantees.
Key extraction forms the foundation of the writing process. For tables with primary keys, writers must identify which fields constitute the key and extract them for routing and deduplication purposes. The key extractor separates rows into key and value components, enabling the system to implement update and delete operations efficiently. Bucketing distributes data within partitions based on hash values of bucketing columns, ensuring that rows with the same bucket key always land in the same bucket, which is essential for merge-on-read semantics and efficient updates.
System field injection augments user data with the internal metadata required for version control and change tracking. Sequence numbers establish a total ordering of operations, so that conflicting writes to the same key can be resolved by order. Row kind markers distinguish insertions, updates, and deletions in change data capture scenarios.
The writer coordinates with the storage layer to flush buffered data periodically, generating data files with appropriate compression and encoding. After a file is generated successfully, the writer produces a manifest entry that describes it; these entries are then incorporated into the table's metadata through atomic manifest updates.
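To make the role of the two system fields concrete, here is a minimal Python sketch of how sequence numbers and row kinds could resolve the current state of each key. It assumes a deduplicate-style rule in which the record with the highest sequence number wins; the `KeyValue` tuple, the short row kind codes, and `merge_latest` are illustrative names, not an actual Paimon API.

```python
from typing import Any, Dict, Iterable, NamedTuple, Tuple


class KeyValue(NamedTuple):
    key: Tuple[Any, ...]
    sequence_number: int
    kind: str  # "+I" insert, "-U" update-before, "+U" update-after, "-D" delete
    value: Tuple[Any, ...]


def merge_latest(records: Iterable[KeyValue]) -> Dict[Tuple[Any, ...], Tuple[Any, ...]]:
    """Keep the newest record per key, then drop keys whose newest record retracts the row."""
    latest: Dict[Tuple[Any, ...], KeyValue] = {}
    for record in records:
        current = latest.get(record.key)
        if current is None or record.sequence_number > current.sequence_number:
            latest[record.key] = record
    return {key: rec.value for key, rec in latest.items() if rec.kind in ("+I", "+U")}


records = [
    KeyValue((1,), 0, "+I", ("draft",)),
    KeyValue((1,), 2, "+U", ("final",)),
    KeyValue((1,), 1, "+U", ("review",)),  # arrives late but carries an older sequence number
    KeyValue((2,), 3, "+I", ("temp",)),
    KeyValue((2,), 4, "-D", ("temp",)),    # newest record for key 2 is a delete
]
state = merge_latest(records)
```

Because ordering comes from the sequence number rather than arrival order, the late `review` record does not overwrite `final`, and key 2 disappears from the merged state.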
Usage
Apply table writing principles when implementing data ingestion pipelines, streaming table updates, or batch data loading systems. This pattern is essential when you need to maintain partitioning invariants, support upsert semantics, or coordinate distributed writers.
Theoretical Basis
The table writing pattern follows a multi-stage pipeline:
Stage 1: Key Extraction and Row Preparation
function prepareRow(inputRow, schema):
    // Extract primary key fields
    keyFields = schema.getPrimaryKeyFields()
    key = extractFields(inputRow, keyFields)
    // Extract value fields (non-key fields)
    valueFields = schema.getNonKeyFields()
    value = extractFields(inputRow, valueFields)
    // Inject system fields
    sequenceNumber = getCurrentSequenceNumber()
    rowKind = determineRowKind(inputRow)  // INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE
    return KeyValue(
        sequenceNumber: sequenceNumber,
        kind: rowKind,
        key: key,
        value: value
    )
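The same stage can be sketched in runnable Python. This is an illustration under stated assumptions rather than Paimon's implementation: rows are plain dictionaries, the sequence generator is a simple per-writer counter, and `make_row_preparer` is a hypothetical helper name.

```python
from dataclasses import dataclass
from itertools import count
from typing import Any, Callable, Dict, List, Tuple


@dataclass(frozen=True)
class KeyValue:
    sequence_number: int
    kind: str  # "+I", "-U", "+U" or "-D"
    key: Tuple[Any, ...]
    value: Tuple[Any, ...]


def make_row_preparer(field_names: List[str], primary_keys: List[str]) -> Callable[..., KeyValue]:
    """Build a prepare_row function bound to one schema and one sequence counter."""
    value_fields = [name for name in field_names if name not in primary_keys]
    sequence = count(start=0)  # assumed per-writer sequence generator

    def prepare_row(row: Dict[str, Any], kind: str = "+I") -> KeyValue:
        return KeyValue(
            sequence_number=next(sequence),
            kind=kind,
            key=tuple(row[name] for name in primary_keys),
            value=tuple(row[name] for name in value_fields),
        )

    return prepare_row


prepare_row = make_row_preparer(["id", "region", "amount"], ["id"])
first = prepare_row({"id": 7, "region": "eu", "amount": 12.5})
second = prepare_row({"id": 7, "region": "eu", "amount": 20.0}, kind="+U")
```

Both records share the key `(7,)`, and the second carries a higher sequence number, which is what lets a later merge step prefer it.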
Stage 2: Partition and Bucket Routing
function routeRow(row, partitionSpec, bucketSpec):
    // Extract partition values
    partitionValues = []
    for each partField in partitionSpec.fields:
        partitionValues.add(row.getField(partField))
    partition = createPartition(partitionValues)
    // Compute bucket assignment; abs() keeps the id non-negative when hash() is negative
    bucketKey = extractBucketKey(row, bucketSpec.fields)
    bucketId = abs(hash(bucketKey) % bucketSpec.numBuckets)
    return (partition, bucketId)
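A runnable Python sketch of the routing step follows. It assumes dictionary rows and uses `zlib.crc32` as a stand-in hash, chosen because Python's built-in `hash()` is salted per process for strings and would not give stable bucket assignments across writers. The encoding of the bucket key is likewise an assumption made for illustration.

```python
import zlib
from typing import Any, Dict, List, Tuple


def route_row(
    row: Dict[str, Any],
    partition_fields: List[str],
    bucket_fields: List[str],
    num_buckets: int,
) -> Tuple[Tuple[Any, ...], int]:
    """Return the (partition, bucket_id) destination for one row."""
    if num_buckets <= 0:
        raise ValueError("num_buckets must be positive")
    partition = tuple(row[name] for name in partition_fields)
    # Join field representations with a separator so ("ab", "c") and ("a", "bc") differ.
    bucket_key = "\x1f".join(repr(row[name]) for name in bucket_fields).encode("utf-8")
    # crc32 returns an unsigned integer, so the modulo is never negative.
    bucket_id = zlib.crc32(bucket_key) % num_buckets
    return partition, bucket_id


row_a = {"dt": "2024-01-01", "id": 42, "amount": 1}
row_b = {"dt": "2024-01-02", "id": 42, "amount": 2}
dest_a = route_row(row_a, ["dt"], ["id"], 4)
dest_b = route_row(row_b, ["dt"], ["id"], 4)
```

Rows with the same bucket key land in the same bucket number, while the partition is still taken from each row's own partition values.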
Stage 3: Buffered Writing
class DataWriter:
    buffers: map<(partition, bucket), RowBuffer>
    function write(row):
        preparedRow = prepareRow(row, schema)
        (partition, bucket) = routeRow(preparedRow, partitionSpec, bucketSpec)
        buffer = buffers.getOrCreate((partition, bucket))
        buffer.append(preparedRow)
        if buffer.size() >= FLUSH_THRESHOLD:
            flush(partition, bucket)
    function flush(partition, bucket):
        buffer = buffers.get((partition, bucket))
        // Nothing to write for an empty buffer (also avoids first()/last() on no rows)
        if buffer.size() == 0:
            return
        // drain() returns the buffered rows and leaves the buffer empty
        rows = buffer.drain()
        // Sort rows by key for optimal read performance
        sortedRows = sort(rows, by: key)
        // Write data file
        dataFile = writeDataFile(partition, bucket, sortedRows)
        // Generate manifest entry
        manifestEntry = createManifestEntry(
            kind: ADDED,
            partition: partition,
            filePath: dataFile.path,
            fileSize: dataFile.size,
            rowCount: sortedRows.length,
            minKey: sortedRows.first().key,
            maxKey: sortedRows.last().key,
            statistics: computeStatistics(sortedRows)
        )
        pendingManifestEntries.add(manifestEntry)
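The buffering and flushing logic above can be exercised with a small in-memory Python sketch. Everything here is an assumption made for illustration: data files are kept as lists in memory instead of being written to storage, manifest entries are plain dictionaries, and `BufferedWriter` is a hypothetical class name.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Row = Tuple[Tuple[Any, ...], Tuple[Any, ...]]  # (key, value)


class BufferedWriter:
    def __init__(self, flush_threshold: int) -> None:
        self.flush_threshold = flush_threshold
        self.buffers: Dict[Tuple[Any, int], List[Row]] = defaultdict(list)
        self.data_files: List[List[Row]] = []  # stand-in for files on storage
        self.pending_entries: List[Dict[str, Any]] = []

    def write(self, partition: Any, bucket: int, key: Tuple[Any, ...], value: Tuple[Any, ...]) -> None:
        buffer = self.buffers[(partition, bucket)]
        buffer.append((key, value))
        if len(buffer) >= self.flush_threshold:
            self.flush(partition, bucket)

    def flush(self, partition: Any, bucket: int) -> None:
        rows = self.buffers.pop((partition, bucket), [])
        if not rows:
            return  # nothing buffered for this destination
        rows.sort(key=lambda row: row[0])  # sort by key before writing
        self.data_files.append(rows)
        self.pending_entries.append({
            "kind": "ADDED",
            "partition": partition,
            "bucket": bucket,
            "file_id": len(self.data_files) - 1,
            "row_count": len(rows),
            "min_key": rows[0][0],
            "max_key": rows[-1][0],
        })

    def flush_all(self) -> None:
        for partition, bucket in list(self.buffers):
            self.flush(partition, bucket)


writer = BufferedWriter(flush_threshold=3)
for key in (5, 1, 3):
    writer.write(("2024-01-01",), 0, (key,), ("v",))
writer.write(("2024-01-01",), 1, (9,), ("v",))
entries_after_threshold = list(writer.pending_entries)
writer.flush_all()
```

The third write to bucket 0 crosses the threshold and produces one sorted file, while the single row in bucket 1 stays buffered until `flush_all()` is called.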
Stage 4: Update and Delete Operations
function updateByRowId(rowId, newValues):
    // Row ID encodes: partition + bucket + file + position
    (partition, bucket, fileId, position) = decodeRowId(rowId)
    // Read existing row
    oldRow = readRowAtPosition(partition, bucket, fileId, position)
    // Merge updates
    updatedRow = mergeUpdates(oldRow, newValues)
    // Write as update operation
    write(KeyValue(
        sequenceNumber: nextSequenceNumber(),
        kind: UPDATE_AFTER,
        key: extractKey(updatedRow),
        value: extractValue(updatedRow)
    ))
    // Optionally mark old position as deleted via deletion vector
    markDeleted(partition, bucket, fileId, position)
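The deletion vector mentioned in the last step can be sketched in Python as a per-file set of deleted positions that readers consult when scanning. This is a simplified illustration: a plain `set` stands in for the compact bitmap a real system would likely use, and the class and method names are hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List, Set


class DeletionVectors:
    def __init__(self) -> None:
        self._deleted: Dict[str, Set[int]] = defaultdict(set)  # file_id -> deleted positions

    def mark_deleted(self, file_id: str, position: int) -> None:
        self._deleted[file_id].add(position)

    def read_live_rows(self, file_id: str, rows: List[Any]) -> List[Any]:
        """Return the rows of one file, skipping positions marked as deleted."""
        deleted = self._deleted.get(file_id, set())
        return [row for position, row in enumerate(rows) if position not in deleted]


file_rows = ["row-0", "row-1", "row-2"]
vectors = DeletionVectors()
vectors.mark_deleted("data-file-1", 1)
live = vectors.read_live_rows("data-file-1", file_rows)
```

The data file itself is never rewritten; the old position is simply hidden at read time, which is why the updated row can be appended as a new record.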
Stage 5: Commit Coordination
function commit():
    // Flush all pending buffers
    for each (partition, bucket) in buffers:
        flush(partition, bucket)
    // Collect all manifest entries
    allEntries = pendingManifestEntries.drain()
    // Write manifest file
    manifestFile = writeManifestFile(allEntries)
    // Coordinate with manifest manager to commit
    manifestManager.commit(manifestFile)
    // Increment sequence number for next batch
    incrementSequenceNumber()
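One common way to make the final publish step atomic on a local filesystem is to write the manifest to a temporary file and rename it into place. The Python sketch below shows that technique only; the JSON format, the `manifest-<id>.json` naming, and `commit_manifest` are assumptions for illustration, and the sketch does not handle conflicts between concurrent committers.

```python
import json
import os
import tempfile
from typing import Any, Dict, List


def commit_manifest(table_dir: str, snapshot_id: int, entries: List[Dict[str, Any]]) -> str:
    """Write entries to a temp file, then publish them with a single atomic rename."""
    final_path = os.path.join(table_dir, f"manifest-{snapshot_id}.json")
    fd, tmp_path = tempfile.mkstemp(dir=table_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(entries, handle)
            handle.flush()
            os.fsync(handle.fileno())  # make sure the bytes are on disk before publishing
        os.replace(tmp_path, final_path)  # readers see the old state or the full new file
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return final_path
```

A reader listing the directory either finds the complete manifest or does not find it at all; it never observes a half-written file under the final name.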
Write Modes
- Append: Simple insertion without key checks, no updates or deletes
- Upsert: Insert new keys, update existing keys based on primary key matching
- Delete: Mark rows as deleted, optionally using deletion vectors
- Update by Row ID: Direct positional updates using encoded row identifiers