Implementation:Langchain ai Langgraph AsyncPostgresStore
| Knowledge Sources | |
|---|---|
| Domains | Store, Postgres |
| Last Updated | 2026-02-11 16:00 GMT |
Overview
`AsyncPostgresStore` is an asynchronous Postgres-backed key-value store with optional vector search using pgvector, TTL-based expiration, and connection pooling support.
Description
`AsyncPostgresStore` extends both `AsyncBatchedBaseStore` and `BasePostgresStore` to provide a fully asynchronous interface for storing, retrieving, and searching key-value data in PostgreSQL. It supports namespaced key-value storage where items are organized by namespace tuples and identified by string keys. The store can operate with a single `AsyncConnection`, an `AsyncPipeline` for batched operations, or an `AsyncConnectionPool` for concurrent access.
The store provides optional vector search capabilities through pgvector integration. When an `index` configuration is provided with embedding dimensions and an embedding model, the store automatically generates and stores vector embeddings for document fields, enabling semantic similarity search via the `asearch` method. The embedding process is fully asynchronous and supports configurable vector types (`vector`, `halfvec`), index types (`hnsw`, `ivfflat`), and distance metrics.
`AsyncPostgresStore` also supports TTL (time-to-live) for automatic item expiration. When a `ttl` configuration is provided, items can be assigned expiration timestamps. A background sweeper task, started via `start_ttl_sweeper()`, periodically removes expired items. The sweeper can be gracefully stopped with `stop_ttl_sweeper()`. Thread safety and cursor management are handled through async locks, with special handling for pooled connections where each cursor checkout gets its own lock.
Usage
Use `AsyncPostgresStore` when you need a persistent, production-grade key-value store for LangGraph agents running in async applications. It is suitable for storing user preferences, conversation memories, document embeddings, and any structured data that agents need to retrieve across sessions. Use the vector search feature when you need semantic retrieval of stored items.
Code Reference
Source Location
- Repository: Langchain_ai_Langgraph
- File: libs/checkpoint-postgres/langgraph/store/postgres/aio.py
- Lines: 1-591
Signature
class AsyncPostgresStore(AsyncBatchedBaseStore, BasePostgresStore[_ainternal.Conn]):
def __init__(
self,
conn: AsyncConnection | AsyncConnectionPool,
*,
pipe: AsyncPipeline | None = None,
deserializer: Callable[[bytes | orjson.Fragment], dict[str, Any]] | None = None,
index: PostgresIndexConfig | None = None,
ttl: TTLConfig | None = None,
) -> None:
Import
from langgraph.store.postgres import AsyncPostgresStore
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| conn | AsyncConnectionPool | Yes | An async Postgres connection or connection pool. |
| pipe | None | No | Optional async pipeline for batched DB operations. Cannot be used with a pool. |
| deserializer | None | No | Optional custom deserializer for stored values. |
| index | None | No | Optional vector search configuration with dims, embed model, and fields. |
| ttl | None | No | Optional time-to-live configuration for automatic item expiration. |
Outputs
| Name | Type | Description |
|---|---|---|
| AsyncPostgresStore | AsyncPostgresStore |
An instance of the async Postgres store, ready for use after calling `setup()`. |
Key Methods
| Method | Description |
|---|---|
setup() |
Creates store tables, applies migrations, and sets up vector indexes if configured. |
abatch(ops) |
Executes a batch of store operations (Get, Put, Search, ListNamespaces) asynchronously. |
from_conn_string(conn_string, ...) |
Async classmethod context manager that creates an instance from a connection string, with optional pool config. |
sweep_ttl() |
Deletes expired store items based on TTL. Returns the number of deleted items. |
start_ttl_sweeper() |
Starts a background task that periodically removes expired items. |
stop_ttl_sweeper(timeout) |
Gracefully stops the TTL sweeper task. |
Usage Examples
from langgraph.store.postgres import AsyncPostgresStore
conn_string = "postgresql://user:pass@localhost:5432/dbname"
# Basic key-value storage
async with AsyncPostgresStore.from_conn_string(conn_string) as store:
await store.setup()
await store.aput(("users", "123"), "prefs", {"theme": "dark"})
item = await store.aget(("users", "123"), "prefs")
# With vector search
from langchain.embeddings import init_embeddings
async with AsyncPostgresStore.from_conn_string(
conn_string,
index={
"dims": 1536,
"embed": init_embeddings("openai:text-embedding-3-small"),
"fields": ["text"],
},
) as store:
await store.setup()
await store.aput(("docs",), "doc1", {"text": "Python tutorial"})
await store.aput(("docs",), "doc2", {"text": "TypeScript guide"})
results = await store.asearch(("docs",), query="programming guides", limit=2)
# With connection pooling
from langgraph.store.postgres import PoolConfig
async with AsyncPostgresStore.from_conn_string(
conn_string,
pool_config=PoolConfig(min_size=5, max_size=20),
) as store:
await store.setup()