Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Principle:Apache Paimon Table Writing

From Leeroopedia


Knowledge Sources
Domains Data_Writing, Storage
Last Updated 2026-02-08 00:00 GMT

Overview

Table writing orchestrates the ingestion of data rows into partitioned, bucketed storage through key extraction, routing, system field injection, and coordinated file generation.

Description

The table writing principle addresses the complexity of transforming a stream of data rows into properly organized storage files that respect partitioning schemes, bucketing strategies, and table update semantics. Writers must extract routing information from each row, direct it to the appropriate partition and bucket, inject system-level metadata fields, and coordinate the generation of data files with proper statistics and manifest entries. This multi-stage process ensures data lands in the correct location while maintaining consistency guarantees.

Key extraction forms the foundation of the writing process. For tables with primary keys, writers must identify which fields constitute the key and extract them for routing and deduplication purposes. The key extractor separates rows into key and value components, enabling the system to implement update and delete operations efficiently. Bucketing distributes data within partitions based on hash values of bucketing columns, ensuring that rows with the same bucket key always land in the same bucket, which is essential for merge-on-read semantics and efficient updates.

System field injection augments user data with internal metadata required for version control and change tracking. Sequence numbers establish a total ordering of operations, enabling conflict resolution in concurrent writes. Row kind markers distinguish between insertions, updates, and deletions in change data capture scenarios. The writer coordinates with the storage layer to flush buffered data periodically, generating data files with appropriate compression and encoding. Upon successful file generation, the writer produces manifest entries that describe the new files, which are then incorporated into the table's metadata through atomic manifest updates.

Usage

Apply table writing principles when implementing data ingestion pipelines, streaming table updates, or batch data loading systems. This pattern is essential when you need to maintain partitioning invariants, support upsert semantics, or coordinate distributed writers.

Theoretical Basis

The table writing pattern follows a multi-stage pipeline:

Stage 1: Key Extraction and Row Preparation

function prepareRow(inputRow, schema):
    // Extract primary key fields
    keyFields = schema.getPrimaryKeyFields()
    key = extractFields(inputRow, keyFields)

    // Extract value fields (non-key fields)
    valueFields = schema.getNonKeyFields()
    value = extractFields(inputRow, valueFields)

    // Inject system fields
    sequenceNumber = getCurrentSequenceNumber()
    rowKind = determineRowKind(inputRow)  // INSERT, UPDATE, DELETE

    return KeyValue(
        sequenceNumber: sequenceNumber,
        kind: rowKind,
        key: key,
        value: value
    )

Stage 2: Partition and Bucket Routing

function routeRow(row, partitionSpec, bucketSpec):
    // Extract partition values
    partitionValues = []
    for each partField in partitionSpec.fields:
        partitionValues.add(row.getField(partField))

    partition = createPartition(partitionValues)

    // Compute bucket assignment
    bucketKey = extractBucketKey(row, bucketSpec.fields)
    bucketId = hash(bucketKey) % bucketSpec.numBuckets

    return (partition, bucketId)

Stage 3: Buffered Writing

class DataWriter:
    buffers: map<(partition, bucket), RowBuffer>

    function write(row):
        preparedRow = prepareRow(row, schema)
        (partition, bucket) = routeRow(preparedRow, partitionSpec, bucketSpec)

        buffer = buffers.getOrCreate((partition, bucket))
        buffer.append(preparedRow)

        if buffer.size() >= FLUSH_THRESHOLD:
            flush(partition, bucket)

    function flush(partition, bucket):
        buffer = buffers.get((partition, bucket))
        rows = buffer.drain()

        // Sort rows by key for optimal read performance
        sortedRows = sort(rows, by: key)

        // Write data file
        dataFile = writeDataFile(partition, bucket, sortedRows)

        // Generate manifest entry
        manifestEntry = createManifestEntry(
            kind: ADDED,
            partition: partition,
            filePath: dataFile.path,
            fileSize: dataFile.size,
            rowCount: sortedRows.length,
            minKey: sortedRows.first().key,
            maxKey: sortedRows.last().key,
            statistics: computeStatistics(sortedRows)
        )

        pendingManifestEntries.add(manifestEntry)
        buffer.clear()

Stage 4: Update and Delete Operations

function updateByRowId(rowId, newValues):
    // Row ID encodes: partition + bucket + file + position
    (partition, bucket, fileId, position) = decodeRowId(rowId)

    // Read existing row
    oldRow = readRowAtPosition(partition, bucket, fileId, position)

    // Merge updates
    updatedRow = mergeUpdates(oldRow, newValues)

    // Write as update operation
    write(KeyValue(
        sequenceNumber: nextSequenceNumber(),
        kind: UPDATE_AFTER,
        key: extractKey(updatedRow),
        value: extractValue(updatedRow)
    ))

    // Optionally mark old position as deleted via deletion vector
    markDeleted(partition, bucket, fileId, position)

Stage 5: Commit Coordination

function commit():
    // Flush all pending buffers
    for each (partition, bucket) in buffers:
        flush(partition, bucket)

    // Collect all manifest entries
    allEntries = pendingManifestEntries.drain()

    // Write manifest file
    manifestFile = writeManifestFile(allEntries)

    // Coordinate with manifest manager to commit
    manifestManager.commit(manifestFile)

    // Increment sequence number for next batch
    incrementSequenceNumber()

Write Modes

  • Append: Simple insertion without key checks, no updates or deletes
  • Upsert: Insert new keys, update existing keys based on primary key matching
  • Delete: Mark rows as deleted, optionally using deletion vectors
  • Update by Row ID: Direct positional updates using encoded row identifiers

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment