Implementation:Infiniflow Ragflow DocumentService Run
| Knowledge Sources | |
|---|---|
| Domains | RAG, Task_Queue |
| Last Updated | 2026-02-12 06:00 GMT |
Overview
A concrete tool, provided by RAGFlow's DocumentService and TaskService, that triggers document processing by creating tasks and enqueuing them to Redis.
Description
DocumentService.run prepares a document for processing by determining its status, then delegates to queue_tasks. queue_tasks creates Task records, computes content digests, handles chunk reuse from previous runs, and enqueues the unfinished tasks to Redis. For PDFs, pages are split into ranges based on task_page_size. For spreadsheets, rows are split into ranges of 3000.
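The range splitting described above can be illustrated with a minimal sketch. The helper name split_into_ranges, the half-open range convention, and the sample sizes (a 30-page PDF with task_page_size of 12, a 7000-row sheet) are assumptions made for illustration. They are not taken from the RAGFlow source, which may compute its ranges differently.

```python
from typing import List, Tuple

SPREADSHEET_ROWS_PER_TASK = 3000  # row range size stated in the description


def split_into_ranges(total: int, size: int) -> List[Tuple[int, int]]:
    """Return half-open [start, end) ranges covering 0..total in steps of size.

    Hypothetical helper: it only illustrates the idea of one task per range.
    """
    if size <= 0:
        raise ValueError("size must be positive")
    return [(start, min(start + size, total)) for start in range(0, total, size)]


# Assumed inputs: a 30-page PDF with task_page_size=12, and a 7000-row spreadsheet.
pdf_ranges = split_into_ranges(30, 12)
sheet_ranges = split_into_ranges(7000, SPREADSHEET_ROWS_PER_TASK)
print(pdf_ranges)    # one entry per PDF task
print(sheet_ranges)  # one entry per spreadsheet task
```

Each resulting range would correspond to one Task record, so a larger task_page_size yields fewer, longer-running tasks.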
Usage
Called from the POST /v1/document/run REST endpoint. Pass document IDs to trigger processing.
Code Reference
Source Location
- Repository: ragflow
- File: api/db/services/document_service.py (run: L911-931)
- File: api/db/services/task_service.py (queue_tasks: L360-464)
Signature
class DocumentService(CommonService):
    @classmethod
    @DB.connection_context()
    def run(cls, tenant_id: str, doc: dict, kb_table_num_map: dict) -> None:
        """Initiate document processing.

        Args:
            tenant_id: str - Tenant ID
            doc: dict - Document record with id, kb_id, parser_id, type, etc.
            kb_table_num_map: dict - Mapping of KB IDs to table counts
        """

def queue_tasks(doc: dict, bucket: str, name: str, priority: int = 0) -> None:
    """Create task records and enqueue to Redis.

    Args:
        doc: dict - Document with id, kb_id, parser_id, type, parser_config
        bucket: str - Storage bucket name
        name: str - Storage path
        priority: int - Queue priority (0=default)
    """
Import
from api.db.services.document_service import DocumentService
from api.db.services.task_service import queue_tasks
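queue_tasks is described as computing content digests and handling chunk reuse from previous runs. The following is a rough sketch of that idea, not the RAGFlow implementation. SHA-256 stands in for whatever hash the project uses, and the helper names (task_digest, can_reuse), the task fields (digest, progress), and the sample config key are hypothetical.

```python
import hashlib
import json


def task_digest(parser_config: dict, from_page: int, to_page: int) -> str:
    """Hash the settings that determine a task's output (stand-in digest)."""
    payload = json.dumps(
        {"parser_config": parser_config, "from_page": from_page, "to_page": to_page},
        sort_keys=True,  # stable key order so equal configs hash equally
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def can_reuse(new_task: dict, previous_tasks: list) -> bool:
    """Assume chunks are reusable when a finished earlier task has the same digest."""
    return any(
        prev["digest"] == new_task["digest"] and prev["progress"] == 1.0
        for prev in previous_tasks
    )


config = {"chunk_token_num": 128}  # illustrative parser setting
previous = [{"digest": task_digest(config, 0, 12), "progress": 1.0}]
unchanged = {"digest": task_digest(config, 0, 12)}
changed = {"digest": task_digest({"chunk_token_num": 256}, 0, 12)}
print(can_reuse(unchanged, previous), can_reuse(changed, previous))
```

Under these assumptions, rerunning a document with an unchanged configuration would find a matching digest and skip re-chunking, while any change to the configuration or page range produces a new digest and a fresh task.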
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| doc_ids | list[str] | Yes | Document IDs to process (from REST endpoint) |
| run | str | Yes | Run status indicator, passed as a string (the usage example sends "1") |
Outputs
| Name | Type | Description |
|---|---|---|
| (none) | None | Side effects: Task records in DB, messages in Redis queue |
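The side effects listed above can be pictured with an in-memory stand-in. In this sketch a deque plays the role of the Redis queue and plain dicts play the role of Task records. The function enqueue_unfinished, the progress field, and the message shape are assumptions for illustration only and do not mirror the actual task schema.

```python
from collections import deque


def enqueue_unfinished(tasks: list, queue: deque) -> int:
    """Append every task that is not already complete; return how many were queued."""
    queued = 0
    for task in tasks:
        # Tasks already at full progress (for example, via chunk reuse) are skipped.
        if task.get("progress", 0.0) < 1.0:
            queue.append({"id": task["id"], "doc_id": task["doc_id"]})
            queued += 1
    return queued


task_records = [
    {"id": "t1", "doc_id": "doc-uuid-1", "progress": 1.0},  # finished, not queued
    {"id": "t2", "doc_id": "doc-uuid-1", "progress": 0.0},
    {"id": "t3", "doc_id": "doc-uuid-1"},  # no progress recorded yet
]
redis_stand_in = deque()  # hypothetical replacement for the Redis queue
count = enqueue_unfinished(task_records, redis_stand_in)
print(count, [message["id"] for message in redis_stand_in])
```

The point of the sketch is the separation of the two side effects: every task gets a record, but only unfinished ones produce a queue message.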
Usage Examples
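import requests

# Trigger document processing via the REST API
url = "http://localhost:9380/v1/document/run"
payload = {
    "doc_ids": ["doc-uuid-1", "doc-uuid-2"],  # documents to process
    "run": "1",  # run status indicator
}
headers = {"Authorization": "Bearer <token>"}  # replace <token> with a valid token

response = requests.post(url, json=payload, headers=headers, timeout=30)
response.raise_for_status()  # surface HTTP-level failures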