Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Langchain ai Langgraph StoreBatchOperations

From Leeroopedia
Knowledge Sources
Domains Store, Batching
Last Updated 2026-02-11 16:00 GMT

Overview

`AsyncBatchedBaseStore` and related utilities provide efficient batching of store operations by accumulating individual requests in a background asyncio task and executing them as a single batch.

Description

The store batch operations module implements two key classes for optimizing store access patterns. `AsyncBatchedBaseStore` extends `BaseStore` and wraps all individual store operations (get, put, search, delete, list_namespaces) into futures that are enqueued and processed together in a background asyncio task. This background loop continually drains the queue, deduplicates operations, calls the underlying `abatch` method once, and distributes results back to the individual futures.

The deduplication logic in `_dedupe_ops` ensures that identical read operations (GetOp, SearchOp, ListNamespacesOp) are only executed once, with their results shared across all callers. For write operations (PutOp), later writes to the same namespace/key overwrite earlier ones, ensuring only the final value is persisted. This significantly reduces the number of operations sent to the backing store when many concurrent callers issue overlapping requests.

Synchronous access is also supported via the `_check_loop` decorator, which detects if the caller is on the same event loop as the store's background task and raises `asyncio.InvalidStateError` with a helpful message suggesting the async alternative. When called from a different thread, synchronous methods use `asyncio.run_coroutine_threadsafe` to schedule work on the store's event loop.

Usage

Use `AsyncBatchedBaseStore` as a base class when implementing a custom store backend that benefits from batched I/O. This is particularly valuable in high-concurrency scenarios where many graph nodes or tasks access the store simultaneously, as it reduces the total number of database round-trips by coalescing operations scheduled within the same event loop tick.

Code Reference

Source Location

Signature

class AsyncBatchedBaseStore(BaseStore):
    """Efficiently batch operations in a background task."""

    __slots__ = ("_loop", "_aqueue", "_task")

    def __init__(self) -> None: ...

    async def aget(
        self,
        namespace: tuple[str, ...],
        key: str,
        *,
        refresh_ttl: bool | None = None,
    ) -> Item | None: ...

    async def asearch(
        self,
        namespace_prefix: tuple[str, ...],
        /,
        *,
        query: str | None = None,
        filter: dict[str, Any] | None = None,
        limit: int = 10,
        offset: int = 0,
        refresh_ttl: bool | None = None,
    ) -> list[SearchItem]: ...

    async def aput(
        self,
        namespace: tuple[str, ...],
        key: str,
        value: dict[str, Any],
        index: Literal[False] | list[str] | None = None,
        *,
        ttl: float | None | NotProvided = NOT_PROVIDED,
    ) -> None: ...

    async def adelete(
        self,
        namespace: tuple[str, ...],
        key: str,
    ) -> None: ...

    async def alist_namespaces(
        self,
        *,
        prefix: NamespacePath | None = None,
        suffix: NamespacePath | None = None,
        max_depth: int | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> list[tuple[str, ...]]: ...

    def batch(self, ops: Iterable[Op]) -> list[Result]: ...
    def get(self, namespace, key, *, refresh_ttl=None) -> Item | None: ...
    def search(self, namespace_prefix, /, *, query=None, filter=None, limit=10, offset=0, refresh_ttl=None) -> list[SearchItem]: ...
    def put(self, namespace, key, value, index=None, *, ttl=NOT_PROVIDED) -> None: ...
    def delete(self, namespace, key) -> None: ...
    def list_namespaces(self, *, prefix=None, suffix=None, max_depth=None, limit=100, offset=0) -> list[tuple[str, ...]]: ...

Import

from langgraph.store.base.batch import AsyncBatchedBaseStore

I/O Contract

Method Input Output Description
`aget` `namespace: tuple[str, ...]`, `key: str` None` Retrieve a single item by namespace and key
`asearch` `namespace_prefix: tuple[str, ...]`, optional `query`, `filter`, `limit`, `offset` `list[SearchItem]` Search for items matching criteria within a namespace prefix
`aput` `namespace: tuple[str, ...]`, `key: str`, `value: dict[str, Any]` `None` Store or update an item; value of `None` deletes
`adelete` `namespace: tuple[str, ...]`, `key: str` `None` Delete an item by namespace and key
`alist_namespaces` optional `prefix`, `suffix`, `max_depth`, `limit`, `offset` `list[tuple[str, ...]]` List namespaces matching prefix/suffix conditions
`batch` `ops: Iterable[Op]` `list[Result]` Synchronous batch execution of multiple operations
Internal Function Input Output Description
`_dedupe_ops` `values: list[Op]` None, list[Op]]` Deduplicates operations; returns index mapping and deduped list
`_run` `aqueue`, `store (weakref)` `None` Background loop that drains the queue, batches ops, and resolves futures

Usage Examples

from langgraph.store.base.batch import AsyncBatchedBaseStore
from langgraph.store.base import Op, Result

class MyCustomStore(AsyncBatchedBaseStore):
    """Custom store with batched operations."""

    async def abatch(self, ops: list[Op]) -> list[Result]:
        # Implement actual storage backend logic here
        results = []
        for op in ops:
            # Process each operation against your backend
            results.append(None)
        return results

# Usage within an async context
import asyncio

async def main():
    store = MyCustomStore()

    # These concurrent calls will be automatically batched
    results = await asyncio.gather(
        store.aget(("user", "123"), "profile"),
        store.aget(("user", "456"), "profile"),
        store.asearch(("user",), query="active"),
    )

    # Store an item
    await store.aput(
        ("user", "123"),
        "profile",
        {"name": "Alice", "status": "active"},
    )

    # List namespaces
    namespaces = await store.alist_namespaces(prefix=("user",))

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment