Implementation:Infiniflow Ragflow DocStoreConn Insert

From Leeroopedia
Knowledge Sources
Domains RAG, Information_Retrieval
Last Updated 2026-02-12 06:00 GMT

Overview

Wrapper for bulk-inserting processed chunks into a pluggable document store (e.g. Elasticsearch, Infinity, OpenSearch) through the DocStoreConnection abstraction.

Description

The abstract method DocStoreConnection.insert and its backend implementations (ESConnection, InfinityConnection, OSConnection, OBConnection) perform bulk insert/update operations on chunk documents. The insert_chunks function in task_executor.py wraps this call with progress tracking, batch processing (DOC_BULK_SIZE=4), and automatic index creation.

Usage

DocStoreConnection.insert is called at the end of document processing, through the insert_chunks function in task_executor.py.

Code Reference

Source Location

  • Repository: ragflow
  • File: rag/svr/task_executor.py (insert_chunks: L871-946), common/doc_store/doc_store_base.py (abstract: L217-222)

Signature

```python
# Imports added so the excerpt parses on its own.
from abc import ABC, abstractmethod
from typing import Callable


# common/doc_store/doc_store_base.py
class DocStoreConnection(ABC):
    @abstractmethod
    def insert(
        self,
        rows: list[dict],
        index_name: str,
        dataset_id: str = None
    ) -> list[str]:
        """Bulk insert/update chunks.

        Args:
            rows: list[dict] - Chunk documents to insert.
            index_name: str - Target index (format: ragflow_{tenant_id}).
            dataset_id: str - Knowledge base ID.
        Returns:
            list[str] - Error messages (empty on success).
        """

    @abstractmethod
    def create_idx(
        self,
        index_name: str,
        dataset_id: str,
        vector_size: int,
        parser_id: str = None
    ):
        """Create the index if it does not exist."""


# rag/svr/task_executor.py (body omitted)
async def insert_chunks(
    task_id: str,
    task_tenant_id: str,
    task_dataset_id: str,
    chunks: list[dict],
    progress_callback: Callable
) -> bool:
    """Insert chunks with progress tracking.

    Args:
        task_id: str - Task ID.
        task_tenant_id: str - Tenant ID.
        task_dataset_id: str - Knowledge base ID.
        chunks: list[dict] - Processed chunks.
        progress_callback: Callable - Progress callback.
    Returns:
        bool - True if successful.
    """
```

Import

```python
from common.doc_store.doc_store_base import DocStoreConnection
from common import settings
# settings.docStoreConn is the initialized connection
```

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| rows | list[dict] | Yes | Chunk documents with embeddings and tokenized text |
| index_name | str | Yes | Index name (ragflow_{tenant_id}) |
| dataset_id | str | No | Knowledge base ID |

Outputs

| Name | Type | Description |
|---|---|---|
| errors | list[str] | Error messages (empty list on success) |

Usage Examples

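```python
import logging

from common import settings
from rag.nlp.search import index_name

# processed_chunks, tenant_id and kb_id are assumed to come from the caller's
# document-processing step; they are not defined in this snippet.
errors = settings.docStoreConn.insert(
    rows=processed_chunks,
    index_name=index_name(tenant_id),
    dataset_id=kb_id,
)
if errors:
    # An empty list means success; anything else is a list of error messages.
    logging.error("Insert errors: %s", errors)
```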

Related Pages

Implements Principle
