Implementation:Infiniflow Ragflow DocStoreConn Insert
| Knowledge Sources | |
|---|---|
| Domains | RAG, Information_Retrieval |
| Last Updated | 2026-02-12 06:00 GMT |
Overview
A wrapper for bulk-inserting chunks into pluggable document stores (such as Elasticsearch, Infinity, and OpenSearch) through the DocStoreConnection abstraction.
Description
The abstract method DocStoreConnection.insert and its implementations (ESConnection, InfinityConnection, OSConnection, OBConnection) bulk-insert or update chunk documents. The insert_chunks function in task_executor.py wraps this call with progress tracking, batch processing (DOC_BULK_SIZE=4), and automatic index creation.
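The batching behaviour described above can be sketched roughly as follows. This is a simplified, synchronous illustration and not the actual insert_chunks code: InMemoryStore and insert_in_batches are hypothetical names, and the progress callback signature (a fraction plus a message, with a negative value on failure) is an assumption. Only DOC_BULK_SIZE=4 and the insert and create_idx method names come from this page.

```python
from typing import Callable, Optional

DOC_BULK_SIZE = 4  # batch size quoted in the description


class InMemoryStore:
    """Hypothetical stand-in for a DocStoreConnection implementation."""

    def __init__(self) -> None:
        self.indexes: dict[str, list[dict]] = {}

    def create_idx(self, index_name: str, dataset_id: str, vector_size: int) -> None:
        # Idempotent: creates the index only if it does not exist yet.
        self.indexes.setdefault(index_name, [])

    def insert(self, rows: list[dict], index_name: str,
               dataset_id: Optional[str] = None) -> list[str]:
        self.indexes[index_name].extend(rows)
        return []  # an empty list signals success


def insert_in_batches(
    store: InMemoryStore,
    chunks: list[dict],
    index_name: str,
    dataset_id: str,
    vector_size: int,
    progress_callback: Callable[[float, str], None],
) -> bool:
    """Insert chunks DOC_BULK_SIZE at a time, reporting progress per batch."""
    store.create_idx(index_name, dataset_id, vector_size)
    total = len(chunks)
    for start in range(0, total, DOC_BULK_SIZE):
        batch = chunks[start:start + DOC_BULK_SIZE]
        errors = store.insert(batch, index_name, dataset_id)
        if errors:
            # Assumed convention: a negative value marks failure.
            progress_callback(-1.0, f"Insert failed: {'; '.join(errors)}")
            return False
        done = start + len(batch)
        progress_callback(done / total, f"Inserted {done}/{total} chunks")
    return True
```

Stopping at the first failed batch keeps the sketch simple; a real task runner might instead retry or collect errors across batches.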
Usage
DocStoreConnection.insert is called at the end of document processing, through the insert_chunks function in task_executor.py.
Code Reference
Source Location
- Repository: ragflow
- File: rag/svr/task_executor.py (insert_chunks: L871-946), common/doc_store/doc_store_base.py (abstract: L217-222)
Signature
class DocStoreConnection(ABC):
    @abstractmethod
    def insert(
        self,
        rows: list[dict],
        index_name: str,
        dataset_id: str = None
    ) -> list[str]:
        """Bulk insert/update chunks.

        Args:
            rows: list[dict] - Chunk documents to insert.
            index_name: str - Target index (format: ragflow_{tenant_id}).
            dataset_id: str - Knowledge base ID.

        Returns:
            list[str] - Error messages (empty on success).
        """

    @abstractmethod
    def create_idx(
        self,
        index_name: str,
        dataset_id: str,
        vector_size: int,
        parser_id: str = None
    ):
        """Create index if not exists."""


async def insert_chunks(
    task_id: str,
    task_tenant_id: str,
    task_dataset_id: str,
    chunks: list[dict],
    progress_callback: callable
) -> bool:
    """Insert chunks with progress tracking.

    Args:
        task_id: str - Task ID.
        task_tenant_id: str - Tenant ID.
        task_dataset_id: str - KB ID.
        chunks: list[dict] - Processed chunks.
        progress_callback: callable - Progress callback.

    Returns:
        bool - True if successful.
    """
Import
from common.doc_store.doc_store_base import DocStoreConnection
from common import settings
# settings.docStoreConn is the initialized connection
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| rows | list[dict] | Yes | Chunk documents with embeddings and tokenized text |
| index_name | str | Yes | Index name (ragflow_{tenant_id}) |
| dataset_id | str | No | Knowledge base ID |
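The index name format listed in the table can be illustrated with a one-line helper. tenant_index_name is a hypothetical name used only for this sketch; the project's own helper may be implemented differently.

```python
def tenant_index_name(tenant_id: str) -> str:
    """Build an index name following the ragflow_{tenant_id} format."""
    return f"ragflow_{tenant_id}"


print(tenant_index_name("tenant42"))  # prints "ragflow_tenant42"
```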
Outputs
| Name | Type | Description |
|---|---|---|
| errors | list[str] | Error messages (empty list on success) |
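Because insert reports failures through its return value rather than by raising, callers that prefer exceptions may want a thin adapter. The following is a minimal sketch under that assumption; ChunkInsertError, insert_or_raise, and FailingStore are hypothetical names introduced for illustration and are not part of the codebase.

```python
class ChunkInsertError(RuntimeError):
    """Hypothetical exception raised when the store reports insert errors."""


def insert_or_raise(store, rows, index_name, dataset_id=None) -> None:
    """Call store.insert and turn a non-empty error list into an exception."""
    errors = store.insert(rows, index_name, dataset_id)
    if errors:
        raise ChunkInsertError(
            f"{len(errors)} insert error(s): " + "; ".join(errors)
        )


class FailingStore:
    """Hypothetical store that always reports one error, for demonstration."""

    def insert(self, rows, index_name, dataset_id=None):
        return ["simulated failure"]
```

Any object exposing an insert method with the documented signature can be passed as store, including settings.docStoreConn.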
Usage Examples
from common import settings
from rag.nlp.search import index_name

# Insert chunks into document store
errors = settings.docStoreConn.insert(
    rows=processed_chunks,
    index_name=index_name(tenant_id),
    dataset_id=kb_id
)
if errors:
    print(f"Insert errors: {errors}")