Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Langchain ai Langgraph BaseCheckpointSaver Protocol

From Leeroopedia
Attribute Value
Page Type Implementation (API Doc)
Library langgraph (checkpoint)
Workflow Persistence_and_Memory_Setup
Principle Langchain_ai_Langgraph_Checkpoint_Backend_Selection
Implementation BaseCheckpointSaver_Protocol
Source libs/checkpoint/langgraph/checkpoint/base/__init__.py:L116-396, libs/checkpoint/langgraph/checkpoint/memory/__init__.py:L31-530

Overview

BaseCheckpointSaver is the abstract base class that defines the checkpoint saver protocol for LangGraph. All checkpoint backends (in-memory, SQLite, PostgreSQL, or custom) implement this interface. InMemorySaver is the concrete in-memory implementation suitable for testing and development.

Description

BaseCheckpointSaver is a generic class parameterized by the version type V (which can be int, float, or str). It defines the contract that every checkpointer must fulfill: storing and retrieving checkpoints, listing checkpoint history, and managing intermediate writes. The class uses JsonPlusSerializer as its default serialization protocol.

InMemorySaver extends BaseCheckpointSaver[str] and stores all data in Python defaultdict structures. It supports both synchronous and asynchronous context manager protocols.

Usage

Import

from langgraph.checkpoint.base import BaseCheckpointSaver
from langgraph.checkpoint.memory import InMemorySaver

Code Reference

Source Location

Class File Lines
BaseCheckpointSaver libs/checkpoint/langgraph/checkpoint/base/__init__.py L116-396
InMemorySaver libs/checkpoint/langgraph/checkpoint/memory/__init__.py L31-530

BaseCheckpointSaver Signatures

class BaseCheckpointSaver(Generic[V]):
    serde: SerializerProtocol = JsonPlusSerializer()

    def __init__(self, *, serde: SerializerProtocol | None = None) -> None: ...

    def get(self, config: RunnableConfig) -> Checkpoint | None: ...

    def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None: ...

    def list(
        self,
        config: RunnableConfig | None,
        *,
        filter: dict[str, Any] | None = None,
        before: RunnableConfig | None = None,
        limit: int | None = None,
    ) -> Iterator[CheckpointTuple]: ...

    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig: ...

    def put_writes(
        self,
        config: RunnableConfig,
        writes: Sequence[tuple[str, Any]],
        task_id: str,
        task_path: str = "",
    ) -> None: ...

    def delete_thread(self, thread_id: str) -> None: ...

    def get_next_version(self, current: V | None, channel: None) -> V: ...

Async variants: aget, aget_tuple, alist, aput, aput_writes, adelete_thread.

InMemorySaver Signature

class InMemorySaver(BaseCheckpointSaver[str], AbstractContextManager, AbstractAsyncContextManager):
    def __init__(
        self,
        *,
        serde: SerializerProtocol | None = None,
        factory: type[defaultdict] = defaultdict,
    ) -> None: ...

Supporting Types

class Checkpoint(TypedDict):
    v: int
    id: str
    ts: str
    channel_values: dict[str, Any]
    channel_versions: ChannelVersions
    versions_seen: dict[str, ChannelVersions]
    updated_channels: list[str] | None

class CheckpointMetadata(TypedDict, total=False):
    source: Literal["input", "loop", "update", "fork"]
    step: int
    parents: dict[str, str]

class CheckpointTuple(NamedTuple):
    config: RunnableConfig
    checkpoint: Checkpoint
    metadata: CheckpointMetadata
    parent_config: RunnableConfig | None = None
    pending_writes: list[PendingWrite] | None = None

I/O Contract

Method Input Output Description
get_tuple config: RunnableConfig None Retrieve a checkpoint by thread_id and optional checkpoint_id
list config, filter, before, limit Iterator[CheckpointTuple] List checkpoints matching criteria, ordered newest first
put config, checkpoint, metadata, new_versions RunnableConfig Store a checkpoint; returns updated config with checkpoint_id
put_writes config, writes, task_id, task_path None Store intermediate writes linked to a checkpoint
delete_thread thread_id: str None Delete all checkpoints and writes for a thread
get_next_version None, channel V Generate next monotonically increasing version ID

The config parameter must contain {"configurable": {"thread_id": "..."}} at minimum. Optionally, checkpoint_ns (namespace) and checkpoint_id may be provided to target a specific checkpoint.

Usage Examples

Using InMemorySaver (Testing / Prototyping)

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph

builder = StateGraph(int)
builder.add_node("add_one", lambda x: x + 1)
builder.set_entry_point("add_one")
builder.set_finish_point("add_one")

memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "thread-1"}}
result = graph.invoke(1, config)
# result: 2

Implementing a Custom CheckpointSaver

from langgraph.checkpoint.base import (
    BaseCheckpointSaver,
    Checkpoint,
    CheckpointMetadata,
    CheckpointTuple,
    ChannelVersions,
)
from langchain_core.runnables import RunnableConfig
from collections.abc import Iterator, Sequence
from typing import Any

class MyCustomSaver(BaseCheckpointSaver[int]):
    def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
        # Implement retrieval logic
        ...

    def list(
        self,
        config: RunnableConfig | None,
        *,
        filter: dict[str, Any] | None = None,
        before: RunnableConfig | None = None,
        limit: int | None = None,
    ) -> Iterator[CheckpointTuple]:
        # Implement listing logic
        ...

    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        # Implement storage logic
        ...

    def put_writes(
        self,
        config: RunnableConfig,
        writes: Sequence[tuple[str, Any]],
        task_id: str,
        task_path: str = "",
    ) -> None:
        # Implement intermediate write storage
        ...

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment