
Implementation:Apache Paimon SchemaManager

From Leeroopedia


Knowledge Sources
Domains: Schema Management, Metadata Storage
Last Updated: 2026-02-08 00:00 GMT

Overview

SchemaManager manages the lifecycle of table schemas on the filesystem: creation, versioned persistence, cached retrieval, and schema evolution through atomically committed lists of schema changes.

Description

SchemaManager stores schemas as versioned files (`schema-0`, `schema-1`, etc.) under `{table_path}/schema/`. Each version is immutable and is a complete snapshot of the table schema at a point in time.

`latest()` lists all schema files, identifies the highest version number, and returns that schema, using the cache. `get_schema()` retrieves a specific version by ID and caches it in memory in the `schema_cache` dictionary; since schema files never change once written, cached entries cannot go stale.

`create_table()` checks that no schema exists yet, builds a `TableSchema` with id 0 from the given `Schema`, and commits it atomically via `FileIO.try_to_write_atomic()`, which relies on filesystem-level atomic operations to prevent races between concurrent creators.

`commit_changes()` implements schema evolution. It loads the latest schema, applies a list of `SchemaChange` objects via `_generate_table_schema()`, increments the schema ID, and attempts to commit the result. On conflict (another writer has already committed that ID), it retries the whole cycle against the new latest schema. `_generate_table_schema()` handles AddColumn, DropColumn, RenameColumn, UpdateColumnType, UpdateColumnNullability, UpdateColumnComment, UpdateColumnPosition, SetOption, RemoveOption, and UpdateComment.

Change validation enforces several constraints: partition keys and primary keys cannot be dropped, partition keys cannot be renamed, primary key types cannot be updated, and new columns must be nullable. Column renames are propagated to the bucket key and sequence field options. Field IDs for added columns come from an `AtomicInteger`, giving thread-safe auto-increment, and field types are deep-copied so the previous schema is never mutated by accident.
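The versioned layout and read path described above can be sketched with the Python standard library alone. This is a minimal sketch under stated assumptions: each `schema-<id>` file is taken to hold a JSON document, plain dicts stand in for `TableSchema`, `FileIO` is omitted, and `VersionedSchemaStore` is a hypothetical class, not pypaimon's `SchemaManager`. It shows that the newest version is chosen by comparing parsed integers, and that caching by ID is safe because versions are immutable.

```python
import json
import os
import re
import tempfile
from typing import Dict, List, Optional

SCHEMA_PREFIX = "schema-"
_VERSION_RE = re.compile(rf"^{SCHEMA_PREFIX}(\d+)$")


class VersionedSchemaStore:
    """Hypothetical stand-in for SchemaManager's read path (not pypaimon code)."""

    def __init__(self, table_path: str):
        self.schema_dir = os.path.join(table_path, "schema")
        self.schema_cache: Dict[int, dict] = {}  # schema id -> parsed schema

    def _to_schema_path(self, schema_id: int) -> str:
        return os.path.join(self.schema_dir, f"{SCHEMA_PREFIX}{schema_id}")

    def _list_versioned_files(self) -> List[int]:
        # Collect the numeric suffix of every "schema-<n>" file; ignore other names.
        if not os.path.isdir(self.schema_dir):
            return []
        ids = []
        for name in os.listdir(self.schema_dir):
            match = _VERSION_RE.match(name)
            if match:
                ids.append(int(match.group(1)))
        return ids

    def get_schema(self, schema_id: int) -> Optional[dict]:
        # Versions are immutable, so a cached entry never needs invalidation.
        if schema_id in self.schema_cache:
            return self.schema_cache[schema_id]
        path = self._to_schema_path(schema_id)
        if not os.path.exists(path):
            return None
        with open(path, encoding="utf-8") as f:
            schema = json.load(f)
        self.schema_cache[schema_id] = schema
        return schema

    def latest(self) -> Optional[dict]:
        ids = self._list_versioned_files()
        return self.get_schema(max(ids)) if ids else None


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as table_path:
        store = VersionedSchemaStore(table_path)
        os.makedirs(store.schema_dir)
        for schema_id in (0, 2, 10):
            with open(store._to_schema_path(schema_id), "w", encoding="utf-8") as f:
                json.dump({"id": schema_id}, f)
        # Numeric max picks schema-10; a plain string sort would pick "schema-2".
        print(store.latest()["id"])  # 10
```

Comparing integers rather than file names matters once a table passes ten versions, because `"schema-2"` sorts after `"schema-10"` as a string.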

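This design implements Paimon's schema versioning and evolution semantics in pure Python, so the Python SDK can manage table schemas without a Java runtime. Because every version is published with an atomic write, concurrent writers cannot overwrite each other's schema files: provided the underlying filesystem honors the atomic write, at most one writer claims a given version number, and the others retry on top of it.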
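The commit protocol can be sketched for a local filesystem as below. Rather than reproducing `FileIO.try_to_write_atomic()`, whose mechanism may differ per storage backend, the hypothetical helper `write_if_absent` writes the full content to a temporary file and publishes it with `os.link`, which raises `FileExistsError` instead of overwriting an existing name. `commit_with_retry` is a simplified, hypothetical version of the `commit_changes()` loop in which a "change" is just a function from the old schema dict to a new one.

```python
import json
import os
import tempfile
import uuid
from typing import Callable, Optional

SCHEMA_PREFIX = "schema-"


def write_if_absent(path: str, content: str) -> bool:
    """Publish `content` at `path` only if nothing exists there yet (local FS only)."""
    tmp_path = f"{path}.{uuid.uuid4().hex}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(content)
    try:
        # os.link never overwrites: it raises FileExistsError if `path` exists.
        # Readers therefore see either no file or a fully written one.
        os.link(tmp_path, path)
        return True
    except FileExistsError:
        return False
    finally:
        os.remove(tmp_path)  # the published file keeps its own hard link


def latest_id(schema_dir: str) -> Optional[int]:
    ids = []
    for name in os.listdir(schema_dir):
        suffix = name[len(SCHEMA_PREFIX):]
        if name.startswith(SCHEMA_PREFIX) and suffix.isdecimal():
            ids.append(int(suffix))
    return max(ids) if ids else None


def commit_with_retry(schema_dir: str, apply_changes: Callable[[dict], dict]) -> dict:
    """Reload the latest schema, apply the changes, and try to claim id + 1."""
    while True:
        current_id = latest_id(schema_dir)
        if current_id is None:
            raise RuntimeError("table has no schema yet")
        with open(os.path.join(schema_dir, f"{SCHEMA_PREFIX}{current_id}"), encoding="utf-8") as f:
            current = json.load(f)
        new_schema = apply_changes(current)
        new_schema["id"] = current_id + 1
        target = os.path.join(schema_dir, f"{SCHEMA_PREFIX}{new_schema['id']}")
        if write_if_absent(target, json.dumps(new_schema)):
            return new_schema
        # Another writer claimed this version first: loop and rebase on its schema.


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as schema_dir:
        first = os.path.join(schema_dir, "schema-0")
        print(write_if_absent(first, json.dumps({"id": 0, "fields": ["id"]})))  # True
        print(write_if_absent(first, "{}"))  # False: version 0 is already taken
        print(commit_with_retry(schema_dir, lambda s: {**s, "fields": s["fields"] + ["name"]}))
        # {'id': 1, 'fields': ['id', 'name']}
```

The retry rebases on whatever the winning writer committed, so a losing writer's changes are re-applied to the newer schema instead of silently replacing it.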

Usage

SchemaManager is used internally by filesystem-based catalogs and table implementations. Applications rarely interact with it directly; they use catalog-level schema operations instead.

Code Reference

Source Location

Signature

class SchemaManager:
    def __init__(self, file_io: FileIO, table_path: str): ...

    def latest(self) -> Optional[TableSchema]: ...
    def create_table(self, schema: Schema) -> TableSchema: ...
    def commit(self, new_schema: TableSchema) -> bool: ...
    def get_schema(self, schema_id: int) -> Optional[TableSchema]: ...
    def commit_changes(self, changes: List[SchemaChange]) -> TableSchema: ...

    def _generate_table_schema(self, old_table_schema: TableSchema,
                               changes: List[SchemaChange]) -> TableSchema: ...
    def _list_versioned_files(self) -> List[int]: ...
    def _to_schema_path(self, schema_id: int) -> str: ...

    @staticmethod
    def _apply_not_nested_column_rename(columns: List[str],
                                        rename_mappings: dict) -> List[str]: ...
    @staticmethod
    def _apply_rename_columns_to_options(options: dict,
                                         rename_mappings: dict) -> dict: ...

Import

from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.common.file_io import LocalFileIO

I/O Contract

Inputs

| Name | Type | Required | Description |
| file_io | FileIO | yes | File I/O abstraction |
| table_path | str | yes | Path to the table directory |
| schema | Schema | for create_table() | Initial schema definition |
| changes | List[SchemaChange] | for commit_changes() | Schema changes to apply |

Outputs

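| Name | Type | Description |
| table schema | TableSchema | Schema returned by latest(), get_schema(), create_table(), or commit_changes(); latest() and get_schema() may return None when no matching schema exists |
| success | bool | Returned by commit(): whether the atomic write of the new schema file succeeded (False when that version could not be claimed, for example because another writer committed it first) |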

Usage Examples

Create Table

from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.schema.schema import Schema
from pypaimon.schema.data_types import DataField, AtomicType
from pypaimon.common.file_io import LocalFileIO

# Initialize manager
file_io = LocalFileIO()
manager = SchemaManager(file_io, "/path/to/table")

# Define schema
schema = Schema(
    fields=[
        DataField(0, "user_id", AtomicType("INT")),
        DataField(1, "name", AtomicType("STRING")),
        DataField(2, "email", AtomicType("STRING"))
    ],
    partition_keys=[],  # a primary key identical to the partition keys is rejected by Paimon
    primary_keys=["user_id"],
    options={"bucket": "4"}
)

# Create table
table_schema = manager.create_table(schema)
print(f"Created schema version {table_schema.id}")

Get Latest Schema

# Get latest schema
latest = manager.latest()
if latest:
    print(f"Latest schema version: {latest.id}")
    print(f"Fields: {[f.name for f in latest.fields]}")
else:
    print("No schema found")

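# Get a specific version; get_schema() is typed Optional[TableSchema],
# so check for None before using a version that may not exist yet
schema_v0 = manager.get_schema(0)
schema_v1 = manager.get_schema(1)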

Add Column

from pypaimon.schema.schema_change import AddColumn

# Add a new column (new columns must be nullable)
changes = [
    AddColumn(
        field_names=["age"],
        data_type=AtomicType("INT", nullable=True),
        comment="User age"
    )
]

new_schema = manager.commit_changes(changes)
print(f"Schema updated to version {new_schema.id}")
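How an added column could receive its field ID is sketched below. It assumes the `AtomicInteger` counter is seeded with the table's highest existing field ID (an assumption; the description above only says IDs auto-increment), so each new column gets the next integer. `AtomicInteger`, `Field`, and `add_columns` here are hypothetical stand-ins, not pypaimon classes; the nullability check mirrors the rule that new columns must be nullable.

```python
import threading
from dataclasses import dataclass
from typing import List, Sequence, Tuple


class AtomicInteger:
    """Hypothetical lock-protected counter illustrating the AtomicInteger idea."""

    def __init__(self, value: int = 0):
        self._value = value
        self._lock = threading.Lock()

    def increment_and_get(self) -> int:
        with self._lock:
            self._value += 1
            return self._value

    def get(self) -> int:
        with self._lock:
            return self._value


@dataclass
class Field:  # simplified placeholder for DataField
    id: int
    name: str
    type: str
    nullable: bool = True


def add_columns(
    fields: List[Field],
    highest_field_id: int,
    new_columns: Sequence[Tuple[str, str, bool]],
) -> Tuple[List[Field], int]:
    """Return a new field list and the updated highest field id; `fields` is not modified."""
    counter = AtomicInteger(highest_field_id)
    result = list(fields)
    existing = {f.name for f in fields}
    for name, type_name, nullable in new_columns:
        if name in existing:
            raise ValueError(f"column {name!r} already exists")
        if not nullable:
            raise ValueError(f"new column {name!r} must be nullable")
        result.append(Field(counter.increment_and_get(), name, type_name, nullable))
        existing.add(name)
    return result, counter.get()


if __name__ == "__main__":
    fields = [Field(0, "user_id", "INT", nullable=False), Field(1, "name", "STRING"), Field(2, "email", "STRING")]
    new_fields, highest = add_columns(fields, highest_field_id=2, new_columns=[("age", "INT", True)])
    print(new_fields[-1], highest)  # Field(id=3, name='age', type='INT', nullable=True) 3
```

Seeding the counter from a recorded highest ID rather than from `len(fields)` means that, in this sketch, an ID freed by a dropped column is never handed out again.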

Rename Column

from pypaimon.schema.schema_change import RenameColumn

# Rename column
changes = [
    RenameColumn(field_names=["email"], new_name="email_address")
]

new_schema = manager.commit_changes(changes)
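Rename propagation can be sketched with two helpers named after the static methods in the signature above. Their bodies are assumptions, not pypaimon's implementation: the options rewritten are taken to be Paimon's comma-separated `bucket-key` and `sequence.field` options, and nested field paths are ignored, as the name `_apply_not_nested_column_rename` suggests. `check_rename_allowed` is a hypothetical guard for the partition-key rule.

```python
from typing import Dict, List

# Assumed: the options whose values are comma-separated column lists.
COLUMN_LIST_OPTIONS = ("bucket-key", "sequence.field")


def check_rename_allowed(old_name: str, partition_keys: List[str]) -> None:
    # Mirrors the documented rule: partition keys cannot be renamed.
    if old_name in partition_keys:
        raise ValueError(f"cannot rename partition key {old_name!r}")


def apply_not_nested_column_rename(columns: List[str],
                                   rename_mappings: Dict[str, str]) -> List[str]:
    # Top-level names only: each renamed column is replaced, others are kept as-is.
    return [rename_mappings.get(column, column) for column in columns]


def apply_rename_columns_to_options(options: Dict[str, str],
                                    rename_mappings: Dict[str, str]) -> Dict[str, str]:
    new_options = dict(options)  # work on a copy; the old schema's options stay unchanged
    for key in COLUMN_LIST_OPTIONS:
        if key in new_options:
            columns = [c.strip() for c in new_options[key].split(",")]
            new_options[key] = ",".join(apply_not_nested_column_rename(columns, rename_mappings))
    return new_options


if __name__ == "__main__":
    options = {"bucket": "4", "bucket-key": "user_id,email", "sequence.field": "email"}
    print(apply_rename_columns_to_options(options, {"email": "email_address"}))
    # {'bucket': '4', 'bucket-key': 'user_id,email_address', 'sequence.field': 'email_address'}
```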

Drop Column

from pypaimon.schema.schema_change import DropColumn

# Drop column (cannot drop partition or primary key)
changes = [
    DropColumn(field_names=["age"])
]

new_schema = manager.commit_changes(changes)
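The guard that rejects dropping key columns can be sketched as a pure function over column names. `drop_column` is a hypothetical helper working on plain lists, not pypaimon's API; it encodes only the constraints listed in the description (partition and primary keys cannot be dropped) plus a basic existence check.

```python
from typing import List


def drop_column(field_names: List[str], column: str,
                partition_keys: List[str], primary_keys: List[str]) -> List[str]:
    """Return the field list without `column`, enforcing the drop rules."""
    if column not in field_names:
        raise ValueError(f"column {column!r} does not exist")
    if column in partition_keys:
        raise ValueError(f"cannot drop partition key {column!r}")
    if column in primary_keys:
        raise ValueError(f"cannot drop primary key {column!r}")
    return [name for name in field_names if name != column]


if __name__ == "__main__":
    fields = ["user_id", "name", "email", "age"]
    print(drop_column(fields, "age", partition_keys=[], primary_keys=["user_id"]))
    # ['user_id', 'name', 'email']
    try:
        drop_column(fields, "user_id", partition_keys=[], primary_keys=["user_id"])
    except ValueError as e:
        print(e)  # cannot drop primary key 'user_id'
```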

Update Column Type

from pypaimon.schema.schema_change import UpdateColumnType

# Update column type
changes = [
    UpdateColumnType(
        field_names=["name"],
        new_data_type=AtomicType("VARCHAR(100)"),
        keep_nullability=True
    )
]

new_schema = manager.commit_changes(changes)
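The sketch below assumes that `keep_nullability=True` behaves as in Paimon's Java `SchemaManager`, where the new type inherits the nullability of the column being changed, and combines that with the rule that primary key types cannot be updated. `ColumnType` and `update_column_type` are hypothetical; the sketch does not check whether the type conversion itself is allowed.

```python
from dataclasses import dataclass, replace
from typing import List


@dataclass(frozen=True)
class ColumnType:
    """Simplified, hypothetical stand-in for AtomicType."""
    type: str
    nullable: bool = True


def update_column_type(column: str, old_type: ColumnType, new_type: ColumnType,
                       keep_nullability: bool, primary_keys: List[str]) -> ColumnType:
    if column in primary_keys:
        raise ValueError(f"cannot update type of primary key {column!r}")
    if keep_nullability:
        # Carry the old column's nullability over to the new type.
        return replace(new_type, nullable=old_type.nullable)
    return new_type


if __name__ == "__main__":
    old = ColumnType("STRING", nullable=False)
    print(update_column_type("name", old, ColumnType("VARCHAR(100)"), True, ["user_id"]))
    # ColumnType(type='VARCHAR(100)', nullable=False)
```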

Multiple Changes

from pypaimon.schema.schema_change import (
    AddColumn, UpdateColumnComment, SetOption
)

# Apply multiple changes atomically
changes = [
    AddColumn(field_names=["status"], data_type=AtomicType("STRING"),
              comment="User status"),
    UpdateColumnComment(field_names=["name"], new_comment="Full name"),
    SetOption(key="compaction.max-file-num", value="50")
]

new_schema = manager.commit_changes(changes)
print(f"Applied {len(changes)} changes, new version: {new_schema.id}")
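Why several changes land in a single version can be illustrated with a copy-then-apply sketch: every change is applied to a deep copy of the old schema and the version number is bumped once at the end, so a failing change leaves the original untouched and no partial schema is produced. The change classes and `generate_table_schema` below are hypothetical simplifications of `AddColumn`, `UpdateColumnComment`, `SetOption`, and `_generate_table_schema()`, working on plain dicts.

```python
import copy
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class AddColumnChange:  # hypothetical stand-in for AddColumn
    name: str
    type: str
    comment: Optional[str] = None


@dataclass
class UpdateCommentChange:  # hypothetical stand-in for UpdateColumnComment
    name: str
    comment: str


@dataclass
class SetOptionChange:  # hypothetical stand-in for SetOption
    key: str
    value: str


Change = Union[AddColumnChange, UpdateCommentChange, SetOptionChange]


def generate_table_schema(old: dict, changes: List[Change]) -> dict:
    """Apply every change to a deep copy; any error leaves `old` untouched."""
    new = copy.deepcopy(old)
    for change in changes:
        if isinstance(change, AddColumnChange):
            if change.name in new["fields"]:
                raise ValueError(f"column {change.name!r} already exists")
            new["fields"][change.name] = {"type": change.type, "comment": change.comment}
        elif isinstance(change, UpdateCommentChange):
            if change.name not in new["fields"]:
                raise ValueError(f"column {change.name!r} does not exist")
            new["fields"][change.name]["comment"] = change.comment
        elif isinstance(change, SetOptionChange):
            new["options"][change.key] = change.value
        else:
            raise TypeError(f"unsupported change: {change!r}")
    new["id"] = old["id"] + 1  # one new version for the whole batch
    return new


if __name__ == "__main__":
    old = {"id": 2, "fields": {"name": {"type": "STRING", "comment": None}}, "options": {}}
    new = generate_table_schema(old, [
        AddColumnChange("status", "STRING", "User status"),
        UpdateCommentChange("name", "Full name"),
        SetOptionChange("compaction.max-file-num", "50"),
    ])
    print(new["id"], list(new["fields"]), new["options"])
    # 3 ['name', 'status'] {'compaction.max-file-num': '50'}
```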
