Implementation:Langchain ai Langgraph StoreBatchOperations
| Knowledge Sources | |
|---|---|
| Domains | Store, Batching |
| Last Updated | 2026-02-11 16:00 GMT |
Overview
`AsyncBatchedBaseStore` and its helper functions batch store operations efficiently: individual requests are placed on a queue, and a background asyncio task executes the accumulated requests as a single batch.
Description
The store batch operations module is built around one class, `AsyncBatchedBaseStore`, and a small set of helpers (`_dedupe_ops`, `_check_loop`, `_run`). `AsyncBatchedBaseStore` extends `BaseStore` and wraps each individual store operation (get, put, search, delete, list_namespaces) in a future that is enqueued and processed in a background asyncio task. The background loop waits for a queued operation, drains whatever else is already in the queue, deduplicates the operations, calls the underlying `abatch` method once for the drained batch, and distributes the results back to the individual futures.
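The queue-and-drain pattern described above can be illustrated with a minimal standalone sketch. It does not use langgraph: `TinyBatcher`, its string "operations", and the `batch_sizes` counter are hypothetical stand-ins chosen for illustration, and deduplication is left out.

```python
import asyncio


class TinyBatcher:
    """Hypothetical stand-in for a batched store; not the langgraph class."""

    def __init__(self) -> None:
        # Must be constructed while an event loop is running.
        self._queue: asyncio.Queue = asyncio.Queue()
        self.batch_sizes: list[int] = []  # how many ops each backend call held
        self._task = asyncio.get_running_loop().create_task(self._run())

    async def abatch(self, ops: list[str]) -> list[str]:
        # Pretend backend: one call answers every op in the batch.
        self.batch_sizes.append(len(ops))
        return [op.upper() for op in ops]

    async def aget(self, key: str) -> str:
        # Wrap the request in a future and hand it to the background task.
        fut = asyncio.get_running_loop().create_future()
        self._queue.put_nowait((fut, key))
        return await fut

    async def _run(self) -> None:
        while True:
            # Wait for the first request, then drain whatever else is queued.
            items = [await self._queue.get()]
            while not self._queue.empty():
                items.append(self._queue.get_nowait())
            futs = [fut for fut, _ in items]
            try:
                results = await self.abatch([op for _, op in items])
                for fut, result in zip(futs, results):
                    if not fut.done():  # the caller may have been cancelled
                        fut.set_result(result)
            except Exception as exc:
                for fut in futs:
                    if not fut.done():
                        fut.set_exception(exc)


async def demo() -> tuple[list[str], list[int]]:
    store = TinyBatcher()
    # Three concurrent callers end up in one backend call.
    results = await asyncio.gather(
        store.aget("a"), store.aget("b"), store.aget("c")
    )
    store._task.cancel()
    return results, store.batch_sizes


results, sizes = asyncio.run(demo())
print(results, sizes)
```

Each caller only awaits its own future, so the batching stays invisible at the call site; the cost is that one failing backend call fails every caller in that batch.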
The deduplication logic in `_dedupe_ops` ensures that identical read operations (`GetOp`, `SearchOp`, `ListNamespacesOp`) are executed only once, with the result shared across all callers. For write operations (`PutOp`), a later write to the same namespace/key replaces an earlier one in the batch, so only the final value is persisted. When many concurrent callers issue overlapping requests, this can substantially reduce the number of operations sent to the backing store.
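A minimal sketch of this deduplication idea follows. It is not the library's `_dedupe_ops`: the `Get` and `Put` dataclasses and the `dedupe` function are hypothetical stand-ins, and the sketch always returns an index mapping rather than an optional one.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Get:  # hypothetical stand-in for a read op such as GetOp
    namespace: tuple[str, ...]
    key: str


@dataclass(frozen=True)
class Put:  # hypothetical stand-in for a write op such as PutOp
    namespace: tuple[str, ...]
    key: str
    value: Any


def dedupe(ops: list) -> tuple[list[int], list]:
    """Return (index_map, unique_ops); index_map[i] is the slot holding ops[i]'s result."""
    unique: list = []
    index_map: list[int] = []
    put_slots: dict[tuple[tuple[str, ...], str], int] = {}
    for op in ops:
        if isinstance(op, Put):
            slot = put_slots.get((op.namespace, op.key))
            if slot is None:
                # First write to this namespace/key: give it a new slot.
                slot = len(unique)
                put_slots[(op.namespace, op.key)] = slot
                unique.append(op)
            else:
                # A later write to the same namespace/key replaces the earlier one.
                unique[slot] = op
        else:
            try:
                # Identical reads share the slot of the first occurrence.
                slot = unique.index(op)
            except ValueError:
                slot = len(unique)
                unique.append(op)
        index_map.append(slot)
    return index_map, unique


ops = [
    Get(("u",), "a"),
    Put(("u",), "a", {"v": 1}),
    Get(("u",), "a"),
    Put(("u",), "a", {"v": 2}),
]
index_map, unique = dedupe(ops)
print(index_map, unique)
```

After the backend answers the two unique operations, the results for the four original callers can be rebuilt as `[results[i] for i in index_map]`.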
Synchronous methods are also provided, guarded by the `_check_loop` decorator. The decorator detects when the caller is running on the same event loop as the store's background task, where a blocking call would stall the loop that has to produce the result, and raises `asyncio.InvalidStateError` with a message suggesting the async alternative. When called from a different thread, the synchronous methods use `asyncio.run_coroutine_threadsafe` to schedule the work on the store's event loop.
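The guard and the cross-thread path can be sketched as follows. This is an illustration under assumptions, not the library's code: `check_loop`, `Owner`, and the error message are hypothetical, and only `asyncio.get_running_loop`, `asyncio.run_coroutine_threadsafe`, `asyncio.to_thread`, and `asyncio.InvalidStateError` are real standard-library names.

```python
import asyncio
import functools


def check_loop(func):
    """Hypothetical guard: refuse sync calls made from the owner's own loop."""

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            running = asyncio.get_running_loop()
        except RuntimeError:
            running = None  # no loop in this thread, so blocking is safe
        if running is self._loop:
            raise asyncio.InvalidStateError(
                f"Use `await obj.a{func.__name__}(...)` instead of blocking the loop."
            )
        return func(self, *args, **kwargs)

    return wrapper


class Owner:
    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop

    async def aget(self, key: str) -> str:
        return key.upper()

    @check_loop
    def get(self, key: str) -> str:
        # Schedule the coroutine on the owner's loop and block for the result.
        return asyncio.run_coroutine_threadsafe(self.aget(key), self._loop).result()


async def demo() -> tuple[str, bool]:
    owner = Owner(asyncio.get_running_loop())
    # From a worker thread, the sync call is allowed.
    from_thread = await asyncio.to_thread(owner.get, "x")
    # From the loop's own thread, it is rejected before it can block.
    try:
        owner.get("x")
        rejected = False
    except asyncio.InvalidStateError:
        rejected = True
    return from_thread, rejected


from_thread, rejected = asyncio.run(demo())
print(from_thread, rejected)
```

Raising early is the safer design here: calling `.result()` on the loop's own thread would block the very loop that must run the scheduled coroutine.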
Usage
Use `AsyncBatchedBaseStore` as a base class when implementing a custom store backend that benefits from batched I/O. This is particularly valuable in high-concurrency scenarios where many graph nodes or tasks access the store simultaneously, as it reduces the total number of database round-trips by coalescing operations scheduled within the same event loop tick.
Code Reference
Source Location
- Repository: Langchain_ai_Langgraph
- File: libs/checkpoint/langgraph/store/base/batch.py
Signature
```python
class AsyncBatchedBaseStore(BaseStore):
    """Efficiently batch operations in a background task."""

    __slots__ = ("_loop", "_aqueue", "_task")

    def __init__(self) -> None: ...

    async def aget(
        self,
        namespace: tuple[str, ...],
        key: str,
        *,
        refresh_ttl: bool | None = None,
    ) -> Item | None: ...

    async def asearch(
        self,
        namespace_prefix: tuple[str, ...],
        /,
        *,
        query: str | None = None,
        filter: dict[str, Any] | None = None,
        limit: int = 10,
        offset: int = 0,
        refresh_ttl: bool | None = None,
    ) -> list[SearchItem]: ...

    async def aput(
        self,
        namespace: tuple[str, ...],
        key: str,
        value: dict[str, Any],
        index: Literal[False] | list[str] | None = None,
        *,
        ttl: float | None | NotProvided = NOT_PROVIDED,
    ) -> None: ...

    async def adelete(
        self,
        namespace: tuple[str, ...],
        key: str,
    ) -> None: ...

    async def alist_namespaces(
        self,
        *,
        prefix: NamespacePath | None = None,
        suffix: NamespacePath | None = None,
        max_depth: int | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> list[tuple[str, ...]]: ...

    def batch(self, ops: Iterable[Op]) -> list[Result]: ...

    def get(self, namespace, key, *, refresh_ttl=None) -> Item | None: ...

    def search(self, namespace_prefix, /, *, query=None, filter=None, limit=10, offset=0, refresh_ttl=None) -> list[SearchItem]: ...

    def put(self, namespace, key, value, index=None, *, ttl=NOT_PROVIDED) -> None: ...

    def delete(self, namespace, key) -> None: ...

    def list_namespaces(self, *, prefix=None, suffix=None, max_depth=None, limit=100, offset=0) -> list[tuple[str, ...]]: ...
```
Import
```python
from langgraph.store.base.batch import AsyncBatchedBaseStore
```
I/O Contract
| Method | Input | Output | Description |
|---|---|---|---|
| `aget` | `namespace: tuple[str, ...]`, `key: str`, optional `refresh_ttl` | `Item \| None` | Retrieve a single item by namespace and key |
| `asearch` | `namespace_prefix: tuple[str, ...]`, optional `query`, `filter`, `limit`, `offset` | `list[SearchItem]` | Search for items matching criteria within a namespace prefix |
| `aput` | `namespace: tuple[str, ...]`, `key: str`, `value: dict[str, Any]`, optional `index`, `ttl` | `None` | Store or update an item; at the operation level, a `PutOp` whose value is `None` represents a delete |
| `adelete` | `namespace: tuple[str, ...]`, `key: str` | `None` | Delete an item by namespace and key |
| `alist_namespaces` | optional `prefix`, `suffix`, `max_depth`, `limit`, `offset` | `list[tuple[str, ...]]` | List namespaces matching prefix/suffix conditions |
| `batch` | `ops: Iterable[Op]` | `list[Result]` | Synchronous batch execution of multiple operations |

| Internal Function | Input | Output | Description |
|---|---|---|---|
| `_dedupe_ops` | `values: list[Op]` | `tuple[list[int] \| None, list[Op]]` | Deduplicates operations; returns index mapping and deduped list |
| `_run` | `aqueue`, `store (weakref)` | `None` | Background loop that drains the queue, batches ops, and resolves futures |
Usage Examples
```python
import asyncio
from collections.abc import Iterable

from langgraph.store.base import Op, Result
from langgraph.store.base.batch import AsyncBatchedBaseStore


class MyCustomStore(AsyncBatchedBaseStore):
    """Custom store with batched operations."""

    async def abatch(self, ops: Iterable[Op]) -> list[Result]:
        # Implement the actual storage backend logic here.
        # This placeholder returns None for every operation; a real backend
        # must return one result per op, in the same order as `ops`.
        results: list[Result] = []
        for op in ops:
            # Process each operation against your backend
            results.append(None)
        return results


# Usage within an async context
async def main():
    # Instantiate inside a running event loop so the background task can start
    store = MyCustomStore()

    # These concurrent calls will be automatically batched
    results = await asyncio.gather(
        store.aget(("user", "123"), "profile"),
        store.aget(("user", "456"), "profile"),
        store.asearch(("user",), query="active"),
    )

    # Store an item
    await store.aput(
        ("user", "123"),
        "profile",
        {"name": "Alice", "status": "active"},
    )

    # List namespaces
    namespaces = await store.alist_namespaces(prefix=("user",))


asyncio.run(main())
```