Implementation:Apache Paimon SnapshotManager

From Leeroopedia


Knowledge Sources
Domains Snapshot Management, File I/O
Last Updated 2026-02-08 00:00 GMT

Overview

SnapshotManager manages a table's snapshot files through the unified FileIO layer, with support for reading snapshots by ID, scanning the snapshot directory, and time-based snapshot lookups.

Description

The SnapshotManager class manages snapshot files in Apache Paimon tables. It reads snapshots by ID, finds the latest snapshot, scans for the earliest snapshot, and uses binary search to locate snapshots by commit timestamp.

The manager maintains a snapshot directory structure with individual snapshot files (snapshot-N) and hint files (LATEST, EARLIEST) for fast access. It implements a retry mechanism for reading the LATEST file and falls back to directory scanning if the hint file is unavailable or inconsistent.
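The hint-then-scan lookup described above can be illustrated with a minimal sketch. It works on a local directory through pathlib instead of Paimon's FileIO, and every name in it (find_latest_id, read_latest_hint, scan_latest_id, delay_s) is hypothetical. The consistency check shown here (the hinted file exists and no newer snapshot file follows it) is an assumption about what "inconsistent" means, not a description of the actual pypaimon code.

```python
import re
import time
from pathlib import Path
from typing import Optional

SNAPSHOT_PREFIX = "snapshot-"  # file naming taken from the description above
LATEST_HINT = "LATEST"


def scan_latest_id(snapshot_dir: Path) -> Optional[int]:
    """Fallback: list the directory and return the largest snapshot ID found."""
    if not snapshot_dir.is_dir():
        return None
    pattern = re.compile(rf"^{re.escape(SNAPSHOT_PREFIX)}(\d+)$")
    ids = [
        int(m.group(1))
        for p in snapshot_dir.iterdir()
        if (m := pattern.match(p.name))
    ]
    return max(ids, default=None)


def read_latest_hint(snapshot_dir: Path, max_retries: int = 5,
                     delay_s: float = 0.0) -> Optional[int]:
    """Try to parse the LATEST hint up to max_retries times; None on failure."""
    hint = snapshot_dir / LATEST_HINT
    for _ in range(max_retries):
        try:
            return int(hint.read_text().strip())
        except (OSError, ValueError):
            # Missing or partially written hint: wait briefly and retry.
            time.sleep(delay_s)
    return None


def find_latest_id(snapshot_dir: Path) -> Optional[int]:
    """Use the hint when it looks consistent, otherwise scan the directory."""
    hinted = read_latest_hint(snapshot_dir)
    if hinted is not None:
        current = snapshot_dir / f"{SNAPSHOT_PREFIX}{hinted}"
        newer = snapshot_dir / f"{SNAPSHOT_PREFIX}{hinted + 1}"
        # Assumed consistency rule: hinted file exists and nothing newer follows.
        if current.exists() and not newer.exists():
            return hinted
    return scan_latest_id(snapshot_dir)
```

The hint keeps the common case to a single small read; the directory scan is only paid when the hint is missing, unreadable, or stale.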

The class supports time-travel queries through the earlier_or_equal_time_mills method, which uses binary search to efficiently find the latest snapshot committed before or at a given timestamp. This enables point-in-time queries and historical data access.
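As a rough illustration of the binary search behind such a time-based lookup, the sketch below searches a range of snapshot IDs for the largest one whose commit time does not exceed the target. It assumes snapshot IDs are contiguous between the earliest and latest snapshot and that commit times do not decrease as IDs grow. The function earlier_or_equal and the commit_time_of callback are hypothetical stand-ins, not the pypaimon implementation.

```python
from typing import Callable, Optional


def earlier_or_equal(earliest_id: int, latest_id: int,
                     commit_time_of: Callable[[int], int],
                     timestamp: int) -> Optional[int]:
    """Return the largest ID in [earliest_id, latest_id] whose commit time
    is <= timestamp, or None if every snapshot is newer than timestamp."""
    if earliest_id > latest_id or commit_time_of(earliest_id) > timestamp:
        return None
    lo, hi = earliest_id, latest_id
    result = None
    while lo <= hi:
        mid = lo + (hi - lo) // 2
        if commit_time_of(mid) <= timestamp:
            result = mid      # candidate; a later snapshot may still qualify
            lo = mid + 1
        else:
            hi = mid - 1      # mid is too new; look at earlier snapshots
    return result


# Hypothetical commit times (milliseconds) keyed by snapshot ID.
commit_times = {1: 1000, 2: 2000, 3: 3000, 4: 4000}
match = earlier_or_equal(1, 4, commit_times.__getitem__, 2500)
print(match)  # snapshot 2 was committed at 2000, the last one at or before 2500
```

Each probe reads one snapshot's commit time, so a lookup over N snapshots needs about log2(N) reads instead of N.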

Usage

Use SnapshotManager when implementing table readers, time-travel queries, snapshot-based operations, or any functionality that needs to access historical table states in Apache Paimon.

Code Reference

Source Location

Signature

class SnapshotManager:
    """Manager for snapshot files using unified FileIO."""

    def __init__(self, table):
        """Initialize with a FileStoreTable instance."""

    def get_latest_snapshot(self) -> Optional[Snapshot]:
        """Get the latest snapshot."""

    def get_latest_snapshot_json(self) -> Optional[str]:
        """Get the latest snapshot as JSON string."""

    def read_latest_file(self, max_retries: int = 5):
        """Read the latest snapshot ID from LATEST file with retry mechanism."""

    def get_snapshot_path(self, snapshot_id: int) -> str:
        """Get the path for a snapshot file."""

    def try_get_earliest_snapshot(self) -> Optional[Snapshot]:
        """Get the earliest snapshot."""

    def earlier_or_equal_time_mills(self, timestamp: int) -> Optional[Snapshot]:
        """Find the latest snapshot with time_millis <= the given timestamp."""

    def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
        """Get a snapshot by its ID."""

Import

from pypaimon.snapshot.snapshot_manager import SnapshotManager

I/O Contract

Inputs

Name | Type | Required | Description
table | FileStoreTable | Yes | File store table instance
snapshot_id | int | Yes (for get_snapshot_by_id and get_snapshot_path) | Snapshot ID to retrieve or resolve to a path
timestamp | int | Yes (for earlier_or_equal_time_mills) | Timestamp in milliseconds for a time-travel query
max_retries | int | No | Maximum number of retries when reading the LATEST file (default 5)

Outputs

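Name | Type | Description
snapshot | Optional[Snapshot] | Snapshot object with metadata, or None if no matching snapshot exists
snapshot_json | Optional[str] | Latest snapshot serialized as JSON, or None if unavailable
snapshot_path | str | File path to the snapshot file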

Usage Examples

import json
import time

from pypaimon.snapshot.snapshot_manager import SnapshotManager

# Create snapshot manager; `table` is an existing FileStoreTable instance
snapshot_manager = SnapshotManager(table)

# Get latest snapshot (None if the table has no snapshots)
latest = snapshot_manager.get_latest_snapshot()
if latest is not None:
    print(f"Latest snapshot ID: {latest.id}")
    print(f"Commit time: {latest.time_millis}")
    print(f"Base manifest: {latest.base_manifest_list}")

# Get snapshot by ID (None if snapshot 5 does not exist)
snapshot_5 = snapshot_manager.get_snapshot_by_id(5)

# Time-travel query - get the latest snapshot committed at or before a given time
timestamp = int(time.time() * 1000) - (24 * 60 * 60 * 1000)  # 1 day ago, in milliseconds
historical_snapshot = snapshot_manager.earlier_or_equal_time_mills(timestamp)
if historical_snapshot is not None:
    print(f"Found snapshot {historical_snapshot.id} at or before timestamp {timestamp}")

# Get earliest snapshot
earliest = snapshot_manager.try_get_earliest_snapshot()

# Get snapshot path
path = snapshot_manager.get_snapshot_path(10)
print(f"Snapshot 10 path: {path}")

# Read latest snapshot as JSON
json_str = snapshot_manager.get_latest_snapshot_json()
if json_str is not None:
    snapshot_data = json.loads(json_str)
    print(f"Snapshot data: {snapshot_data}")
