Implementation:Apache Paimon DataTypes Python
| Knowledge Sources | |
|---|---|
| Domains | Type System, Schema Management |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
DataTypes Python defines Paimon's complete data type system in Python, including type classes, parsing from JSON/SQL strings, and bidirectional conversion between Paimon types and PyArrow types (and Avro schemas).
Description
The module implements a comprehensive type hierarchy with `DataType` as the abstract base class. Concrete types include `AtomicType` (scalar types like INT, STRING, DECIMAL, TIMESTAMP, BLOB), `ArrayType`, `MultisetType`, `MapType`, and `RowType` (nested struct). Each type tracks nullability and provides `to_dict()` for serialization and `__str__()` for SQL-like representation. `DataField` represents a named, typed field with optional description and default value, used in schema definitions.
`DataTypeParser` handles parsing from JSON dictionaries (used in schema files) or SQL-style strings (e.g., "DECIMAL(10,2) NOT NULL"), dispatching to appropriate type constructors with recursive handling for nested types. `AtomicInteger` provides thread-safe field ID generation with auto-increment.
The `PyarrowFieldParser` class provides the critical bridge to PyArrow:
- `from_paimon_type()` converts Paimon types to PyArrow types with proper handling of precision for timestamps (s/ms/us/ns based on precision), decimals (precision/scale), binary types (fixed vs variable length), and time types.
- `to_paimon_type()` performs the reverse conversion.
- `from_paimon_schema()` / `to_paimon_schema()` operate at the schema level.
- `to_avro_type()` / `to_avro_schema()` generate Avro schemas for file format serialization.
Special handling includes BLOB mapping to PyArrow large_binary, timestamp with local time zone mapping to timestamp with tz=UTC, and nested types mapping to PyArrow list/map/struct.
Every schema definition, file read, file write, and schema evolution operation in the SDK depends on these type definitions and conversions. The PyArrow conversion layer is what lets Paimon schemas and data be exchanged with the rest of the Python data ecosystem.
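To make the `to_dict()` / `__str__()` contract described above more concrete, here is a minimal sketch using only the standard library. The classes `SketchAtomic` and `SketchArray` are hypothetical stand-ins, not the module's real classes, and the serialized shape (atomic types as SQL-like strings, arrays as a dict with `type` and `element` keys) is an assumption based on the JSON example shown later on this page.

```python
from dataclasses import dataclass
from typing import Any, Dict, Union


@dataclass
class SketchAtomic:
    """Hypothetical stand-in for AtomicType (not the real class)."""
    type: str
    nullable: bool = True

    def __str__(self) -> str:
        # SQL-like rendering: append NOT NULL only for non-nullable types.
        return self.type if self.nullable else f"{self.type} NOT NULL"

    def to_dict(self) -> str:
        # Assumption: atomic types serialize to their SQL-like string.
        return str(self)


@dataclass
class SketchArray:
    """Hypothetical stand-in for ArrayType (not the real class)."""
    element: Union["SketchAtomic", "SketchArray"]
    nullable: bool = True

    def __str__(self) -> str:
        base = f"ARRAY<{self.element}>"
        return base if self.nullable else f"{base} NOT NULL"

    def to_dict(self) -> Dict[str, Any]:
        # Nested types recurse into the element type.
        type_name = "ARRAY" if self.nullable else "ARRAY NOT NULL"
        return {"type": type_name, "element": self.element.to_dict()}


nested = SketchArray(SketchArray(SketchAtomic("INT", nullable=False)))
print(nested)            # ARRAY<ARRAY<INT NOT NULL>>
print(nested.to_dict())
```

The point of the sketch is the recursion: each container delegates to its element, so arbitrarily deep nesting serializes without special cases.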
Usage
Data types are used throughout the SDK for schema definition, validation, serialization, and data conversion. Applications interact with them when defining schemas or examining table metadata.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/schema/data_types.py
Signature
class DataType(ABC):
    def __init__(self, nullable: bool = True): ...

    @abstractmethod
    def to_dict(self) -> Dict[str, Any]: ...

    @abstractmethod
    def __str__(self) -> str: ...

@dataclass
class AtomicType(DataType):
    type: str

    def __init__(self, type: str, nullable: bool = True): ...

@dataclass
class ArrayType(DataType):
    element: DataType

    def __init__(self, nullable: bool, element_type: DataType): ...

@dataclass
class MapType(DataType):
    key: DataType
    value: DataType

    def __init__(self, nullable: bool, key_type: DataType, value_type: DataType): ...

@dataclass
class RowType(DataType):
    fields: List[DataField]

    def __init__(self, nullable: bool, fields: List[DataField]): ...

@dataclass
class DataField:
    id: int
    name: str
    type: DataType
    description: Optional[str] = None
    default_value: Optional[str] = None

class PyarrowFieldParser:
    @staticmethod
    def from_paimon_type(data_type: DataType) -> pyarrow.DataType: ...

    @staticmethod
    def to_paimon_type(pa_type: pyarrow.DataType, nullable: bool) -> DataType: ...

    @staticmethod
    def from_paimon_schema(data_fields: List[DataField]) -> pyarrow.Schema: ...

    @staticmethod
    def to_paimon_schema(pa_schema: pyarrow.Schema) -> List[DataField]: ...

    @staticmethod
    def to_avro_schema(pyarrow_schema: Union[pyarrow.Schema, pyarrow.StructType],
                       name: str = "record") -> Dict[str, Any]: ...
Import
from pypaimon.schema.data_types import (
    DataType, AtomicType, ArrayType, MapType, RowType,
    DataField, PyarrowFieldParser, DataTypeParser
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| type_string | str | yes (for parsing) | SQL-style type string (e.g., "INT", "DECIMAL(10,2)") |
| json_data | Dict/str | yes (for parsing) | JSON representation of type |
| pa_type | pyarrow.DataType | yes (for conversion) | PyArrow type to convert |
Outputs
| Name | Type | Description |
|---|---|---|
| DataType | DataType subclass | Paimon data type object |
| pyarrow.DataType | pyarrow.DataType | PyArrow type object |
| Dict | Dict[str, Any] | JSON representation or Avro schema |
Usage Examples
Create and Parse Types
from pypaimon.schema.data_types import AtomicType, ArrayType, MapType, DataTypeParser
# Create atomic types
int_type = AtomicType("INT")
string_type = AtomicType("STRING", nullable=False)
decimal_type = AtomicType("DECIMAL(10,2)")
# Create complex types
array_type = ArrayType(nullable=True, element_type=AtomicType("STRING"))
map_type = MapType(
    nullable=True,
    key_type=AtomicType("STRING"),
    value_type=AtomicType("INT")
)
# Parse from SQL string
parsed = DataTypeParser.parse_data_type("DECIMAL(10,2) NOT NULL")
print(parsed) # DECIMAL(10,2) NOT NULL
# Parse from JSON
json_type = {"type": "ARRAY", "element": "STRING"}
parsed = DataTypeParser.parse_data_type(json_type)
print(parsed) # ARRAY<STRING>
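As a rough illustration of what parsing a SQL-style string such as "DECIMAL(10,2) NOT NULL" involves, the sketch below splits an atomic type string into a name, numeric parameters, and a nullability flag. It is not `DataTypeParser`'s implementation: `ParsedType` and `parse_type_string` are hypothetical names, and the sketch deliberately ignores nested types and multi-word forms such as "TIMESTAMP(3) WITH LOCAL TIME ZONE".

```python
import re
from typing import List, NamedTuple


class ParsedType(NamedTuple):
    """Hypothetical result container, used only in this sketch."""
    name: str
    params: List[int]
    nullable: bool


def parse_type_string(text: str) -> ParsedType:
    """Split an atomic SQL-style type string into name, params, nullability."""
    # Normalize case and collapse runs of whitespace.
    normalized = " ".join(text.strip().upper().split())

    # A trailing NOT NULL marks the type as non-nullable.
    nullable = True
    suffix = " NOT NULL"
    if normalized.endswith(suffix):
        nullable = False
        normalized = normalized[: -len(suffix)]

    # Type name, optionally followed by a parenthesized list of integers.
    match = re.fullmatch(r"([A-Z_ ]+?)\s*(?:\(([\d,\s]+)\))?", normalized)
    if match is None:
        raise ValueError(f"Unrecognized type string: {text!r}")

    name, raw_params = match.group(1), match.group(2)
    params = [int(p) for p in raw_params.split(",")] if raw_params else []
    return ParsedType(name.strip(), params, nullable)


print(parse_type_string("DECIMAL(10,2) NOT NULL"))
print(parse_type_string("int"))
```

A real parser additionally has to dispatch on the type name and recurse for ARRAY, MAP, and ROW, which is why the module handles JSON dictionaries and strings through a dedicated class.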
Define Schema
from pypaimon.schema.data_types import AtomicType, ArrayType, DataField, RowType
# Define table schema
fields = [
    DataField(0, "user_id", AtomicType("INT"), "User identifier"),
    DataField(1, "name", AtomicType("STRING"), "User name"),
    DataField(2, "email", AtomicType("STRING", nullable=True), "User email"),
    DataField(3, "balance", AtomicType("DECIMAL(10,2)"), "Account balance"),
    DataField(4, "tags", ArrayType(True, AtomicType("STRING")), "User tags")
]
# Create row type
row_type = RowType(nullable=False, fields=fields)
print(row_type)
PyArrow Conversion
from pypaimon.schema.data_types import AtomicType, DataField, PyarrowFieldParser
import pyarrow as pa
# Convert Paimon field to PyArrow
paimon_field = DataField(0, "age", AtomicType("INT"))
pa_field = PyarrowFieldParser.from_paimon_field(paimon_field)
print(pa_field) # pyarrow.Field<age: int32>
# Convert PyArrow schema to Paimon
pa_schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("created_at", pa.timestamp('us'))
])
paimon_fields = PyarrowFieldParser.to_paimon_schema(pa_schema)
for field in paimon_fields:
    print(f"{field.name}: {field.type}")
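The Description notes that timestamps map to the PyArrow units s/ms/us/ns depending on precision. The sketch below shows one plausible way to pick the unit from a type string. The function `timestamp_unit` is hypothetical, and both the thresholds (0 to s, 1-3 to ms, 4-6 to us, 7-9 to ns) and the default precision of 6 for a bare `TIMESTAMP` are assumptions, not values confirmed by this page. It returns the unit as a plain string so it runs without PyArrow installed.

```python
import re


def timestamp_unit(type_name: str) -> str:
    """Return an Arrow-style time unit for a TIMESTAMP or TIMESTAMP(p) string."""
    match = re.fullmatch(r"TIMESTAMP(?:\((\d)\))?", type_name.strip().upper())
    if match is None:
        raise ValueError(f"Not a plain TIMESTAMP type: {type_name!r}")

    # Assumption: a bare TIMESTAMP defaults to microsecond precision (6).
    precision = int(match.group(1)) if match.group(1) is not None else 6

    # Assumption: choose the coarsest unit that can hold the precision.
    if precision == 0:
        return "s"
    if precision <= 3:
        return "ms"
    if precision <= 6:
        return "us"
    return "ns"


for name in ("TIMESTAMP(0)", "TIMESTAMP(3)", "TIMESTAMP", "TIMESTAMP(9)"):
    print(name, "->", timestamp_unit(name))
```

With PyArrow available, the returned string could be passed directly to `pa.timestamp(unit)`, which accepts exactly these four unit names.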
Schema Evolution
# Add new field with default value
new_field = DataField(
    id=5,
    name="status",
    type=AtomicType("STRING"),
    description="User status",
    default_value="active"
)
fields.append(new_field)
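The example above hard-codes `id=5`. The Description mentions that `AtomicInteger` provides thread-safe, auto-incrementing field IDs; the sketch below shows how such a counter could work with a lock. `SketchAtomicInteger` and its method names are hypothetical and may not match the real class's interface.

```python
import threading


class SketchAtomicInteger:
    """Hypothetical lock-based counter (not the module's AtomicInteger)."""

    def __init__(self, initial: int = 0) -> None:
        self._value = initial
        self._lock = threading.Lock()

    def get_and_increment(self) -> int:
        # The lock makes the read-modify-write sequence atomic.
        with self._lock:
            current = self._value
            self._value += 1
            return current

    def get(self) -> int:
        with self._lock:
            return self._value


counter = SketchAtomicInteger(initial=5)
drawn = []


def draw_ids(n: int) -> None:
    for _ in range(n):
        # list.append is safe to call from multiple threads in CPython.
        drawn.append(counter.get_and_increment())


threads = [threading.Thread(target=draw_ids, args=(100,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(set(drawn)), counter.get())  # 400 405
```

Because every ID is handed out under the lock, no two threads can receive the same value, which is the property schema evolution needs when new fields are added concurrently.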
Avro Schema Generation
import json
# Generate Avro schema for file writing
pa_schema = PyarrowFieldParser.from_paimon_schema(fields)
avro_schema = PyarrowFieldParser.to_avro_schema(pa_schema, name="UserRecord")
print(json.dumps(avro_schema, indent=2))
# {
#   "type": "record",
#   "name": "UserRecord",
#   "namespace": "org.apache.paimon.avro.generated",
#   "fields": [...]
# }
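The `fields` entry is elided in the output above. As a rough guide to what an Avro record's fields look like, the sketch below builds a record from (name, type, nullable) tuples and represents nullable fields as a union with "null", which is the standard Avro idiom. Whether `to_avro_schema()` emits exactly this shape is an assumption; `sketch_avro_record` and the small type table are hypothetical, and the sketch omits the namespace shown above.

```python
import json
from typing import Any, Dict, List, Tuple

# Hypothetical, deliberately small mapping from Paimon type names to Avro primitives.
_PRIMITIVES = {
    "INT": "int",
    "BIGINT": "long",
    "STRING": "string",
    "BOOLEAN": "boolean",
    "DOUBLE": "double",
}


def sketch_avro_record(name: str, fields: List[Tuple[str, str, bool]]) -> Dict[str, Any]:
    """Build an Avro record schema dict from (field name, type name, nullable) tuples."""
    avro_fields = []
    for field_name, type_name, nullable in fields:
        avro_type = _PRIMITIVES[type_name]
        if nullable:
            # Nullable fields become a union with "null" first, so the
            # default value null is valid under the Avro specification.
            avro_fields.append(
                {"name": field_name, "type": ["null", avro_type], "default": None}
            )
        else:
            avro_fields.append({"name": field_name, "type": avro_type})
    return {"type": "record", "name": name, "fields": avro_fields}


schema = sketch_avro_record(
    "UserRecord",
    [("user_id", "INT", False), ("name", "STRING", True)],
)
print(json.dumps(schema, indent=2))
```

Types with parameters or nesting (DECIMAL, TIMESTAMP, ARRAY, MAP, ROW) need logical types or nested schemas and are outside the scope of this sketch.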