Implementation:Apache Paimon SnapshotManager
| Knowledge Sources | |
|---|---|
| Domains | Snapshot Management, File I/O |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
SnapshotManager manages snapshot files using unified FileIO with support for reading, scanning, and time-based snapshot lookups.
Description
The SnapshotManager class provides the core operations for working with snapshot files in Apache Paimon tables: reading a snapshot by ID, finding the latest snapshot, scanning for the earliest snapshot, and binary-searching for a snapshot by commit timestamp.
The manager maintains a snapshot directory structure with individual snapshot files (snapshot-N) and hint files (LATEST, EARLIEST) for fast access. It implements a retry mechanism for reading the LATEST file and falls back to directory scanning if the hint file is unavailable or inconsistent.
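The hint-file pattern described above can be sketched roughly as follows. This is a minimal illustration on a local filesystem using only the Python standard library, not pypaimon's code (which goes through FileIO). The helper names (`snapshot_path`, `scan_latest_id`, `read_latest_id`), the `backoff_s` parameter, and the staleness rule (treating the hint as inconsistent when `snapshot-(N+1)` already exists) are assumptions made for the example.

```python
import os
import re
import time
from typing import Optional

# Hypothetical helpers illustrating the LATEST-hint pattern; not pypaimon's API.
SNAPSHOT_PATTERN = re.compile(r"^snapshot-(\d+)$")


def snapshot_path(snapshot_dir: str, snapshot_id: int) -> str:
    # Individual snapshot files are named snapshot-N inside the snapshot directory.
    return os.path.join(snapshot_dir, f"snapshot-{snapshot_id}")


def scan_latest_id(snapshot_dir: str) -> Optional[int]:
    # Fallback: list the directory and take the largest N among snapshot-N files.
    ids = []
    for name in os.listdir(snapshot_dir):
        m = SNAPSHOT_PATTERN.match(name)
        if m:
            ids.append(int(m.group(1)))
    return max(ids) if ids else None


def read_latest_id(snapshot_dir: str, max_retries: int = 5,
                   backoff_s: float = 0.01) -> Optional[int]:
    hint = os.path.join(snapshot_dir, "LATEST")
    for attempt in range(max_retries):
        try:
            with open(hint) as f:
                candidate = int(f.read().strip())
        except FileNotFoundError:
            break  # no hint file: fall back to scanning
        except ValueError:
            # Hint unreadable (e.g. caught mid-write): wait a little and retry.
            time.sleep(backoff_s * (attempt + 1))
            continue
        # Assumed consistency check: the named snapshot exists and no newer one does.
        if (os.path.exists(snapshot_path(snapshot_dir, candidate))
                and not os.path.exists(snapshot_path(snapshot_dir, candidate + 1))):
            return candidate
        break  # stale or inconsistent hint: fall back to scanning
    return scan_latest_id(snapshot_dir)
```

The hint turns the common case into a single small read, while the directory scan keeps results correct when the hint is missing, stale, or unreadable after all retries.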
The class supports time-travel queries through the earlier_or_equal_time_mills method, which uses binary search over snapshot IDs to find the latest snapshot whose commit time (time_millis) is at or before a given timestamp in milliseconds. This enables point-in-time queries and access to historical table states.
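A sketch of that binary search, assuming snapshot IDs between the earliest and latest snapshot are contiguous and that commit times never decrease as IDs grow. `SnapshotStub` and `find_snapshot_at_or_before` are hypothetical names, and the `load` callback stands in for reading a snapshot file by ID; this illustrates the technique rather than reproducing pypaimon's implementation.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class SnapshotStub:
    # Hypothetical stand-in for pypaimon's Snapshot; only the fields used here.
    id: int
    time_millis: int


def find_snapshot_at_or_before(
    earliest_id: int,
    latest_id: int,
    load: Callable[[int], SnapshotStub],
    timestamp: int,
) -> Optional[SnapshotStub]:
    """Return the newest snapshot with time_millis <= timestamp, or None."""
    if load(earliest_id).time_millis > timestamp:
        return None  # every retained snapshot is newer than the timestamp
    lo, hi = earliest_id, latest_id
    result = earliest_id
    while lo <= hi:
        mid = (lo + hi) // 2
        if load(mid).time_millis <= timestamp:
            result = mid  # valid candidate; keep looking for a later one
            lo = mid + 1
        else:
            hi = mid - 1  # too new; search the older half
    return load(result)
```

Each probe loads one snapshot, so a lookup over n retained snapshots reads O(log n) snapshot files instead of all n.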
Usage
Use SnapshotManager when implementing table readers, time-travel queries, snapshot-based operations, or any functionality that needs to access historical table states in Apache Paimon.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/snapshot/snapshot_manager.py
Signature
class SnapshotManager:
    """Manager for snapshot files using unified FileIO."""

    def __init__(self, table):
        """Initialize with a FileStoreTable instance."""

    def get_latest_snapshot(self) -> Optional[Snapshot]:
        """Get the latest snapshot."""

    def get_latest_snapshot_json(self) -> Optional[str]:
        """Get the latest snapshot as JSON string."""

    def read_latest_file(self, max_retries: int = 5):
        """Read the latest snapshot ID from LATEST file with retry mechanism."""

    def get_snapshot_path(self, snapshot_id: int) -> str:
        """Get the path for a snapshot file."""

    def try_get_earliest_snapshot(self) -> Optional[Snapshot]:
        """Get the earliest snapshot."""

    def earlier_or_equal_time_mills(self, timestamp: int) -> Optional[Snapshot]:
        """Find the latest snapshot with time_millis <= the given timestamp."""

    def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
        """Get a snapshot by its ID."""
Import
from pypaimon.snapshot.snapshot_manager import SnapshotManager
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| table | FileStoreTable | Yes | File store table instance |
| snapshot_id | int | Yes (for get_snapshot_by_id) | Snapshot ID to retrieve |
| timestamp | int | Yes (for time-based lookup) | Timestamp in milliseconds for time-travel query |
| max_retries | int | No | Maximum retries for reading LATEST file (default 5) |
Outputs
| Name | Type | Description |
|---|---|---|
| snapshot | Snapshot | Snapshot object with metadata |
| snapshot_json | str | Snapshot serialized as JSON |
| snapshot_path | str | File path to snapshot file |
Usage Examples
import json
import time

from pypaimon.snapshot.snapshot_manager import SnapshotManager

# `table` is assumed to be a FileStoreTable obtained elsewhere (e.g. from a catalog)
snapshot_manager = SnapshotManager(table)

# Get the latest snapshot (may be None)
latest = snapshot_manager.get_latest_snapshot()
if latest:
    print(f"Latest snapshot ID: {latest.id}")
    print(f"Commit time: {latest.time_millis}")
    print(f"Base manifest list: {latest.base_manifest_list}")

# Get a snapshot by ID
snapshot_5 = snapshot_manager.get_snapshot_by_id(5)

# Time-travel query: find the snapshot that was current one day ago
timestamp = int(time.time() * 1000) - (24 * 60 * 60 * 1000)  # 1 day ago, in ms
historical_snapshot = snapshot_manager.earlier_or_equal_time_mills(timestamp)
if historical_snapshot:
    print(f"Found snapshot {historical_snapshot.id} at or before timestamp {timestamp}")

# Get the earliest snapshot
earliest = snapshot_manager.try_get_earliest_snapshot()

# Get the file path of snapshot 10
path = snapshot_manager.get_snapshot_path(10)
print(f"Snapshot 10 path: {path}")

# Read the latest snapshot as a JSON string and parse it
json_str = snapshot_manager.get_latest_snapshot_json()
if json_str:
    snapshot_data = json.loads(json_str)
    print(f"Snapshot data: {snapshot_data}")