Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Langchain ai Langgraph AsyncPostgresStore

From Leeroopedia
Knowledge Sources
Domains Store, Postgres
Last Updated 2026-02-11 16:00 GMT

Overview

`AsyncPostgresStore` is an asynchronous Postgres-backed key-value store with optional vector search using pgvector, TTL-based expiration, and connection pooling support.

Description

`AsyncPostgresStore` extends both `AsyncBatchedBaseStore` and `BasePostgresStore` to provide a fully asynchronous interface for storing, retrieving, and searching key-value data in PostgreSQL. It supports namespaced key-value storage where items are organized by namespace tuples and identified by string keys. The store can operate with a single `AsyncConnection`, an `AsyncPipeline` for batched operations, or an `AsyncConnectionPool` for concurrent access.

The store provides optional vector search capabilities through pgvector integration. When an `index` configuration is provided with embedding dimensions and an embedding model, the store automatically generates and stores vector embeddings for document fields, enabling semantic similarity search via the `asearch` method. The embedding process is fully asynchronous and supports configurable vector types (`vector`, `halfvec`), index types (`hnsw`, `ivfflat`), and distance metrics.

`AsyncPostgresStore` also supports TTL (time-to-live) for automatic item expiration. When a `ttl` configuration is provided, items can be assigned expiration timestamps. A background sweeper task, started via `start_ttl_sweeper()`, periodically removes expired items. The sweeper can be gracefully stopped with `stop_ttl_sweeper()`. Thread safety and cursor management are handled through async locks, with special handling for pooled connections where each cursor checkout gets its own lock.

Usage

Use `AsyncPostgresStore` when you need a persistent, production-grade key-value store for LangGraph agents running in async applications. It is suitable for storing user preferences, conversation memories, document embeddings, and any structured data that agents need to retrieve across sessions. Use the vector search feature when you need semantic retrieval of stored items.

Code Reference

Source Location

Signature

class AsyncPostgresStore(AsyncBatchedBaseStore, BasePostgresStore[_ainternal.Conn]):
    def __init__(
        self,
        conn: AsyncConnection | AsyncConnectionPool,
        *,
        pipe: AsyncPipeline | None = None,
        deserializer: Callable[[bytes | orjson.Fragment], dict[str, Any]] | None = None,
        index: PostgresIndexConfig | None = None,
        ttl: TTLConfig | None = None,
    ) -> None:

Import

from langgraph.store.postgres import AsyncPostgresStore

I/O Contract

Inputs

Name Type Required Description
conn AsyncConnectionPool Yes An async Postgres connection or connection pool.
pipe None No Optional async pipeline for batched DB operations. Cannot be used with a pool.
deserializer None No Optional custom deserializer for stored values.
index None No Optional vector search configuration with dims, embed model, and fields.
ttl None No Optional time-to-live configuration for automatic item expiration.

Outputs

Name Type Description
AsyncPostgresStore AsyncPostgresStore An instance of the async Postgres store, ready for use after calling `setup()`.

Key Methods

Method Description
setup() Creates store tables, applies migrations, and sets up vector indexes if configured.
abatch(ops) Executes a batch of store operations (Get, Put, Search, ListNamespaces) asynchronously.
from_conn_string(conn_string, ...) Async classmethod context manager that creates an instance from a connection string, with optional pool config.
sweep_ttl() Deletes expired store items based on TTL. Returns the number of deleted items.
start_ttl_sweeper() Starts a background task that periodically removes expired items.
stop_ttl_sweeper(timeout) Gracefully stops the TTL sweeper task.

Usage Examples

from langgraph.store.postgres import AsyncPostgresStore

conn_string = "postgresql://user:pass@localhost:5432/dbname"

# Basic key-value storage
async with AsyncPostgresStore.from_conn_string(conn_string) as store:
    await store.setup()

    await store.aput(("users", "123"), "prefs", {"theme": "dark"})
    item = await store.aget(("users", "123"), "prefs")

# With vector search
from langchain.embeddings import init_embeddings

async with AsyncPostgresStore.from_conn_string(
    conn_string,
    index={
        "dims": 1536,
        "embed": init_embeddings("openai:text-embedding-3-small"),
        "fields": ["text"],
    },
) as store:
    await store.setup()

    await store.aput(("docs",), "doc1", {"text": "Python tutorial"})
    await store.aput(("docs",), "doc2", {"text": "TypeScript guide"})
    results = await store.asearch(("docs",), query="programming guides", limit=2)

# With connection pooling
from langgraph.store.postgres import PoolConfig

async with AsyncPostgresStore.from_conn_string(
    conn_string,
    pool_config=PoolConfig(min_size=5, max_size=20),
) as store:
    await store.setup()

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment