Implementation:Apache Paimon SchemaManager
| Knowledge Sources | |
|---|---|
| Domains | Schema Management, Metadata Storage |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
SchemaManager manages the lifecycle of table schemas on the filesystem, including creation, versioned persistence, retrieval with caching, and atomic schema evolution through change application.
Description
SchemaManager stores schemas as versioned files (`schema-0`, `schema-1`, and so on) under `{table_path}/schema/`. Each version is immutable and is a complete snapshot of the table schema at one point in time.

- `latest()` lists the schema files, finds the highest version number, and returns that schema, caching it.
- `create_table()` checks that no schema exists yet, builds a `TableSchema` with id 0 from the given `Schema`, and commits it atomically via `FileIO.try_to_write_atomic()`. That call relies on filesystem-level atomic operations so that concurrent writers cannot both create the same version.
- `get_schema()` retrieves a specific version by ID and caches it in the in-memory `schema_cache` dictionary.
- `commit_changes()` implements schema evolution. It loads the latest schema, applies a list of `SchemaChange` objects via `_generate_table_schema()`, increments the schema ID, and commits the result, retrying when the commit conflicts with another writer.

`_generate_table_schema()` handles AddColumn, DropColumn, RenameColumn, UpdateColumnType, UpdateColumnNullability, UpdateColumnComment, UpdateColumnPosition, SetOption, RemoveOption, and UpdateComment. Change validation enforces these constraints:

- partition keys and primary keys cannot be dropped;
- partition keys cannot be renamed;
- primary key types cannot be updated;
- new columns must be nullable.

Column renames also propagate to the bucket key and sequence field options. Field IDs for added columns are allocated from an `AtomicInteger`, which gives thread-safe auto-increment. Field types are deep-copied so that building a new schema version cannot accidentally mutate the old one.
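The validation rules above can be expressed as one check per change. The sketch below is a simplified illustration, not the pypaimon code. The change kind is a plain string tag (`"add"`, `"drop"`, `"rename"`, `"update_type"`), and `validate_change` with its parameters is a hypothetical helper.

```python
from typing import List


def validate_change(kind: str, column: str, partition_keys: List[str],
                    primary_keys: List[str], nullable: bool = True) -> None:
    """Raise ValueError if a schema change breaks one of the documented rules.

    `kind` is a hypothetical tag: "add", "drop", "rename" or "update_type".
    `nullable` only matters for "add" (nullability of the new column).
    """
    if kind == "drop" and column in partition_keys:
        raise ValueError(f"Cannot drop partition key '{column}'")
    if kind == "drop" and column in primary_keys:
        raise ValueError(f"Cannot drop primary key '{column}'")
    if kind == "rename" and column in partition_keys:
        raise ValueError(f"Cannot rename partition key '{column}'")
    if kind == "update_type" and column in primary_keys:
        raise ValueError(f"Cannot update type of primary key '{column}'")
    if kind == "add" and not nullable:
        raise ValueError(f"New column '{column}' must be nullable")


# Usage: dropping a regular column passes, dropping a primary key does not.
validate_change("drop", "email", partition_keys=["dt"], primary_keys=["dt", "user_id"])
try:
    validate_change("drop", "user_id", partition_keys=["dt"], primary_keys=["dt", "user_id"])
except ValueError as err:
    print(err)  # Cannot drop primary key 'user_id'
```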
This architecture implements Paimon's schema versioning and evolution semantics in pure Python, enabling the Python SDK to manage table schemas without requiring a Java runtime. The atomic write pattern ensures consistency even under concurrent access from multiple writers.
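The atomic write pattern can be illustrated on a local filesystem. One common way to get create-if-absent semantics there is to write the complete content to a temporary file and then hard-link it to the final name. `os.link` raises `FileExistsError` if the target already exists, so exactly one of two racing writers succeeds. This is a sketch of the general technique and does not describe how `FileIO.try_to_write_atomic()` is implemented. `try_write_atomic` is a hypothetical name.

```python
import os
import tempfile


def try_write_atomic(path: str, content: str) -> bool:
    """Publish `content` at `path` only if `path` does not exist yet.

    Returns True on success, False if another writer already created `path`.
    """
    directory = os.path.dirname(path) or "."
    # Write the full content to a uniquely named temp file first, so readers
    # never observe a partially written schema file.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(content)
        try:
            # os.link refuses to overwrite an existing target, which turns
            # the publish step into an atomic create-if-absent.
            os.link(tmp_path, path)
            return True
        except FileExistsError:
            return False
    finally:
        # The temp name is no longer needed; a successful link keeps the data.
        os.remove(tmp_path)
```

Hard links are a local-filesystem feature, so this particular trick does not carry over to object stores.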
Usage
SchemaManager is used internally by filesystem-based catalogs and table implementations. Applications rarely call it directly. They use catalog-level schema operations instead.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/schema/schema_manager.py
Signature
```python
class SchemaManager:
    def __init__(self, file_io: FileIO, table_path: str): ...
    def latest(self) -> Optional[TableSchema]: ...
    def create_table(self, schema: Schema) -> TableSchema: ...
    def commit(self, new_schema: TableSchema) -> bool: ...
    def get_schema(self, schema_id: int) -> Optional[TableSchema]: ...
    def commit_changes(self, changes: List[SchemaChange]) -> TableSchema: ...
    def _generate_table_schema(self, old_table_schema: TableSchema,
                               changes: List[SchemaChange]) -> TableSchema: ...
    def _list_versioned_files(self) -> List[int]: ...
    def _to_schema_path(self, schema_id: int) -> str: ...

    @staticmethod
    def _apply_not_nested_column_rename(columns: List[str],
                                        rename_mappings: dict) -> List[str]: ...

    @staticmethod
    def _apply_rename_columns_to_options(options: dict,
                                         rename_mappings: dict) -> dict: ...
```
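The two private path helpers map a schema ID to `{table_path}/schema/schema-{id}` and back. The sketch below shows that mapping and how `latest()` can pick the newest version. It uses plain `os` calls instead of `FileIO`. `to_schema_path`, `list_versioned_files`, and `latest_schema_id` are hypothetical module-level stand-ins for the methods above.

```python
import os
from typing import List, Optional

SCHEMA_PREFIX = "schema-"


def to_schema_path(table_path: str, schema_id: int) -> str:
    # e.g. /path/to/table/schema/schema-3
    return os.path.join(table_path, "schema", f"{SCHEMA_PREFIX}{schema_id}")


def list_versioned_files(table_path: str) -> List[int]:
    schema_dir = os.path.join(table_path, "schema")
    if not os.path.isdir(schema_dir):
        return []
    versions = []
    for name in os.listdir(schema_dir):
        suffix = name[len(SCHEMA_PREFIX):]
        # Keep only names of the exact form "schema-<integer>".
        if name.startswith(SCHEMA_PREFIX) and suffix.isdigit():
            versions.append(int(suffix))
    return sorted(versions)


def latest_schema_id(table_path: str) -> Optional[int]:
    versions = list_versioned_files(table_path)
    # Highest version wins; None means the table has no schema yet.
    return versions[-1] if versions else None
```

The suffix must be parsed as an integer. Compared as strings, `schema-9` would sort after `schema-10`.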
Import
```python
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.common.file_io import LocalFileIO
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| file_io | FileIO | yes | File I/O abstraction |
| table_path | str | yes | Path to table directory |
| schema | Schema | yes (for create) | Initial schema definition |
| changes | List[SchemaChange] | yes (for evolution) | Schema changes to apply |
Outputs
| Name | Type | Description |
|---|---|---|
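| table_schema | TableSchema | Schema returned by `latest()` or `get_schema()` (`None` if absent), or the new version returned by `create_table()` / `commit_changes()` |
| success | bool | Returned by `commit()`: whether the new schema version was written |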
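`commit()` reports whether the new version was written, and `commit_changes()` retries on conflict. The loop below sketches that optimistic-concurrency pattern. Storage is reduced to an in-memory dict and the schema to an options dict. `InMemorySchemaStore`, `commit_changes_with_retry`, and the `apply` callback are hypothetical. The source does not say whether the real method bounds its retries.

```python
import threading
from typing import Callable, Dict, Optional


class InMemorySchemaStore:
    """Hypothetical stand-in for schema files: version id -> options dict."""

    def __init__(self) -> None:
        self._versions: Dict[int, dict] = {}
        self._lock = threading.Lock()

    def latest(self) -> Optional[int]:
        with self._lock:
            return max(self._versions) if self._versions else None

    def get(self, schema_id: int) -> dict:
        with self._lock:
            return dict(self._versions[schema_id])

    def commit(self, schema_id: int, options: dict) -> bool:
        # Create-if-absent: fails if another writer already took this id.
        with self._lock:
            if schema_id in self._versions:
                return False
            self._versions[schema_id] = dict(options)
            return True


def commit_changes_with_retry(store: InMemorySchemaStore,
                              apply: Callable[[dict], dict]) -> int:
    while True:
        latest_id = store.latest()
        if latest_id is None:
            raise ValueError("Table does not exist")
        new_options = apply(store.get(latest_id))
        new_id = latest_id + 1
        if store.commit(new_id, new_options):
            return new_id
        # Lost the race: another writer committed new_id first, so loop,
        # reload the latest version and re-apply the changes on top of it.
```

The important step is re-reading the latest version after a failed commit and re-applying the changes to it. The loop does not overwrite what the other writer committed.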
Usage Examples
Create Table
```python
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.schema.schema import Schema
from pypaimon.schema.data_types import DataField, AtomicType
from pypaimon.common.file_io import LocalFileIO

# Initialize the manager for a table directory
file_io = LocalFileIO()
manager = SchemaManager(file_io, "/path/to/table")

# Define the schema. Paimon does not allow a primary key identical to the
# partition keys, so the table is partitioned by a separate `dt` column
# and the primary key includes it.
schema = Schema(
    fields=[
        DataField(0, "user_id", AtomicType("INT")),
        DataField(1, "name", AtomicType("STRING")),
        DataField(2, "email", AtomicType("STRING")),
        DataField(3, "dt", AtomicType("STRING")),
    ],
    partition_keys=["dt"],
    primary_keys=["dt", "user_id"],
    options={"bucket": "4"},
)

# Create the table (fails if a schema already exists)
table_schema = manager.create_table(schema)
print(f"Created schema version {table_schema.id}")
```
Get Latest Schema
```python
# Get the latest schema (None if the table has no schema yet)
latest = manager.latest()
if latest:
    print(f"Latest schema version: {latest.id}")
    print(f"Fields: {[f.name for f in latest.fields]}")
else:
    print("No schema found")

# Get a specific version (None if that version does not exist)
schema_v0 = manager.get_schema(0)
schema_v1 = manager.get_schema(1)
```
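`get_schema()` keeps loaded versions in `schema_cache`. A written `schema-N` file never changes, so a cached entry cannot go stale and a plain dict keyed by schema ID is enough. In this sketch, `CachedSchemaReader` and the `load` callback (which stands in for reading and parsing the file) are hypothetical. The sketch deliberately does not cache misses, because a missing version may be committed later.

```python
from typing import Callable, Dict, Optional


class CachedSchemaReader:
    """Hypothetical sketch of the get_schema() caching pattern."""

    def __init__(self, load: Callable[[int], Optional[dict]]) -> None:
        self._load = load  # reads and parses schema-<id>; None if missing
        self.schema_cache: Dict[int, dict] = {}

    def get_schema(self, schema_id: int) -> Optional[dict]:
        if schema_id in self.schema_cache:
            return self.schema_cache[schema_id]
        schema = self._load(schema_id)
        # Only cache versions that exist: an existing schema file never
        # changes, while a missing id may still be committed later.
        if schema is not None:
            self.schema_cache[schema_id] = schema
        return schema
```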
Add Column
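```python
from pypaimon.schema.data_types import AtomicType
from pypaimon.schema.schema_change import AddColumn

# Add a new column (reuses `manager` from the Create Table example);
# new columns must be nullable
changes = [
    AddColumn(
        field_names=["age"],
        data_type=AtomicType("INT", nullable=True),
        comment="User age",
    )
]
new_schema = manager.commit_changes(changes)
print(f"Schema updated to version {new_schema.id}")
```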
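The added column's field ID comes from the `AtomicInteger` mentioned in the Description. The idea: seed a counter with the schema's highest field ID and give each new column the next value, so in this scheme an ID freed by a dropped column is never handed out again. The `AtomicInteger` below is a minimal lock-based stand-in written for this example and is not the class pypaimon uses. `assign_ids` is hypothetical.

```python
import threading
from typing import Dict, List


class AtomicInteger:
    """Minimal lock-based counter (illustrative stand-in)."""

    def __init__(self, value: int = 0) -> None:
        self._value = value
        self._lock = threading.Lock()

    def increment_and_get(self) -> int:
        with self._lock:
            self._value += 1
            return self._value

    def get(self) -> int:
        with self._lock:
            return self._value


def assign_ids(highest_field_id: int, new_columns: List[str]) -> Dict[str, int]:
    counter = AtomicInteger(highest_field_id)
    # Each added column takes the next unused id; existing ids are untouched.
    return {name: counter.increment_and_get() for name in new_columns}
```

Seeding from the highest ID, rather than from the current number of fields, keeps IDs unique even after columns have been dropped.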
Rename Column
```python
from pypaimon.schema.schema_change import RenameColumn

# Rename a column (partition keys cannot be renamed)
changes = [
    RenameColumn(field_names=["email"], new_name="email_address")
]
new_schema = manager.commit_changes(changes)
```
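A rename also has to reach options that name columns. Paimon's `bucket-key` and `sequence.field` options hold comma-separated column names, so the sketch below rewrites those lists with the same mapping used for the field itself. It follows the intent of `_apply_rename_columns_to_options()` and `_apply_not_nested_column_rename()`, assuming exact, top-level name matching. The module-level functions are hypothetical re-implementations.

```python
from typing import Dict, List

# Paimon options whose values are comma-separated column names.
COLUMN_LIST_OPTIONS = ("bucket-key", "sequence.field")


def apply_not_nested_column_rename(columns: List[str],
                                   rename_mappings: Dict[str, str]) -> List[str]:
    # Replace each renamed top-level column; leave the others as they are.
    return [rename_mappings.get(c, c) for c in columns]


def apply_rename_columns_to_options(options: Dict[str, str],
                                    rename_mappings: Dict[str, str]) -> Dict[str, str]:
    new_options = dict(options)  # never mutate the caller's dict
    for key in COLUMN_LIST_OPTIONS:
        value = new_options.get(key)
        if value:
            columns = [c.strip() for c in value.split(",")]
            new_options[key] = ",".join(
                apply_not_nested_column_rename(columns, rename_mappings))
    return new_options
```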
Drop Column
```python
from pypaimon.schema.schema_change import DropColumn

# Drop a column (partition keys and primary keys cannot be dropped)
changes = [
    DropColumn(field_names=["age"])
]
new_schema = manager.commit_changes(changes)
```
Update Column Type
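```python
from pypaimon.schema.data_types import AtomicType
from pypaimon.schema.schema_change import UpdateColumnType

# Change a column's type (primary key types cannot be updated);
# keep_nullability=True keeps the column's current nullability
changes = [
    UpdateColumnType(
        field_names=["name"],
        new_data_type=AtomicType("VARCHAR(100)"),
        keep_nullability=True,
    )
]
new_schema = manager.commit_changes(changes)
```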
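Type and nullability updates are where the deep copying mentioned in the Description matters. If the new schema shared mutable type objects with the old one, editing the new version would silently rewrite the old, supposedly immutable version too. The sketch uses hypothetical `FieldType` and `Field` dataclasses and a hypothetical `evolve_nullability` helper.

```python
import copy
from dataclasses import dataclass
from typing import List


@dataclass
class FieldType:
    # Hypothetical mutable type object, standing in for a schema data type.
    name: str
    nullable: bool = True


@dataclass
class Field:
    field_id: int
    name: str
    data_type: FieldType


def evolve_nullability(old_fields: List[Field], column: str,
                       nullable: bool) -> List[Field]:
    # Deep-copy first so edits to the new version cannot leak into the old one.
    new_fields = copy.deepcopy(old_fields)
    for f in new_fields:
        if f.name == column:
            f.data_type.nullable = nullable
    return new_fields
```

With a shallow `list(old_fields)` copy instead, both lists would point at the same `FieldType` objects. Setting `nullable` on the new version would then also change it on the old one.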
Multiple Changes
```python
from pypaimon.schema.data_types import AtomicType
from pypaimon.schema.schema_change import (
    AddColumn, UpdateColumnComment, SetOption
)

# Apply several changes atomically: they are committed together
# as a single new schema version
changes = [
    AddColumn(field_names=["status"],
              data_type=AtomicType("STRING", nullable=True),
              comment="User status"),
    UpdateColumnComment(field_names=["name"], new_comment="Full name"),
    SetOption(key="compaction.max-file-num", value="50"),
]
new_schema = manager.commit_changes(changes)
print(f"Applied {len(changes)} changes, new version: {new_schema.id}")
```
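The batch is atomic because it produces exactly one new version. Every change is applied to the same starting schema, and the result is committed once as version N+1, so either all changes land or none do. The sketch below covers only table-level changes. `SimpleSchema` and the `SetOption`, `RemoveOption`, and `UpdateComment` dataclasses are simplified hypothetical stand-ins, not the pypaimon classes that share some of these names.

```python
from dataclasses import dataclass, replace
from typing import Dict, List, Optional, Union


@dataclass(frozen=True)
class SetOption:
    key: str
    value: str


@dataclass(frozen=True)
class RemoveOption:
    key: str


@dataclass(frozen=True)
class UpdateComment:
    comment: Optional[str]


@dataclass(frozen=True)
class SimpleSchema:
    id: int
    options: Dict[str, str]
    comment: Optional[str] = None


Change = Union[SetOption, RemoveOption, UpdateComment]


def generate_schema(old: SimpleSchema, changes: List[Change]) -> SimpleSchema:
    options = dict(old.options)  # work on a copy; the old version stays intact
    comment = old.comment
    for change in changes:
        if isinstance(change, SetOption):
            options[change.key] = change.value
        elif isinstance(change, RemoveOption):
            options.pop(change.key, None)
        elif isinstance(change, UpdateComment):
            comment = change.comment
        else:
            raise TypeError(f"Unsupported change: {change!r}")
    # One new version for the whole batch, not one per change.
    return replace(old, id=old.id + 1, options=options, comment=comment)
```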