Implementation:Apache Paimon DataTypes Python

From Leeroopedia
Revision as of 14:20, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Apache_Paimon_DataTypes_Python.md)

Knowledge Sources
Domains: Type System, Schema Management
Last Updated: 2026-02-08 00:00 GMT

Overview

DataTypes Python defines Paimon's complete data type system in Python, including type classes, parsing from JSON/SQL strings, and bidirectional conversion between Paimon types and PyArrow types (and Avro schemas).

Description

The module builds its type hierarchy on the abstract base class `DataType`. The concrete types are `AtomicType` (scalar types such as INT, STRING, DECIMAL, TIMESTAMP, and BLOB), `ArrayType`, `MultisetType`, `MapType`, and `RowType` (a nested struct). Each type tracks nullability and provides `to_dict()` for serialization and `__str__()` for a SQL-like representation. `DataField` represents a named, typed field with an optional description and default value, and is used in schema definitions.

`DataTypeParser` parses types from JSON dictionaries (as used in schema files) or from SQL-style strings (e.g., "DECIMAL(10,2) NOT NULL"). It dispatches to the appropriate type constructor and recurses into nested types. `AtomicInteger` provides thread-safe, auto-incrementing field ID generation.

`PyarrowFieldParser` is the bridge to PyArrow. `from_paimon_type()` converts Paimon types to PyArrow types, choosing the timestamp unit (s/ms/us/ns) from the declared precision, carrying over decimal precision and scale, distinguishing fixed-length from variable-length binary types, and mapping time types. `to_paimon_type()` performs the reverse conversion, and `from_paimon_schema()` / `to_paimon_schema()` do the same at the schema level. `to_avro_type()` / `to_avro_schema()` generate Avro schemas for file format serialization. Three mappings deserve mention: BLOB maps to PyArrow large_binary, timestamp with local time zone maps to a timestamp with tz=UTC, and nested types map to PyArrow list/map/struct.

Every schema definition, file read, file write, and schema evolution operation in the SDK depends on these type definitions and conversions. The PyArrow conversion layer is what connects Paimon schemas to the rest of the Python data ecosystem.
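
The precision-to-unit rule for timestamps can be illustrated without PyArrow. The sketch below is a hypothetical helper, `arrow_unit_for_timestamp`, written for this page with only the standard library. The bucket boundaries (0 for seconds, 1-3 for milliseconds, 4-6 for microseconds, 7-9 for nanoseconds) and the default precision of 6 for a bare TIMESTAMP are assumptions; the actual `from_paimon_type()` logic may differ.

```python
import re


def arrow_unit_for_timestamp(type_string: str) -> str:
    """Return the Arrow time unit ('s', 'ms', 'us', 'ns') for a TIMESTAMP type string."""
    match = re.fullmatch(r"TIMESTAMP(?:\((\d+)\))?", type_string.strip().upper())
    if match is None:
        raise ValueError(f"not a plain TIMESTAMP type: {type_string!r}")
    # Assumption: a bare TIMESTAMP means precision 6 (microseconds).
    precision = int(match.group(1)) if match.group(1) else 6
    if precision == 0:
        return "s"
    if precision <= 3:
        return "ms"
    if precision <= 6:
        return "us"
    if precision <= 9:
        return "ns"
    raise ValueError(f"unsupported timestamp precision: {precision}")


for spec in ("TIMESTAMP(0)", "TIMESTAMP(3)", "TIMESTAMP", "TIMESTAMP(9)"):
    print(spec, "->", arrow_unit_for_timestamp(spec))
```

Each bucket picks the coarsest unit that still holds every fractional digit the precision allows, so no stored value is truncated.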

Usage

Data types are used throughout the SDK for schema definition, validation, serialization, and data conversion. Applications interact with them when defining schemas or examining table metadata.

Code Reference

Source Location

Signature

class DataType(ABC):
    def __init__(self, nullable: bool = True): ...
    @abstractmethod
    def to_dict(self) -> Dict[str, Any]: ...
    @abstractmethod
    def __str__(self) -> str: ...

@dataclass
class AtomicType(DataType):
    type: str
    def __init__(self, type: str, nullable: bool = True): ...

@dataclass
class ArrayType(DataType):
    element: DataType
    def __init__(self, nullable: bool, element_type: DataType): ...

@dataclass
class MapType(DataType):
    key: DataType
    value: DataType
    def __init__(self, nullable: bool, key_type: DataType, value_type: DataType): ...

@dataclass
class RowType(DataType):
    fields: List[DataField]
    def __init__(self, nullable: bool, fields: List[DataField]): ...

@dataclass
class DataField:
    id: int
    name: str
    type: DataType
    description: Optional[str] = None
    default_value: Optional[str] = None

class PyarrowFieldParser:
    @staticmethod
    def from_paimon_type(data_type: DataType) -> pyarrow.DataType: ...
    @staticmethod
    def to_paimon_type(pa_type: pyarrow.DataType, nullable: bool) -> DataType: ...
    @staticmethod
    def from_paimon_schema(data_fields: List[DataField]) -> pyarrow.Schema: ...
    @staticmethod
    def to_paimon_schema(pa_schema: pyarrow.Schema) -> List[DataField]: ...
    @staticmethod
    def to_avro_schema(pyarrow_schema: Union[pyarrow.Schema, pyarrow.StructType],
                       name: str = "record") -> Dict[str, Any]: ...

Import

from pypaimon.schema.data_types import (
    DataType, AtomicType, ArrayType, MapType, RowType,
    DataField, PyarrowFieldParser, DataTypeParser
)

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| type_string | str | yes (for parsing) | SQL-style type string (e.g., "INT", "DECIMAL(10,2)") |
| json_data | Dict/str | yes (for parsing) | JSON representation of type |
| pa_type | pyarrow.DataType | yes (for conversion) | PyArrow type to convert |

Outputs

| Name | Type | Description |
|---|---|---|
| DataType | DataType subclass | Paimon data type object |
| pyarrow.DataType | pyarrow.DataType | PyArrow type object |
| Dict | Dict[str, Any] | JSON representation or Avro schema |

Usage Examples

Create and Parse Types

from pypaimon.schema.data_types import AtomicType, ArrayType, MapType, DataTypeParser

# Create atomic types
int_type = AtomicType("INT")
string_type = AtomicType("STRING", nullable=False)
decimal_type = AtomicType("DECIMAL(10,2)")

# Create complex types
array_type = ArrayType(nullable=True, element_type=AtomicType("STRING"))
map_type = MapType(
    nullable=True,
    key_type=AtomicType("STRING"),
    value_type=AtomicType("INT")
)

# Parse from SQL string
parsed = DataTypeParser.parse_data_type("DECIMAL(10,2) NOT NULL")
print(parsed)  # DECIMAL(10,2) NOT NULL

# Parse from JSON
json_type = {"type": "ARRAY", "element": "STRING"}
parsed = DataTypeParser.parse_data_type(json_type)
print(parsed)  # ARRAY<STRING>
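
To make the SQL-string path concrete, here is a minimal sketch of how a string such as "DECIMAL(10,2) NOT NULL" can be split into a base name, numeric parameters, and a nullability flag. `split_sql_type` is a hypothetical helper written for this page with only the standard library. It is not `DataTypeParser`, and it handles only the simple `NAME(args) [NOT NULL]` shape.

```python
import re
from typing import List, Tuple


def split_sql_type(type_string: str) -> Tuple[str, List[int], bool]:
    """Split a simple SQL-style type string into (name, parameters, nullable)."""
    text = type_string.strip()
    nullable = True
    # A trailing NOT NULL clears the nullable flag; anything else stays nullable.
    if text.upper().endswith("NOT NULL"):
        nullable = False
        text = text[: -len("NOT NULL")].strip()
    match = re.fullmatch(r"([A-Za-z_ ]+?)\s*(?:\(([\d\s,]+)\))?", text)
    if match is None:
        raise ValueError(f"unsupported type string: {type_string!r}")
    name = match.group(1).upper()
    # Parameters such as precision and scale are comma-separated integers.
    params = [int(p) for p in match.group(2).split(",")] if match.group(2) else []
    return name, params, nullable


print(split_sql_type("DECIMAL(10,2) NOT NULL"))
print(split_sql_type("INT"))
```

Stripping the nullability suffix first keeps the remaining pattern small, since the type name and its parameters can then be matched in one pass.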

Define Schema

from pypaimon.schema.data_types import AtomicType, ArrayType, DataField, RowType

# Define table schema
fields = [
    DataField(0, "user_id", AtomicType("INT"), "User identifier"),
    DataField(1, "name", AtomicType("STRING"), "User name"),
    DataField(2, "email", AtomicType("STRING", nullable=True), "User email"),
    DataField(3, "balance", AtomicType("DECIMAL(10,2)"), "Account balance"),
    DataField(4, "tags", ArrayType(True, AtomicType("STRING")), "User tags")
]

# Create row type
row_type = RowType(nullable=False, fields=fields)
print(row_type)
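
The field IDs above are written by hand. The description mentions an `AtomicInteger` that hands out IDs in a thread-safe way; the sketch below shows that general idea with a lock-protected counter. `FieldIdCounter` and `allocate_ids` are hypothetical names for illustration only and do not reproduce the pypaimon class or its method names.

```python
import threading
from typing import List


class FieldIdCounter:
    """Hypothetical stand-in for a thread-safe, auto-incrementing ID source."""

    def __init__(self, initial: int = 0) -> None:
        self._value = initial
        self._lock = threading.Lock()

    def get_and_increment(self) -> int:
        # The lock makes "read the value, then bump it" a single atomic step.
        with self._lock:
            current = self._value
            self._value += 1
            return current


def allocate_ids(counter: FieldIdCounter, n_threads: int = 4, per_thread: int = 100) -> List[int]:
    """Draw IDs from several threads at once and collect them."""
    ids: List[int] = []

    def worker() -> None:
        for _ in range(per_thread):
            ids.append(counter.get_and_increment())

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return ids


ids = allocate_ids(FieldIdCounter())
print(len(ids), len(set(ids)))  # 400 400: every ID is unique
```

Without the lock, two threads could read the same value before either increments it, and two fields would end up sharing an ID.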

PyArrow Conversion

import pyarrow as pa

from pypaimon.schema.data_types import AtomicType, DataField, PyarrowFieldParser

# Convert Paimon field to PyArrow
paimon_field = DataField(0, "age", AtomicType("INT"))
pa_field = PyarrowFieldParser.from_paimon_field(paimon_field)
print(pa_field)  # pyarrow.Field<age: int32>

# Convert PyArrow schema to Paimon
pa_schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("created_at", pa.timestamp('us'))
])
paimon_fields = PyarrowFieldParser.to_paimon_schema(pa_schema)

for field in paimon_fields:
    print(f"{field.name}: {field.type}")

Schema Evolution

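from pypaimon.schema.data_types import AtomicType, DataField

# Add a new field with a default value; `fields` is the list from the Define Schema example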
new_field = DataField(
    id=5,
    name="status",
    type=AtomicType("STRING"),
    description="User status",
    default_value="active"
)

fields.append(new_field)
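
The example above hard-codes `id=5`. As a sketch of how a fresh ID might be chosen instead, the snippet below appends a field whose ID is one more than the largest ID already in the list. `Field` and `append_field` are hypothetical stand-ins defined here so the snippet runs without pypaimon. Deriving the ID from the current fields is a simplifying assumption: a real table schema may track the highest ID ever assigned separately, so that IDs are not reused after a column is dropped.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Field:
    """Hypothetical stand-in for DataField, with the type kept as a plain string."""
    id: int
    name: str
    type: str
    default_value: Optional[str] = None


def append_field(fields: List[Field], name: str, type_string: str,
                 default_value: Optional[str] = None) -> Field:
    """Append a new field with the next unused ID and return it."""
    if any(f.name == name for f in fields):
        raise ValueError(f"field {name!r} already exists")
    # Assumption: the next ID is max(existing IDs) + 1; an empty schema starts at 0.
    next_id = max((f.id for f in fields), default=-1) + 1
    new_field = Field(next_id, name, type_string, default_value)
    fields.append(new_field)
    return new_field


schema = [Field(0, "user_id", "INT"), Field(4, "tags", "ARRAY<STRING>")]
status = append_field(schema, "status", "STRING", default_value="active")
print(status.id, status.name, status.default_value)  # 5 status active
```

Rejecting duplicate names up front keeps the list usable as a schema, where each column name must identify exactly one field.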

Avro Schema Generation

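import json

from pypaimon.schema.data_types import PyarrowFieldParser

# Generate an Avro schema for file writing; `fields` is the list from the Define Schema example
pa_schema = PyarrowFieldParser.from_paimon_schema(fields)
avro_schema = PyarrowFieldParser.to_avro_schema(pa_schema, name="UserRecord")

print(json.dumps(avro_schema, indent=2))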
# {
#   "type": "record",
#   "name": "UserRecord",
#   "namespace": "org.apache.paimon.avro.generated",
#   "fields": [...]
# }
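
The `"fields"` entry is elided in the output above. As an illustration of what Avro field entries generally look like, the sketch below builds a record schema by hand using the common Avro convention of a `["null", T]` union for nullable columns. `avro_record` is a hypothetical helper, and the exact dictionaries that `to_avro_schema()` emits are not shown in this page and may differ.

```python
import json
from typing import Any, Dict, List, Tuple


def avro_record(name: str, columns: List[Tuple[str, str, bool]]) -> Dict[str, Any]:
    """Build an Avro record schema from (column name, Avro type, nullable) triples."""
    fields = []
    for col_name, avro_type, nullable in columns:
        if nullable:
            # Avro expresses nullability as a union with "null"; the default
            # must match the first branch of the union, hence None.
            fields.append({"name": col_name, "type": ["null", avro_type], "default": None})
        else:
            fields.append({"name": col_name, "type": avro_type})
    return {"type": "record", "name": name, "fields": fields}


schema = avro_record("UserRecord", [("user_id", "int", False), ("email", "string", True)])
print(json.dumps(schema, indent=2))
```

Putting `"null"` first in the union lets the field default to null, which is what allows readers to fill in the column when older files do not contain it.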
