Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Apache Paimon SnapshotCommit

From Leeroopedia


Knowledge Sources
Domains Snapshot Management, Transaction Management
Last Updated 2026-02-08 00:00 GMT

Overview

SnapshotCommit is an abstract base class defining the interface for atomically committing snapshots with partition statistics.

Description

The SnapshotCommit class provides the abstract interface for snapshot commit operations in Apache Paimon tables. It defines the contract that all snapshot commit implementations must follow, ensuring consistent behavior across different commit strategies (catalog-based, file-based, etc.).

The interface includes a commit method that takes a snapshot, branch name, and partition statistics, returning a boolean indicating success. It also defines a close method for resource cleanup and implements context manager protocol for safe resource management.

The module also includes the PartitionStatistics dataclass, which encapsulates partition-level metadata including record counts, file sizes, file counts, creation timestamps, and bucket information. This class is designed to match the Java PartitionStatistics structure for proper JSON serialization in REST API calls.

Usage

Use SnapshotCommit as the base interface when implementing custom snapshot commit strategies, or use PartitionStatistics when collecting partition metadata for snapshot commits or catalog operations.

Code Reference

Source Location

Signature

@dataclass
class PartitionStatistics:
    """Represents partition statistics for snapshot commits."""

    spec: Dict[str, str]
    record_count: int
    file_size_in_bytes: int
    file_count: int
    last_file_creation_time: int
    total_buckets: int

    @classmethod
    def create(
        cls,
        partition_spec: Dict[str, str] = None,
        record_count: int = 0,
        file_count: int = 0,
        file_size_in_bytes: int = 0,
        last_file_creation_time: int = None,
        total_buckets: int = 0
    ) -> 'PartitionStatistics':
        """Factory method to create PartitionStatistics with backward compatibility."""


class SnapshotCommit(ABC):
    """Interface to commit snapshot atomically."""

    @abstractmethod
    def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool:
        """Commit the given snapshot."""

    @abstractmethod
    def close(self):
        """Close the snapshot commit and release any resources."""

    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""

Import

from pypaimon.snapshot.snapshot_commit import SnapshotCommit, PartitionStatistics

I/O Contract

Inputs

Name Type Required Description
snapshot Snapshot Yes Snapshot to commit
branch str Yes Branch name for the commit
statistics List[PartitionStatistics] Yes Partition statistics for catalog metadata
partition_spec Dict[str, str] No Partition specification for statistics
record_count int No Number of records in partition
file_count int No Number of files in partition
file_size_in_bytes int No Total file size in bytes
total_buckets int No Total number of buckets

Outputs

Name Type Description
success bool True if commit was successful, False otherwise
partition_stats PartitionStatistics Partition statistics object

Usage Examples

from pypaimon.snapshot.snapshot_commit import SnapshotCommit, PartitionStatistics

# Create partition statistics
stats = PartitionStatistics.create(
    partition_spec={"date": "2024-01-01", "region": "us-west"},
    record_count=10000,
    file_count=5,
    file_size_in_bytes=104857600,
    total_buckets=4
)

# Use with a concrete implementation
from pypaimon.snapshot.renaming_snapshot_commit import RenamingSnapshotCommit

with RenamingSnapshotCommit(snapshot_manager) as committer:
    success = committer.commit(
        snapshot=new_snapshot,
        branch="main",
        statistics=[stats]
    )

    if success:
        print("Snapshot committed successfully")

# Multiple partitions
partition_stats = [
    PartitionStatistics.create(
        partition_spec={"date": "2024-01-01"},
        record_count=5000,
        file_count=2,
        file_size_in_bytes=52428800
    ),
    PartitionStatistics.create(
        partition_spec={"date": "2024-01-02"},
        record_count=7000,
        file_count=3,
        file_size_in_bytes=73400320
    )
]

# Implement custom snapshot commit
class CustomSnapshotCommit(SnapshotCommit):
    def commit(self, snapshot, branch, statistics):
        # Custom commit logic
        return True

    def close(self):
        # Cleanup logic
        pass

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment