Implementation:Infiniflow Ragflow Build Chunks
| Knowledge Sources | |
|---|---|
| Domains | RAG, Document_Processing |
| Last Updated | 2026-02-12 06:00 GMT |
Overview
Concrete tool for converting raw documents into text chunks, provided by the build_chunks function in RAGFlow's task executor.
Description
The build_chunks function is the main document parsing orchestrator. It fetches the file binary from object storage, selects the appropriate parser from the FACTORY dictionary based on parser_id, invokes the parser's chunk() method, and handles RAPTOR summarization, GraphRAG processing, and auto-keyword extraction as post-processing steps. The function has an 80-minute timeout.
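The dispatch step described above can be sketched as follows. This is a minimal illustration under stated assumptions, not RAGFlow's code: `NaiveParser`, `OneParser`, `FACTORY_SKETCH`, and `run_parser` are hypothetical stand-ins, the keys `"naive"` and `"one"` are assumed string values, and the file binary is passed in directly instead of being fetched from object storage. Post-processing (RAPTOR, GraphRAG, keyword extraction) is left out.

```python
from typing import Callable


class NaiveParser:
    """Hypothetical stand-in parser: splits text on blank lines."""

    def chunk(self, filename: str, binary: bytes, callback: Callable) -> list:
        parts = [p.strip() for p in binary.decode("utf-8").split("\n\n") if p.strip()]
        callback(1.0, f"{filename}: {len(parts)} chunks")
        return [{"content_with_weight": p} for p in parts]


class OneParser:
    """Hypothetical stand-in parser: keeps the whole document as one chunk."""

    def chunk(self, filename: str, binary: bytes, callback: Callable) -> list:
        callback(1.0, f"{filename}: 1 chunk")
        return [{"content_with_weight": binary.decode("utf-8").strip()}]


# Illustrative registry keyed by parser_id (key strings are assumptions).
FACTORY_SKETCH = {
    "general": NaiveParser(),
    "naive": NaiveParser(),
    "one": OneParser(),
}


def run_parser(task: dict, binary: bytes, callback: Callable) -> list:
    """Look up the parser for task['parser_id'] and call its chunk() method."""
    parser = FACTORY_SKETCH.get(task["parser_id"])
    if parser is None:
        raise ValueError(f"Unknown parser_id: {task['parser_id']!r}")
    return parser.chunk(task["name"], binary, callback)
```

Keeping the registry as a plain dictionary means a new parser type only needs a new entry; the orchestrating function does not change.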
Usage
Called automatically by the task executor worker for each task consumed from Redis.
Code Reference
Source Location
- Repository: ragflow
- File: rag/svr/task_executor.py
- Lines: L244-515 (build_chunks), L84-101 (FACTORY mapping)
Signature
FACTORY = {
    "general": naive,
    ParserType.NAIVE.value: naive,
    ParserType.PAPER.value: paper,
    ParserType.BOOK.value: book,
    ParserType.PRESENTATION.value: presentation,
    ParserType.MANUAL.value: manual,
    ParserType.LAWS.value: laws,
    ParserType.QA.value: qa,
    ParserType.TABLE.value: table,
    ParserType.RESUME.value: resume,
    ParserType.PICTURE.value: picture,
    ParserType.ONE.value: one,
    ParserType.AUDIO.value: audio,
    ParserType.EMAIL.value: email,
    ParserType.KG.value: naive,
    ParserType.TAG.value: tag,
}

@timeout(60 * 80, 1)
async def build_chunks(task: dict, progress_callback: callable) -> list[dict]:
    """Parse a document into chunks.

    Args:
        task: dict - Hydrated task dict from TaskService.get_task (21 fields).
        progress_callback: callable - Progress update callback.

    Returns:
        list[dict] - Chunks with content_with_weight, page_num_int, position_int, img_id, etc.
    """
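
The 80-minute limit follows from the decorator's first argument, 60 * 80 seconds. RAGFlow's own `timeout` decorator is not reproduced here; the sketch below shows the same idea using the standard library's `asyncio.wait_for`. The names `with_timeout`, `slow_parse`, and `fast_parse` are hypothetical, and the decorator's second argument (`1`) is not modeled.

```python
import asyncio
import functools


def with_timeout(seconds: float):
    """Hypothetical decorator: cancel the wrapped coroutine after `seconds`."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # wait_for cancels the inner coroutine and raises on expiry.
            return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)

        return wrapper

    return decorator


@with_timeout(0.05)
async def slow_parse() -> list:
    # Simulates a parse that exceeds its time budget.
    await asyncio.sleep(1)
    return []


@with_timeout(60 * 80)  # 4800 seconds, i.e. 80 minutes
async def fast_parse() -> list:
    return [{"content_with_weight": "hello"}]
```

Calling `asyncio.run(slow_parse())` raises `asyncio.TimeoutError`, while `asyncio.run(fast_parse())` returns its chunk list normally.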
Import
from rag.svr.task_executor import build_chunks, FACTORY
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| task | dict | Yes | Hydrated task dictionary (21 fields from get_task) |
| progress_callback | callable | Yes | Function to report progress (0.0-1.0) |
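
As an illustration of the progress_callback input, the sketch below defines a callable that records progress values in the 0.0-1.0 range along with status messages. The class name `ProgressRecorder` and the `prog`/`msg` parameter names are assumptions made for this example; the exact signature that build_chunks expects is not specified on this page.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ProgressRecorder:
    """Hypothetical callback object that keeps the latest progress and all messages."""

    progress: float = 0.0
    messages: List[str] = field(default_factory=list)

    def __call__(self, prog: Optional[float] = None, msg: str = "") -> None:
        if prog is not None:
            # Clamp into the documented 0.0-1.0 range.
            self.progress = max(0.0, min(1.0, prog))
        if msg:
            self.messages.append(msg)
```

An instance can be passed wherever a plain function would be, and its `progress` and `messages` attributes can be inspected afterwards.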
Outputs
| Name | Type | Description |
|---|---|---|
| chunks | list[dict] | Document chunks with content_with_weight, page_num_int, top_int, position_int, available_int, img_id, image (optional bytes) |
Usage Examples
# Internal usage within task_executor.py (must run inside a coroutine, since build_chunks is async)
task = TaskService.get_task(task_id)
chunks = await build_chunks(task, progress_callback=set_progress)
print(f"Parsed {len(chunks)} chunks from {task['name']}")