

Implementation:Infiniflow Ragflow DocumentService Run

From Leeroopedia
Knowledge Sources
Domains: RAG, Task_Queue
Last Updated: 2026-02-12 06:00 GMT

Overview

Concrete tool, provided by RAGFlow's DocumentService and TaskService, for triggering document processing by creating tasks and enqueuing them to Redis.

Description

DocumentService.run prepares a document for processing by determining its status and then delegating to queue_tasks, which creates Task records, computes a content digest for each task, reuses chunks from previous runs where possible, and enqueues the unfinished tasks to Redis. PDFs are split into page ranges of task_page_size pages; spreadsheets are split into row ranges of 3,000 rows, with one task per range.
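The range-splitting and digest steps described above can be sketched as follows. This is a minimal, self-contained illustration under stated assumptions, not RAGFlow's code: the helper names (`split_ranges`, `task_digest`, `make_tasks`), the half-open `[from_page, to_page)` range convention, the set of fields fed into the hash, and the use of `hashlib.sha256` as the digest function are all assumptions chosen for illustration; RAGFlow's own hashing may differ.

```python
import hashlib
import json

ROWS_PER_TASK = 3000  # spreadsheet row-range size stated on this page


def split_ranges(total: int, size: int) -> list[tuple[int, int]]:
    """Split [0, total) into consecutive half-open ranges of at most `size` units."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [(start, min(start + size, total)) for start in range(0, total, size)]


def task_digest(doc_id: str, from_page: int, to_page: int, chunking_config: dict) -> str:
    """Hash the inputs assumed to affect one task's chunking output."""
    hasher = hashlib.sha256()  # stand-in hash function (assumption)
    # sort_keys makes the digest independent of dict insertion order;
    # chunking_config must be JSON-serializable for this sketch.
    hasher.update(json.dumps(chunking_config, sort_keys=True).encode("utf-8"))
    for value in (doc_id, from_page, to_page):
        hasher.update(str(value).encode("utf-8"))
    return hasher.hexdigest()


def make_tasks(doc_id: str, total_units: int, units_per_task: int,
               chunking_config: dict) -> list[dict]:
    """Build one hypothetical task dict per page range (PDF) or row range (spreadsheet)."""
    tasks = []
    for from_page, to_page in split_ranges(total_units, units_per_task):
        tasks.append({
            "doc_id": doc_id,
            "from_page": from_page,
            "to_page": to_page,
            "digest": task_digest(doc_id, from_page, to_page, chunking_config),
            "progress": 0.0,  # new tasks start unfinished
        })
    return tasks
```

For a 30-page PDF with a task_page_size of 12, `make_tasks("doc-uuid-1", 30, 12, {...})` yields three tasks covering pages 0-12, 12-24 and 24-30; a 6,000-row spreadsheet would be split with `units_per_task=ROWS_PER_TASK` into two tasks. Because the digest covers both the chunking configuration and the range, rerunning with unchanged settings reproduces the same digests, which is what makes chunk reuse possible.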

Usage

DocumentService.run is called by the POST /v1/document/run REST endpoint; clients pass a list of document IDs to that endpoint to trigger processing.
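How the endpoint hands each document to run can be sketched as below. Everything here is hypothetical: `dispatch_run`, `get_doc` and the `run` callable stand in for the endpoint handler, the database lookup and DocumentService.run. The assumption illustrated is that a single kb_table_num_map dict is shared across all documents in one request, so that a per-KB table count only needs to be computed once per batch.

```python
from typing import Callable


def dispatch_run(doc_ids: list[str], tenant_id: str,
                 get_doc: Callable[[str], dict],
                 run: Callable[[str, dict, dict], None]) -> dict:
    """Call `run` once per document, sharing one kb_table_num_map across the batch.

    Returns the shared map so callers can inspect what was cached.
    """
    kb_table_num_map: dict = {}  # shared cache: KB ID -> table count (assumption)
    for doc_id in doc_ids:
        doc = get_doc(doc_id)  # hypothetical lookup of the document record
        run(tenant_id, doc, kb_table_num_map)
    return kb_table_num_map
```

Passing a fake `run` that only fills the map when a KB ID is missing shows the point of the shared dict: two documents from the same knowledge base trigger one count, not two.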

Code Reference

Source Location

  • Repository: ragflow
  • File: api/db/services/document_service.py (run: L911-931), api/db/services/task_service.py (queue_tasks: L360-464)

Signature

class DocumentService(CommonService):
    @classmethod
    @DB.connection_context()
    def run(cls, tenant_id: str, doc: dict, kb_table_num_map: dict) -> None:
        """Initiate document processing.

        Args:
            tenant_id: str - Tenant ID
            doc: dict - Document record with id, kb_id, parser_id, type, etc.
            kb_table_num_map: dict - Mapping of KB IDs to table counts
        """

def queue_tasks(doc: dict, bucket: str, name: str, priority: int = 0) -> None:
    """Create task records and enqueue to Redis.

    Args:
        doc: dict - Document with id, kb_id, parser_id, type, parser_config
        bucket: str - Storage bucket name
        name: str - Storage path
        priority: int - Queue priority (0=default)
    """

Import

from api.db.services.document_service import DocumentService
from api.db.services.task_service import queue_tasks

I/O Contract

Inputs

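| Name | Type | Required | Description |
|---|---|---|---|
| doc_ids | list[str] | Yes | IDs of the documents to process (sent to the REST endpoint) |
| run | str | Yes | Run status indicator (the usage example below passes "1" to trigger processing) |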

Outputs

| Name | Type | Description |
|---|---|---|
| (none) | None | Side effects only: Task records written to the database and messages pushed to the Redis queue |
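The side effects listed above come from the final part of queue_tasks: reusing chunks from a previous run and enqueueing whatever is still unfinished. The sketch below is a minimal illustration under stated assumptions, not RAGFlow's exact logic: new tasks are matched to previous tasks by their `(from_page, to_page)` range, a previous task is reusable only if its digest matches and its progress is 1.0, the field names `chunk_ids` and `progress` are assumed, and an in-memory `collections.deque` stands in for the Redis queue.

```python
import json
from collections import deque


def reuse_and_enqueue(new_tasks: list[dict], prev_tasks: list[dict],
                      queue: deque) -> list[dict]:
    """Reuse finished chunks from a previous run, then enqueue unfinished tasks.

    Mutates `new_tasks` in place and returns the tasks that were enqueued.
    """
    # Index previous tasks by their page/row range (assumed matching key).
    prev_by_range = {(t["from_page"], t["to_page"]): t for t in prev_tasks}
    for task in new_tasks:
        prev = prev_by_range.get((task["from_page"], task["to_page"]))
        # Same digest and fully processed: the chunking inputs did not change,
        # so the earlier chunks are kept instead of re-parsing this range.
        if prev and prev["digest"] == task["digest"] and prev["progress"] == 1.0:
            task["chunk_ids"] = prev.get("chunk_ids", [])
            task["progress"] = 1.0
    unfinished = [t for t in new_tasks if t["progress"] < 1.0]
    for task in unfinished:
        queue.append(json.dumps(task))  # stand-in for pushing a message to Redis
    return unfinished
```

With one previous task for pages 0-12 (same digest, progress 1.0) and a new task for pages 12-20, only the second task is serialized onto the queue; the first inherits the earlier chunk IDs and is marked complete without any new message.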

Usage Examples

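import requests

# Trigger document processing via the REST API
url = "http://localhost:9380/v1/document/run"
payload = {
    "doc_ids": ["doc-uuid-1", "doc-uuid-2"],  # IDs of the documents to process
    "run": "1",  # run status indicator; "1" triggers processing
}
headers = {"Authorization": "Bearer <token>"}
response = requests.post(url, json=payload, headers=headers, timeout=30)
response.raise_for_status()  # raise on HTTP 4xx/5xx instead of failing silently
print(response.json())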

Related Pages

Implements Principle
