Implementation:Apache Paimon SnapshotCommit
| Knowledge Sources | |
|---|---|
| Domains | Snapshot Management, Transaction Management |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
SnapshotCommit is an abstract base class defining the interface for atomically committing snapshots with partition statistics.
Description
The SnapshotCommit class provides the abstract interface for snapshot commit operations in Apache Paimon tables. It defines the contract that all snapshot commit implementations must follow, ensuring consistent behavior across different commit strategies (catalog-based, file-based, etc.).
The interface includes a commit method that takes a snapshot, a branch name, and a list of partition statistics, and returns a boolean indicating whether the commit succeeded. It also defines a close method for resource cleanup and implements the context manager protocol (__enter__ and __exit__) so that a committer can be used safely in a with block.
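To illustrate the shape of this contract, the sketch below mirrors it with a hypothetical SnapshotCommitLike base class and a RecordingCommit subclass; neither name exists in pypaimon. It assumes that __enter__ returns the committer itself and that __exit__ calls close and lets exceptions propagate. That is a common pattern for this kind of interface, but the source does not state the exact behaviour.

```python
from abc import ABC, abstractmethod
from typing import List


class SnapshotCommitLike(ABC):
    """Hypothetical mirror of SnapshotCommit's shape, for illustration only."""

    @abstractmethod
    def commit(self, snapshot, branch: str, statistics: List) -> bool:
        """Commit the given snapshot; return True on success."""

    @abstractmethod
    def close(self):
        """Release any resources held by the committer."""

    def __enter__(self):
        # Assumed behaviour: the with-statement binds the committer itself.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Assumed behaviour: always release resources, and return False so
        # that exceptions raised inside the with-block still propagate.
        self.close()
        return False


class RecordingCommit(SnapshotCommitLike):
    """Hypothetical concrete committer that only records what it was given."""

    def __init__(self):
        self.closed = False
        self.committed = []

    def commit(self, snapshot, branch, statistics):
        self.committed.append((snapshot, branch, len(statistics)))
        return True

    def close(self):
        self.closed = True
```

Because commit and close are abstract, the base class cannot be instantiated directly. Any subclass must override both methods before it can be used in a with block.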
The module also defines the PartitionStatistics dataclass, which holds partition-level metadata: the partition spec, the record count, the total file size in bytes, the file count, the last file creation time, and the total number of buckets. Its fields are designed to match the Java PartitionStatistics structure so that it serializes to the JSON expected by REST API calls.
Usage
Use SnapshotCommit as the base interface when implementing custom snapshot commit strategies, or use PartitionStatistics when collecting partition metadata for snapshot commits or catalog operations.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/snapshot/snapshot_commit.py
Signature
```python
@dataclass
class PartitionStatistics:
    """Represents partition statistics for snapshot commits."""
    spec: Dict[str, str]
    record_count: int
    file_size_in_bytes: int
    file_count: int
    last_file_creation_time: int
    total_buckets: int

    @classmethod
    def create(
        cls,
        partition_spec: Dict[str, str] = None,
        record_count: int = 0,
        file_count: int = 0,
        file_size_in_bytes: int = 0,
        last_file_creation_time: int = None,
        total_buckets: int = 0
    ) -> 'PartitionStatistics':
        """Factory method to create PartitionStatistics with backward compatibility."""


class SnapshotCommit(ABC):
    """Interface to commit snapshot atomically."""

    @abstractmethod
    def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool:
        """Commit the given snapshot."""

    @abstractmethod
    def close(self):
        """Close the snapshot commit and release any resources."""

    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
```
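The create factory takes a partition_spec argument, while the dataclass field is named spec. Its parameters also come in a different order from the fields (file_count precedes file_size_in_bytes), so calling it with keyword arguments is the safer choice. The sketch below shows one plausible way such a factory fills in defaults. Two behaviours are assumptions rather than confirmed pypaimon behaviour: a missing spec becomes an empty dict, and a missing timestamp defaults to the current time in epoch milliseconds. PartitionStats is a hypothetical mirror class.

```python
import time
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class PartitionStats:
    """Hypothetical mirror of PartitionStatistics with a create() factory."""
    spec: Dict[str, str]
    record_count: int
    file_size_in_bytes: int
    file_count: int
    last_file_creation_time: int
    total_buckets: int

    @classmethod
    def create(
        cls,
        partition_spec: Optional[Dict[str, str]] = None,
        record_count: int = 0,
        file_count: int = 0,
        file_size_in_bytes: int = 0,
        last_file_creation_time: Optional[int] = None,
        total_buckets: int = 0,
    ) -> "PartitionStats":
        # Assumption: a missing spec becomes an empty dict rather than None.
        spec = dict(partition_spec) if partition_spec else {}
        # Assumption: a missing timestamp defaults to "now" in epoch milliseconds.
        if last_file_creation_time is None:
            last_file_creation_time = int(time.time() * 1000)
        # Keyword arguments avoid mixing up the differently ordered fields.
        return cls(
            spec=spec,
            record_count=record_count,
            file_size_in_bytes=file_size_in_bytes,
            file_count=file_count,
            last_file_creation_time=last_file_creation_time,
            total_buckets=total_buckets,
        )
```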
Import
```python
from pypaimon.snapshot.snapshot_commit import SnapshotCommit, PartitionStatistics
```
I/O Contract
Inputs
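| Name | Type | Required | Used by | Description |
|---|---|---|---|---|
| snapshot | Snapshot | Yes | SnapshotCommit.commit | Snapshot to commit |
| branch | str | Yes | SnapshotCommit.commit | Name of the branch to commit to |
| statistics | List[PartitionStatistics] | Yes | SnapshotCommit.commit | Per-partition statistics for catalog metadata |
| partition_spec | Dict[str, str] | No | PartitionStatistics.create | Partition specification (partition column to value); defaults to None |
| record_count | int | No | PartitionStatistics.create | Number of records in the partition; defaults to 0 |
| file_count | int | No | PartitionStatistics.create | Number of files in the partition; defaults to 0 |
| file_size_in_bytes | int | No | PartitionStatistics.create | Total file size of the partition in bytes; defaults to 0 |
| last_file_creation_time | int | No | PartitionStatistics.create | Creation time of the most recent file in the partition; defaults to None |
| total_buckets | int | No | PartitionStatistics.create | Total number of buckets; defaults to 0 |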
Outputs
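| Name | Type | Returned by | Description |
|---|---|---|---|
| success | bool | SnapshotCommit.commit | True if the snapshot was committed successfully, False otherwise |
| partition_stats | PartitionStatistics | PartitionStatistics.create | The constructed partition statistics object |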
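The input/output contract can be made concrete with a small in-memory committer. Each (branch, snapshot id) pair may be committed at most once, the statistics list is stored alongside the snapshot, and the boolean result reports whether this call won. InMemorySnapshotCommit, statistics_for and the stand-in Snapshot class are hypothetical names for this sketch. A lock makes the check-then-write step atomic within one process only.

```python
import threading
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Snapshot:
    """Hypothetical stand-in; only the id matters for this sketch."""
    id: int


class InMemorySnapshotCommit:
    """Hypothetical committer following commit(snapshot, branch, statistics) -> bool."""

    def __init__(self):
        self._lock = threading.Lock()
        self._snapshots: Dict[Tuple[str, int], Snapshot] = {}
        self._statistics: Dict[Tuple[str, int], List] = {}

    def commit(self, snapshot: Snapshot, branch: str, statistics: List) -> bool:
        key = (branch, snapshot.id)
        # Check and write under one lock so two concurrent commits of the
        # same id on the same branch cannot both succeed.
        with self._lock:
            if key in self._snapshots:
                return False
            self._snapshots[key] = snapshot
            self._statistics[key] = list(statistics)
            return True

    def statistics_for(self, branch: str, snapshot_id: int) -> List:
        with self._lock:
            return list(self._statistics.get((branch, snapshot_id), []))

    def close(self):
        # Nothing to release for a purely in-memory store.
        pass
```

A False result here means another writer already claimed that snapshot id on the branch. A caller would typically rebuild its snapshot on top of the latest state before trying again.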
Usage Examples
```python
from pypaimon.snapshot.snapshot_commit import SnapshotCommit, PartitionStatistics
from pypaimon.snapshot.renaming_snapshot_commit import RenamingSnapshotCommit

# Create partition statistics
stats = PartitionStatistics.create(
    partition_spec={"date": "2024-01-01", "region": "us-west"},
    record_count=10000,
    file_count=5,
    file_size_in_bytes=104857600,  # 100 MiB
    total_buckets=4
)

# Use with a concrete implementation.
# snapshot_manager and new_snapshot are not defined in this example; they are
# assumed to come from the surrounding table and commit code.
with RenamingSnapshotCommit(snapshot_manager) as committer:
    success = committer.commit(
        snapshot=new_snapshot,
        branch="main",
        statistics=[stats]
    )

if success:
    print("Snapshot committed successfully")

# Multiple partitions
partition_stats = [
    PartitionStatistics.create(
        partition_spec={"date": "2024-01-01"},
        record_count=5000,
        file_count=2,
        file_size_in_bytes=52428800  # 50 MiB
    ),
    PartitionStatistics.create(
        partition_spec={"date": "2024-01-02"},
        record_count=7000,
        file_count=3,
        file_size_in_bytes=73400320  # 70 MiB
    )
]


# Implement a custom snapshot commit
class CustomSnapshotCommit(SnapshotCommit):
    def commit(self, snapshot, branch, statistics) -> bool:
        # Custom commit logic: return True only if the snapshot was committed
        return True

    def close(self):
        # Release any resources (connections, file handles) held by the committer
        pass
```
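The CustomSnapshotCommit stub above always reports success. As a fuller sketch of a file-based strategy (the Description lists file-based commits as one option), the class below first writes each snapshot to a temporary file and then hard-links it to its final name. os.link refuses to overwrite an existing target, so only one writer can publish a given snapshot id, and readers never see a partially written file. The directory layout, file naming, JSON content and the stand-in Snapshot class are hypothetical. This is not claimed to match pypaimon's RenamingSnapshotCommit, and it is kept standalone rather than subclassing SnapshotCommit so that it runs without pypaimon installed.

```python
import json
import os
import tempfile
from dataclasses import dataclass
from typing import List


@dataclass
class Snapshot:
    """Hypothetical stand-in for pypaimon's Snapshot; only an id is used."""
    id: int


class LinkingSnapshotCommit:
    """Hypothetical file-based committer; in pypaimon it would subclass SnapshotCommit."""

    def __init__(self, root: str):
        self.root = root

    def commit(self, snapshot: Snapshot, branch: str, statistics: List) -> bool:
        # Hypothetical layout: <root>/<branch>/snapshot-<id>
        branch_dir = os.path.join(self.root, branch)
        os.makedirs(branch_dir, exist_ok=True)
        target = os.path.join(branch_dir, f"snapshot-{snapshot.id}")
        # Write the complete content to a temporary file in the same directory.
        fd, tmp_path = tempfile.mkstemp(dir=branch_dir, prefix=".tmp-")
        try:
            with os.fdopen(fd, "w") as f:
                json.dump({"id": snapshot.id, "branch": branch}, f)
            try:
                # os.link fails with FileExistsError if the target exists,
                # so at most one writer publishes this snapshot id.
                os.link(tmp_path, target)
            except FileExistsError:
                return False
            return True
        finally:
            # The temporary name is no longer needed in either outcome.
            os.remove(tmp_path)

    def close(self):
        # No long-lived resources to release in this sketch.
        pass
```

This approach depends on a filesystem that supports hard links. Object stores generally do not offer an equivalent atomic "create if absent" primitive, which is one reason catalog-based commit strategies exist.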