Implementation:Apache Paimon SpecialFields Python
| Knowledge Sources | |
|---|---|
| Domains | Schema Management, System Fields |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
SpecialFields defines the system fields that Paimon uses internally for sequence numbers, row kinds, and row IDs, each with a reserved field ID.
Description
The SpecialFields class defines Paimon's internal system fields, which track row metadata. These fields use reserved field IDs just below Integer.MAX_VALUE (2147483647) so that they do not collide with user-defined field IDs.
The three system fields are: _SEQUENCE_NUMBER (field ID 2147483646) for ordering operations, _VALUE_KIND (field ID 2147483645) for tracking row change types, and _ROW_ID (field ID 2147483642) for uniquely identifying rows in row tracking scenarios.
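As a quick sanity check on the numbers above, the following sketch computes how far each reserved ID sits below the signed 32-bit maximum. The names `INT_MAX`, `RESERVED_IDS`, and `offsets` are illustrative only and are not part of pypaimon.

```python
# Largest signed 32-bit integer, i.e. the value of Java's Integer.MAX_VALUE.
INT_MAX = 2**31 - 1

# Field IDs exactly as listed in the description above.
RESERVED_IDS = {
    "_SEQUENCE_NUMBER": 2147483646,
    "_VALUE_KIND": 2147483645,
    "_ROW_ID": 2147483642,
}

# Distance of each reserved ID below INT_MAX.
offsets = {name: INT_MAX - field_id for name, field_id in RESERVED_IDS.items()}

for name, offset in offsets.items():
    print(f"{name}: INT_MAX - {offset}")
```

The reserved IDs therefore sit 1, 2, and 5 below the maximum, far above any ID a user schema would normally assign.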
The class provides utility methods to check whether a field name belongs to a system field, to locate system fields in a field list, and to add row tracking fields to a schema. The row_type_with_row_tracking method is used when enabling row-level update and delete operations: it adds the _ROW_ID and _SEQUENCE_NUMBER fields, and the added _SEQUENCE_NUMBER field can optionally be made nullable.
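The three helpers described above could plausibly be implemented along the following lines. This is a minimal, self-contained sketch and not the pypaimon source: `Field` is a hypothetical stand-in for `DataField`, and both the order in which the tracking fields are appended and the exact conflict check are assumptions.

```python
from dataclasses import dataclass, replace
from typing import Dict, List


@dataclass(frozen=True)
class Field:
    """Hypothetical stand-in for pypaimon's DataField (not the real class)."""
    id: int
    name: str
    type: str
    nullable: bool = True


# Stand-in system field definitions, using the IDs from the description.
SEQUENCE_NUMBER = Field(2147483646, "_SEQUENCE_NUMBER", "BIGINT", nullable=False)
VALUE_KIND = Field(2147483645, "_VALUE_KIND", "TINYINT", nullable=False)
ROW_ID = Field(2147483642, "_ROW_ID", "BIGINT", nullable=False)
SYSTEM_FIELD_NAMES = {SEQUENCE_NUMBER.name, VALUE_KIND.name, ROW_ID.name}


def is_system_field(field_name: str) -> bool:
    # A field is a system field if its name is one of the reserved names.
    return field_name in SYSTEM_FIELD_NAMES


def find_system_fields(read_fields: List[Field]) -> Dict[str, int]:
    # Map each system field name to its position in the given list.
    return {f.name: i for i, f in enumerate(read_fields) if is_system_field(f.name)}


def row_type_with_row_tracking(
    table_fields: List[Field],
    sequence_number_nullable: bool = False,
) -> List[Field]:
    # Assumption: a user field that reuses a tracking field name is rejected.
    tracking_names = {ROW_ID.name, SEQUENCE_NUMBER.name}
    for f in table_fields:
        if f.name in tracking_names:
            raise ValueError(f"Field name '{f.name}' conflicts with a system field")
    # Assumption: _ROW_ID is appended first, then _SEQUENCE_NUMBER.
    seq = replace(SEQUENCE_NUMBER, nullable=sequence_number_nullable)
    return list(table_fields) + [ROW_ID, seq]


user_fields = [Field(0, "id", "BIGINT"), Field(1, "name", "STRING")]
tracked = row_type_with_row_tracking(user_fields, sequence_number_nullable=True)
positions = find_system_fields(tracked)
print(positions)
```

Returning a new list instead of mutating `table_fields` keeps the original schema usable after tracking fields are added; whether the real method does the same should be checked against the source file.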
Usage
Use SpecialFields when implementing row tracking features, working with internal row representations, or distinguishing user fields from system fields in schema operations.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/table/special_fields.py
Signature
class SpecialFields:
    """Special fields in a RowType with specific field ids."""

    SEQUENCE_NUMBER = DataField(2147483646, "_SEQUENCE_NUMBER", AtomicType("BIGINT", nullable=False))
    VALUE_KIND = DataField(2147483645, "_VALUE_KIND", AtomicType("TINYINT", nullable=False))
    ROW_ID = DataField(2147483642, "_ROW_ID", AtomicType("BIGINT", nullable=False))

    SYSTEM_FIELD_NAMES = {
        '_SEQUENCE_NUMBER',
        '_VALUE_KIND',
        '_ROW_ID'
    }

    @staticmethod
    def is_system_field(field_name: str) -> bool:
        """Check if a field is a system field."""

    @staticmethod
    def find_system_fields(read_fields: List[DataField]) -> dict:
        """Find system fields in read fields and return a mapping of field name to index."""

    @staticmethod
    def row_type_with_row_tracking(
        table_fields: List[DataField],
        sequence_number_nullable: bool = False
    ) -> List[DataField]:
        """Add row tracking fields."""
Import
from pypaimon.table.special_fields import SpecialFields
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| field_name | str | Yes (for is_system_field) | Field name to check |
| read_fields | List[DataField] | Yes (for find_system_fields) | List of fields to search |
| table_fields | List[DataField] | Yes (for row_type_with_row_tracking) | Original table fields |
| sequence_number_nullable | bool | No (for row_type_with_row_tracking) | Whether the added _SEQUENCE_NUMBER field is nullable (default False) |
Outputs
| Name | Type | Description |
|---|---|---|
| is_system | bool | True if field is a system field |
| system_fields | dict | Mapping of system field name to field index |
| fields_with_tracking | List[DataField] | Fields with row tracking columns added |
Usage Examples
from pypaimon.table.special_fields import SpecialFields
from pypaimon.schema.data_types import DataField, AtomicType

# Check if field is system field
if SpecialFields.is_system_field("_SEQUENCE_NUMBER"):
    print("This is a system field")

if not SpecialFields.is_system_field("user_id"):
    print("This is a user field")

# Access system field definitions
seq_num_field = SpecialFields.SEQUENCE_NUMBER
print(f"Sequence number field ID: {seq_num_field.id}")
print(f"Sequence number type: {seq_num_field.type}")

row_id_field = SpecialFields.ROW_ID
value_kind_field = SpecialFields.VALUE_KIND

# Find system fields in a field list
fields = [
    DataField(0, "id", AtomicType("BIGINT")),
    DataField(1, "name", AtomicType("STRING")),
    SpecialFields.ROW_ID,
    SpecialFields.SEQUENCE_NUMBER
]
system_fields = SpecialFields.find_system_fields(fields)
print(f"System fields at indices: {system_fields}")
# Output: {'_ROW_ID': 2, '_SEQUENCE_NUMBER': 3}

# Add row tracking fields to schema
table_fields = [
    DataField(0, "id", AtomicType("BIGINT")),
    DataField(1, "name", AtomicType("STRING")),
    DataField(2, "age", AtomicType("INT"))
]

# With non-nullable sequence number
fields_with_tracking = SpecialFields.row_type_with_row_tracking(table_fields)
print(f"Total fields: {len(fields_with_tracking)}")  # 5 (3 + 2)

# With nullable sequence number
fields_nullable = SpecialFields.row_type_with_row_tracking(
    table_fields,
    sequence_number_nullable=True
)

# Check for field name conflicts
try:
    conflicting_fields = [
        DataField(0, "_ROW_ID", AtomicType("BIGINT")),  # Conflicts!
        DataField(1, "name", AtomicType("STRING"))
    ]
    SpecialFields.row_type_with_row_tracking(conflicting_fields)
except ValueError as e:
    print(f"Error: {e}")

# Filter out system fields from user fields
all_fields = [
    DataField(0, "id", AtomicType("BIGINT")),
    DataField(1, "name", AtomicType("STRING")),
    SpecialFields.ROW_ID,
    SpecialFields.SEQUENCE_NUMBER
]
user_fields = [f for f in all_fields if not SpecialFields.is_system_field(f.name)]
print(f"User fields: {[f.name for f in user_fields]}")