Implementation:Apache Paimon RowKeyExtractor

Knowledge Sources
Domains: Bucketing, Partitioning
Last Updated: 2026-02-08 00:00 GMT

Overview

RowKeyExtractor extracts partition and bucket information from PyArrow data with support for multiple bucket modes (fixed, unaware, dynamic, postpone).

Description

The RowKeyExtractor module provides a family of classes for extracting partition values and bucket numbers from PyArrow RecordBatch data. The base RowKeyExtractor class defines the interface and handles partition extraction, while concrete implementations handle different bucket modes.
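The split between a shared base class and mode-specific subclasses can be illustrated with a minimal sketch. It is not the pypaimon implementation: `SketchExtractor`, `SketchUnawareExtractor`, and the `Columns` alias are hypothetical names, and a plain dict of column lists stands in for a `pa.RecordBatch` so the example runs without PyArrow.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Sequence, Tuple

# Stand-in for a pa.RecordBatch: column name -> list of values (equal lengths).
Columns = Dict[str, List]


def _num_rows(data: Columns) -> int:
    """Row count of the stand-in batch (0 if it has no columns)."""
    return len(next(iter(data.values()))) if data else 0


class SketchExtractor(ABC):
    """Hypothetical base class: shared partition logic, pluggable bucket logic."""

    def __init__(self, partition_keys: Sequence[str]):
        self.partition_keys = list(partition_keys)

    def extract_partition_bucket_batch(self, data: Columns) -> Tuple[List[Tuple], List[int]]:
        # The base class owns partition extraction and delegates buckets.
        return self._extract_partitions_batch(data), self._extract_buckets_batch(data)

    def _extract_partitions_batch(self, data: Columns) -> List[Tuple]:
        # One tuple of partition values per row (empty tuple if unpartitioned).
        return [
            tuple(data[key][i] for key in self.partition_keys)
            for i in range(_num_rows(data))
        ]

    @abstractmethod
    def _extract_buckets_batch(self, data: Columns) -> List[int]:
        """Return one bucket number per row."""


class SketchUnawareExtractor(SketchExtractor):
    """Mirrors the unaware mode described on this page: every row gets bucket 0."""

    def _extract_buckets_batch(self, data: Columns) -> List[int]:
        return [0] * _num_rows(data)


sample = {"date": ["2024-01-01", "2024-01-02"], "id": [1, 2]}
partitions, buckets = SketchUnawareExtractor(["date"]).extract_partition_bucket_batch(sample)
```

Each concrete mode only has to supply `_extract_buckets_batch`; callers always go through the single public method and receive two lists of equal length.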

FixedBucketRowKeyExtractor computes bucket numbers by hashing the bucket key fields with MurmurHash and mapping the hash onto the configured number of buckets. The bucket key can be set explicitly; otherwise it defaults to the primary keys that are not partition keys. UnawareBucketRowKeyExtractor assigns every row to bucket 0 and is used for tables without bucketing. DynamicBucketRowKeyExtractor prevents bucket extraction from row data, because in dynamic bucket mode the bucket is not derived from the row alone. PostponeBucketRowKeyExtractor assigns the special value -2 to every row to mark bucketing as postponed.
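How a writer might choose among these modes can be sketched as a small dispatch on the `bucket` option. The function name `select_bucket_mode` is hypothetical, and the rule that `bucket = -1` combined with primary keys means dynamic mode is an assumption; this page only states that unaware mode uses `bucket = -1` without primary keys and that postpone mode uses `-2`.

```python
from typing import List

POSTPONE_BUCKET = -2  # special value named on this page for postpone mode


def select_bucket_mode(bucket: int, primary_keys: List[str]) -> str:
    """Return the name of the bucket mode for a table configuration (sketch)."""
    if bucket == POSTPONE_BUCKET:
        return "postpone"
    if bucket == -1:
        # Assumption: -1 with primary keys selects dynamic mode,
        # -1 without primary keys selects unaware mode.
        return "dynamic" if primary_keys else "unaware"
    if bucket > 0:
        return "fixed"
    raise ValueError(f"Unsupported bucket value: {bucket}")


mode = select_bucket_mode(4, ["date", "region", "id"])  # "fixed"
```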

The hash is computed with the MurmurHash3 algorithm over a binary representation of the bucket key row, so the same key maps to the same bucket on every platform. The implementation is optimized for batch processing: partitions and buckets are extracted for all rows of a RecordBatch in a single call.
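The hash-then-modulo idea behind fixed bucket assignment can be sketched as follows. This is a simplified illustration, not Paimon's algorithm: `zlib.crc32` stands in for MurmurHash3, `repr` of the key values stands in for the binary row layout, and the helper names and the comma-separated form of `bucket-key` are assumptions. Bucket numbers produced here will therefore differ from those Paimon computes.

```python
import zlib
from typing import Dict, List, Sequence


def resolve_bucket_keys(options: Dict[str, str],
                        primary_keys: Sequence[str],
                        partition_keys: Sequence[str]) -> List[str]:
    """Use the explicit bucket key if set, else the non-partition primary keys."""
    custom = options.get("bucket-key", "")
    if custom:
        # Assumption: multiple bucket keys are given as a comma-separated string.
        return [key.strip() for key in custom.split(",") if key.strip()]
    return [key for key in primary_keys if key not in partition_keys]


def assign_buckets(data: Dict[str, List],
                   bucket_keys: Sequence[str],
                   num_buckets: int) -> List[int]:
    """Assign one bucket per row by hashing the bucket key values."""
    if num_buckets <= 0:
        raise ValueError("num_buckets must be positive in fixed bucket mode")
    num_rows = len(next(iter(data.values()))) if data else 0
    buckets = []
    for i in range(num_rows):
        # Stand-in serialization, NOT Paimon's binary row format.
        key_bytes = repr(tuple(data[key][i] for key in bucket_keys)).encode("utf-8")
        # Stand-in hash, NOT MurmurHash3; crc32 returns an unsigned 32-bit int.
        buckets.append(zlib.crc32(key_bytes) % num_buckets)
    return buckets


keys = resolve_bucket_keys({"bucket": "4"}, ["date", "region", "id"], ["date", "region"])
sample = {"date": ["2024-01-01", "2024-01-01", "2024-01-02"], "id": [1, 2, 1]}
sample_buckets = assign_buckets(sample, keys, 4)
```

Because the hash depends only on the bucket key values, rows 0 and 2 of `sample` (both with `id` 1) land in the same bucket even though their partitions differ.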

Usage

Use RowKeyExtractor when writing data to Paimon tables, implementing custom writers, or when you need to determine which partition and bucket a row belongs to based on table schema and bucketing configuration.

Code Reference

Source Location

Signature

class RowKeyExtractor(ABC):
    """Base class for extracting partition and bucket information from PyArrow data."""

    def __init__(self, table_schema: TableSchema):
        """Initialize with table schema."""

    def extract_partition_bucket_batch(self, data: pa.RecordBatch) -> Tuple[List[Tuple], List[int]]:
        """Extract partitions and buckets for all rows in batch."""

    @abstractmethod
    def _extract_buckets_batch(self, table: pa.RecordBatch) -> List[int]:
        """Extract bucket numbers for all rows. Must be implemented by subclasses."""


class FixedBucketRowKeyExtractor(RowKeyExtractor):
    """Fixed bucket mode extractor with configurable number of buckets."""

    def __init__(self, table_schema: TableSchema):
        """Initialize with table schema and bucket configuration."""


class UnawareBucketRowKeyExtractor(RowKeyExtractor):
    """Extractor for unaware bucket mode (bucket = -1, no primary keys)."""

    def __init__(self, table_schema: TableSchema):
        """Initialize for unaware bucket mode."""


class DynamicBucketRowKeyExtractor(RowKeyExtractor):
    """Row key extractor for dynamic bucket mode."""

    def __init__(self, table_schema: TableSchema):
        """Initialize for dynamic bucket mode."""


class PostponeBucketRowKeyExtractor(RowKeyExtractor):
    """Extractor for postpone bucket mode (bucket = -2)."""

    def __init__(self, table_schema: TableSchema):
        """Initialize for postpone bucket mode."""

Import

from pypaimon.write.row_key_extractor import (
    RowKeyExtractor,
    FixedBucketRowKeyExtractor,
    UnawareBucketRowKeyExtractor,
    DynamicBucketRowKeyExtractor,
    PostponeBucketRowKeyExtractor
)

I/O Contract

Inputs

Name | Type | Required | Description
table_schema | TableSchema | Yes | Table schema with partition and bucket configuration
data | pa.RecordBatch | Yes | PyArrow RecordBatch to extract from

Outputs

Name | Type | Description
partitions | List[Tuple] | List of partition value tuples, one per row
buckets | List[int] | List of bucket numbers, one per row

Usage Examples

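from pypaimon.write.row_key_extractor import (
    FixedBucketRowKeyExtractor,
    UnawareBucketRowKeyExtractor
)
# Assumed import path for TableSchema; adjust to your pypaimon version
from pypaimon.schema.table_schema import TableSchema
import pyarrow as pa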

# Fixed bucket mode
table_schema = TableSchema(
    fields=[...],
    partition_keys=["date", "region"],
    primary_keys=["date", "region", "id"],
    options={"bucket": "4"}
)

extractor = FixedBucketRowKeyExtractor(table_schema)

# Create sample data
data = pa.record_batch({
    "date": ["2024-01-01", "2024-01-01", "2024-01-02"],
    "region": ["us", "us", "eu"],
    "id": [1, 2, 3],
    "value": [100, 200, 300]
})

# Extract partitions and buckets
partitions, buckets = extractor.extract_partition_bucket_batch(data)

print("Partitions:", partitions)
# [("2024-01-01", "us"), ("2024-01-01", "us"), ("2024-01-02", "eu")]

print("Buckets:", buckets)
# e.g. [0, 2, 1]; illustrative only, actual values depend on the hash of the id field

# Unaware bucket mode (no bucketing)
unaware_schema = TableSchema(
    fields=[...],
    partition_keys=["date"],
    primary_keys=[],
    options={"bucket": "-1"}
)

unaware_extractor = UnawareBucketRowKeyExtractor(unaware_schema)
partitions, buckets = unaware_extractor.extract_partition_bucket_batch(data)

print("Buckets (unaware):", buckets)
# [0, 0, 0] - all rows go to bucket 0

# Custom bucket keys
custom_schema = TableSchema(
    fields=[...],
    partition_keys=["date"],
    primary_keys=["date", "id"],
    options={"bucket": "8", "bucket-key": "id"}
)

custom_extractor = FixedBucketRowKeyExtractor(custom_schema)
partitions, buckets = custom_extractor.extract_partition_bucket_batch(data)

# Use in writer
class MyWriter:
    def __init__(self, table_schema):
        self.extractor = FixedBucketRowKeyExtractor(table_schema)

    def write(self, batch: pa.RecordBatch):
        partitions, buckets = self.extractor.extract_partition_bucket_batch(batch)

        # Route each row to the file for its (partition, bucket) pair
        for i in range(batch.num_rows):
            partition = partitions[i]
            bucket = buckets[i]
            # write_to_file is a placeholder for the actual file-writing logic
            self.write_to_file(partition, bucket, batch.slice(i, 1))
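
Writing one row at a time is simple but issues one write per row. A common alternative is to collect the row indices of each (partition, bucket) pair first and write each group once. The sketch below shows that grouping step on the two lists returned by `extract_partition_bucket_batch`; the helper name `group_row_indices` is hypothetical and the sample bucket values are made up for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_row_indices(partitions: List[Tuple],
                      buckets: List[int]) -> Dict[Tuple[Tuple, int], List[int]]:
    """Map each (partition, bucket) pair to the indices of its rows."""
    if len(partitions) != len(buckets):
        raise ValueError("partitions and buckets must have one entry per row")
    groups: Dict[Tuple[Tuple, int], List[int]] = defaultdict(list)
    for row_index, key in enumerate(zip(partitions, buckets)):
        groups[key].append(row_index)
    return dict(groups)


# Illustrative inputs, shaped like the extractor's two output lists.
partitions = [("2024-01-01", "us"), ("2024-01-01", "us"), ("2024-01-02", "eu")]
buckets = [0, 0, 1]
groups = group_row_indices(partitions, buckets)
# {(("2024-01-01", "us"), 0): [0, 1], (("2024-01-02", "eu"), 1): [2]}
```

With PyArrow, each index list could then be passed to `RecordBatch.take` to build one sub-batch per group before writing.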
