Implementation:Apache Paimon RowKeyExtractor
| Knowledge Sources | |
|---|---|
| Domains | Bucketing, Partitioning |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
RowKeyExtractor extracts partition and bucket information from PyArrow data with support for multiple bucket modes (fixed, unaware, dynamic, postpone).
Description
The RowKeyExtractor module provides a family of classes for extracting partition values and bucket numbers from PyArrow RecordBatch data. The base RowKeyExtractor class defines the interface and handles partition extraction, while concrete implementations handle different bucket modes.
FixedBucketRowKeyExtractor hashes each row's bucket-key fields with MurmurHash and maps the hash onto the number of buckets set by the "bucket" option. Bucket keys come from the "bucket-key" option when it is set; otherwise they default to the primary keys that are not partition keys. UnawareBucketRowKeyExtractor assigns every row to bucket 0 and serves tables without bucketing (bucket = -1, no primary keys). DynamicBucketRowKeyExtractor does not compute buckets: it raises an error if bucket extraction is attempted in dynamic bucket mode. PostponeBucketRowKeyExtractor assigns the sentinel bucket -2 to every row for postpone bucket mode.
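The bucket-key fallback and the per-mode bucket values described above can be sketched as two small functions. Both names are hypothetical. The mode strings are illustrative labels, not pypaimon constants. The fixed-mode hash is left to a caller-supplied function.

```python
from typing import Callable, Dict, List, Optional


def resolve_bucket_keys(options: Dict[str, str], primary_keys: List[str],
                        partition_keys: List[str]) -> List[str]:
    """Hypothetical helper: explicit "bucket-key" option, else non-partition primary keys."""
    explicit = options.get("bucket-key", "").strip()
    if explicit:
        # Multiple bucket keys are comma-separated
        return [k.strip() for k in explicit.split(",")]
    return [pk for pk in primary_keys if pk not in partition_keys]


def assign_buckets(mode: str, num_rows: int,
                   fixed_bucket_fn: Optional[Callable[[int], int]] = None) -> List[int]:
    """Hypothetical helper: bucket number per row for each illustrative mode label."""
    if mode == "fixed":
        # Hash-based mode: the caller maps a row index to its bucket
        if fixed_bucket_fn is None:
            raise ValueError("fixed mode needs a bucket function")
        return [fixed_bucket_fn(i) for i in range(num_rows)]
    if mode == "unaware":
        return [0] * num_rows   # every row lands in bucket 0
    if mode == "postpone":
        return [-2] * num_rows  # sentinel bucket for postpone mode
    if mode == "dynamic":
        raise ValueError("bucket extraction is not supported in dynamic bucket mode")
    raise ValueError(f"unknown bucket mode: {mode}")


print(resolve_bucket_keys({}, ["date", "region", "id"], ["date", "region"]))
```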
The hash is computed with the MurmurHash3 algorithm over the binary row representation of the bucket-key values, which keeps bucket assignment consistent across platforms. The implementation is batch-oriented: a single call extracts partitions and buckets for every row in a RecordBatch.
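For reference, here is MurmurHash3 (the 32-bit x86 variant) in pure Python, followed by a modulo step that maps a hash onto a bucket. This is a sketch under stated assumptions. It hashes arbitrary bytes, not Paimon's binary row encoding of the bucket-key values. The seed is a parameter because the one Paimon uses is not given here. As a result its output will not reproduce Paimon's bucket numbers. The bucket formula assumes the Java-style abs(hash % numBuckets), which with a truncating remainder equals abs(hash) % numBuckets. murmur3_32 and bucket_of are hypothetical names.

```python
def _rotl32(x: int, r: int) -> int:
    """Rotate a 32-bit value left by r bits."""
    return ((x << r) | (x >> (32 - r))) & 0xFFFFFFFF


def murmur3_32(data: bytes, seed: int = 0) -> int:
    """MurmurHash3 x86_32 (hypothetical helper); returns a signed 32-bit int like Java's int."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h1 = seed & 0xFFFFFFFF
    nblocks = len(data) // 4
    # Body: mix each little-endian 4-byte block into the state
    for i in range(nblocks):
        k1 = int.from_bytes(data[4 * i:4 * i + 4], "little")
        k1 = _rotl32((k1 * c1) & 0xFFFFFFFF, 15)
        k1 = (k1 * c2) & 0xFFFFFFFF
        h1 = _rotl32(h1 ^ k1, 13)
        h1 = (h1 * 5 + 0xE6546B64) & 0xFFFFFFFF
    # Tail: up to 3 leftover bytes, read little-endian
    tail = data[4 * nblocks:]
    if tail:
        k1 = int.from_bytes(tail, "little")
        k1 = _rotl32((k1 * c1) & 0xFFFFFFFF, 15)
        k1 = (k1 * c2) & 0xFFFFFFFF
        h1 ^= k1
    # Finalization: fold in the length, then avalanche (fmix32)
    h1 ^= len(data)
    h1 ^= h1 >> 16
    h1 = (h1 * 0x85EBCA6B) & 0xFFFFFFFF
    h1 ^= h1 >> 13
    h1 = (h1 * 0xC2B2AE35) & 0xFFFFFFFF
    h1 ^= h1 >> 16
    # Reinterpret the unsigned result as a signed 32-bit value
    return h1 - (1 << 32) if h1 & 0x80000000 else h1


def bucket_of(hash_value: int, num_buckets: int) -> int:
    """Assumed Java-style abs(hash % n); with truncating % this equals abs(hash) % n."""
    if num_buckets <= 0:
        raise ValueError("num_buckets must be > 0")
    return abs(hash_value) % num_buckets


print(bucket_of(murmur3_32(b"id=1"), 4))
```

Returning a signed value mirrors a Java int, which is why bucket_of takes the absolute value before the modulo.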
Usage
Use RowKeyExtractor when writing data to Paimon tables, implementing custom writers, or when you need to determine which partition and bucket a row belongs to based on table schema and bucketing configuration.
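Which extractor applies is determined by the table's "bucket" option and by whether the table has primary keys. Here is a hedged sketch of that selection. It assumes the usual Paimon conventions: a positive bucket count means fixed buckets; -1 means unaware for tables without primary keys and dynamic for primary-key tables; -2 means postpone. choose_extractor is hypothetical and returns class names as strings rather than instances.

```python
from typing import List


def choose_extractor(bucket: int, primary_keys: List[str]) -> str:
    """Hypothetical helper: map the bucket option and key layout to an extractor class name."""
    if bucket > 0:
        return "FixedBucketRowKeyExtractor"
    if bucket == -2:
        return "PostponeBucketRowKeyExtractor"
    if bucket == -1:
        # -1 means unaware for tables without primary keys, dynamic for primary-key tables
        return "DynamicBucketRowKeyExtractor" if primary_keys else "UnawareBucketRowKeyExtractor"
    raise ValueError(f"unsupported bucket value: {bucket}")


print(choose_extractor(4, ["date", "region", "id"]))
```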
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/write/row_key_extractor.py
Signature
```python
class RowKeyExtractor(ABC):
    """Base class for extracting partition and bucket information from PyArrow data."""

    def __init__(self, table_schema: TableSchema):
        """Initialize with table schema."""

    def extract_partition_bucket_batch(self, data: pa.RecordBatch) -> Tuple[List[Tuple], List[int]]:
        """Extract partitions and buckets for all rows in batch."""

    @abstractmethod
    def _extract_buckets_batch(self, table: pa.RecordBatch) -> List[int]:
        """Extract bucket numbers for all rows. Must be implemented by subclasses."""


class FixedBucketRowKeyExtractor(RowKeyExtractor):
    """Fixed bucket mode extractor with configurable number of buckets."""

    def __init__(self, table_schema: TableSchema):
        """Initialize with table schema and bucket configuration."""


class UnawareBucketRowKeyExtractor(RowKeyExtractor):
    """Extractor for unaware bucket mode (bucket = -1, no primary keys)."""

    def __init__(self, table_schema: TableSchema):
        """Initialize for unaware bucket mode."""


class DynamicBucketRowKeyExtractor(RowKeyExtractor):
    """Row key extractor for dynamic bucket mode."""

    def __init__(self, table_schema: TableSchema):
        """Initialize for dynamic bucket mode."""


class PostponeBucketRowKeyExtractor(RowKeyExtractor):
    """Extractor for postpone bucket mode (bucket = -2)."""

    def __init__(self, table_schema: TableSchema):
        """Initialize for postpone bucket mode."""
```
Import
```python
from pypaimon.write.row_key_extractor import (
    RowKeyExtractor,
    FixedBucketRowKeyExtractor,
    UnawareBucketRowKeyExtractor,
    DynamicBucketRowKeyExtractor,
    PostponeBucketRowKeyExtractor
)
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| table_schema | TableSchema | Yes | Table schema with partition and bucket configuration; passed to the extractor constructor |
| data | pa.RecordBatch | Yes | PyArrow RecordBatch passed to extract_partition_bucket_batch |
Outputs
| Name | Type | Description |
|---|---|---|
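| partitions | List[Tuple] | Partition value tuples, one per row, in row order |
| buckets | List[int] | Bucket numbers, one per row, in row order; always 0 in unaware mode and -2 in postpone mode (dynamic mode raises an error instead) |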
Usage Examples
```python
from collections import defaultdict

import pyarrow as pa

from pypaimon.write.row_key_extractor import (
    FixedBucketRowKeyExtractor,
    UnawareBucketRowKeyExtractor
)

# TableSchema comes from pypaimon's schema package. The constructor calls below are
# simplified: fields=[...] is a placeholder for the field definitions matching the
# batch columns, and the real constructor may require further arguments.

# Fixed bucket mode: bucket keys default to the non-partition primary key ("id")
table_schema = TableSchema(
    fields=[...],
    partition_keys=["date", "region"],
    primary_keys=["date", "region", "id"],
    options={"bucket": "4"}
)
extractor = FixedBucketRowKeyExtractor(table_schema)

# Create sample data (keep the columns in the same order as the schema fields)
data = pa.record_batch({
    "date": ["2024-01-01", "2024-01-01", "2024-01-02"],
    "region": ["us", "us", "eu"],
    "id": [1, 2, 3],
    "value": [100, 200, 300]
})

# Extract partitions and buckets
partitions, buckets = extractor.extract_partition_bucket_batch(data)
print("Partitions:", partitions)
# [("2024-01-01", "us"), ("2024-01-01", "us"), ("2024-01-02", "eu")]
print("Buckets:", buckets)
# One bucket in the range [0, 4) per row, derived from the hash of the id field

# Unaware bucket mode (no bucketing)
unaware_schema = TableSchema(
    fields=[...],
    partition_keys=["date"],
    primary_keys=[],
    options={"bucket": "-1"}
)
unaware_extractor = UnawareBucketRowKeyExtractor(unaware_schema)
partitions, buckets = unaware_extractor.extract_partition_bucket_batch(data)
print("Buckets (unaware):", buckets)
# [0, 0, 0] - all rows go to bucket 0

# Custom bucket keys: hash only "id", spread over 8 buckets
custom_schema = TableSchema(
    fields=[...],
    partition_keys=["date"],
    primary_keys=["date", "id"],
    options={"bucket": "8", "bucket-key": "id"}
)
custom_extractor = FixedBucketRowKeyExtractor(custom_schema)
partitions, buckets = custom_extractor.extract_partition_bucket_batch(data)


# Use in a writer: group rows by (partition, bucket), then write each group once
class MyWriter:
    def __init__(self, table_schema):
        self.extractor = FixedBucketRowKeyExtractor(table_schema)

    def write(self, batch: pa.RecordBatch):
        partitions, buckets = self.extractor.extract_partition_bucket_batch(batch)
        groups = defaultdict(list)  # (partition, bucket) -> row indices
        for i, key in enumerate(zip(partitions, buckets)):
            groups[key].append(i)
        for (partition, bucket), indices in groups.items():
            # batch.take gathers the selected rows into a single sub-batch
            self.write_to_file(partition, bucket, batch.take(pa.array(indices)))

    def write_to_file(self, partition, bucket, sub_batch: pa.RecordBatch):
        # Hypothetical hook: persist sub_batch under the given partition and bucket
        raise NotImplementedError
```