Implementation: run-llama/llama_index IngestionPipeline.run
| Knowledge Sources | |
|---|---|
| Domains | Data_Ingestion, RAG, Pipeline_Architecture |
| Last Updated | 2026-02-11 00:00 GMT |
Overview
The IngestionPipeline.run() method applies the pipeline's configured transformations, in order, to a set of documents or nodes and returns the processed nodes. It supports transformation caching, docstore-based deduplication, progress bars, and multiprocess parallel workers.
Description
run() is the primary execution method of IngestionPipeline. It merges the documents and nodes arguments into a single input list, applies docstore-based deduplication (if a docstore is configured), runs each transformation in sequence (reusing cached outputs when caching is enabled), adds the resulting nodes that carry embeddings to the vector store (if one is configured), and returns the final processed nodes.
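The control flow described above can be modelled in plain Python. The sketch below is a simplified, hypothetical model, not LlamaIndex code: `Node`, `content_hash`, `run_pipeline`, the dict-based `cache` and `docstore`, and the list standing in for a vector store are all illustrative assumptions. Deduplication here behaves roughly like a duplicates-only docstore strategy: inputs whose content hash is already known are skipped.

```python
import hashlib
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Sequence


@dataclass
class Node:
    """Hypothetical stand-in for a node: text plus an optional embedding."""
    text: str
    embedding: Optional[List[float]] = None


Transform = Callable[[List[Node]], List[Node]]


def content_hash(nodes: Sequence[Node]) -> str:
    """Hash node texts so that identical inputs produce identical keys."""
    h = hashlib.sha256()
    for n in nodes:
        h.update(n.text.encode("utf-8"))
        h.update(b"\x00")  # separator so ["ab"] and ["a", "b"] hash differently
    return h.hexdigest()


def run_pipeline(
    nodes: List[Node],
    transformations: Sequence[Transform],
    cache: Optional[Dict[str, List[Node]]] = None,
    docstore: Optional[Dict[str, str]] = None,
    vector_store: Optional[List[Node]] = None,
) -> List[Node]:
    # 1. Deduplicate: skip inputs whose hash the docstore has already seen.
    if docstore is not None:
        fresh: List[Node] = []
        for n in nodes:
            key = content_hash([n])
            if key not in docstore:
                docstore[key] = n.text
                fresh.append(n)
        nodes = fresh
    # 2. Apply transformations in order; a cache hit skips the call entirely.
    for transform in transformations:
        key = f"{transform.__name__}:{content_hash(nodes)}"
        if cache is not None and key in cache:
            nodes = cache[key]
        else:
            nodes = transform(nodes)
            if cache is not None:
                cache[key] = nodes
    # 3. Only nodes that carry an embedding are sent to the vector store.
    if vector_store is not None:
        vector_store.extend(n for n in nodes if n.embedding is not None)
    return nodes


calls = {"embed": 0}


def split_words(nodes: List[Node]) -> List[Node]:
    """Toy node parser: one node per whitespace-separated word."""
    return [Node(w) for n in nodes for w in n.text.split()]


def fake_embed(nodes: List[Node]) -> List[Node]:
    """Toy embedding model: the 'embedding' is just the text length."""
    calls["embed"] += 1
    return [Node(n.text, [float(len(n.text))]) for n in nodes]


cache: Dict[str, List[Node]] = {}
docstore: Dict[str, str] = {}
store: List[Node] = []
docs = [Node("hello world"), Node("hello world")]  # the second copy is a duplicate
out = run_pipeline(docs, [split_words, fake_embed], cache, docstore, store)
print([n.text for n in out], len(store))  # ['hello', 'world'] 2
```

Running the same input again with the same `cache` returns the stored outputs without calling `fake_embed` a second time, which is the point of cache-aware execution.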
The arun() method is the asynchronous counterpart: it takes the same parameters, returns the same type, and is awaited inside a coroutine.
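A minimal asyncio sketch of why an async entry point helps: each step is awaited, so one event loop can drive several ingestion jobs while individual steps wait on I/O. `arun_pipeline` and `aupper` are hypothetical names, and the `asyncio.sleep` call is an assumed stand-in for a network-bound step such as an embedding API request.

```python
import asyncio
from typing import Awaitable, Callable, List

AsyncTransform = Callable[[List[str]], Awaitable[List[str]]]


async def arun_pipeline(items: List[str], transformations: List[AsyncTransform]) -> List[str]:
    # Same sequential semantics as a synchronous loop, but each step is awaited,
    # so the event loop can run other tasks while this step waits.
    for transform in transformations:
        items = await transform(items)
    return items


async def aupper(items: List[str]) -> List[str]:
    await asyncio.sleep(0.01)  # stands in for a network call
    return [s.upper() for s in items]


async def main() -> List[List[str]]:
    # Two independent jobs share one event loop; gather keeps results in call order.
    return await asyncio.gather(
        arun_pipeline(["a", "b"], [aupper]),
        arun_pipeline(["c"], [aupper]),
    )


results = asyncio.run(main())
print(results)  # [['A', 'B'], ['C']]
```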
Usage
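Call run() on a configured IngestionPipeline instance. Pass raw documents via the documents parameter, or pass already-parsed nodes via nodes. Both inputs go through the same configured transformations, so splitting happens only if the pipeline includes a node parser such as SentenceSplitter; to process pre-parsed nodes without re-splitting them, use a pipeline without a node parser (for example, an embedding-only pipeline).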
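In LlamaIndex, `Document` is itself a subclass of `BaseNode`, which is why documents and nodes can be merged into one input list. The toy model below illustrates that merge; `Node`, `Document`, and `prepare_inputs` are hypothetical stand-ins, not the library's classes, and the real method is assumed to also append any documents or readers configured on the pipeline.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass
class Node:
    text: str


@dataclass
class Document(Node):
    """Documents are nodes too, so both can share one input list."""


def prepare_inputs(
    documents: Optional[Sequence[Document]] = None,
    nodes: Optional[Sequence[Node]] = None,
) -> List[Node]:
    # The two arguments are simply concatenated; no conversion happens here.
    combined: List[Node] = []
    if documents is not None:
        combined.extend(documents)
    if nodes is not None:
        combined.extend(nodes)
    return combined


inputs = prepare_inputs(documents=[Document("full text")], nodes=[Node("chunk 1")])
print([type(n).__name__ for n in inputs])  # ['Document', 'Node']
```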
Code Reference
Source Location
- Repository: llama_index
- File: llama-index-core/llama_index/core/ingestion/pipeline.py
- Lines: L479-790
Signature
def run(
    self,
    show_progress: bool = False,
    documents: Optional[List[Document]] = None,
    nodes: Optional[List[BaseNode]] = None,
    cache_collection: Optional[str] = None,
    in_place: bool = True,
    store_doc_text: bool = True,
    num_workers: Optional[int] = None,
    **kwargs: Any,
) -> Sequence[BaseNode]:
Async Variant
async def arun(
    self,
    show_progress: bool = False,
    documents: Optional[List[Document]] = None,
    nodes: Optional[List[BaseNode]] = None,
    cache_collection: Optional[str] = None,
    in_place: bool = True,
    store_doc_text: bool = True,
    num_workers: Optional[int] = None,
    **kwargs: Any,
) -> Sequence[BaseNode]:
Import
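from llama_index.core.ingestion import IngestionPipeline
# Usage (method call on a configured instance; `documents` is a list of Document objects)
nodes = pipeline.run(documents=documents)
# Async usage (must be called inside a coroutine, e.g. an `async def` function)
nodes = await pipeline.arun(documents=documents)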
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| show_progress | bool | No (default: False) | Display a progress bar during transformation execution |
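| documents | Optional[List[Document]] | No | Input documents to process; Document is a BaseNode subclass, so documents are merged with nodes and run through the same transformations |
| nodes | Optional[List[BaseNode]] | No | Already-parsed nodes to process; appended to the input list and run through all configured transformations |
| cache_collection | Optional[str] | No | Name of the collection within the pipeline's cache in which transformation outputs are stored and looked up; ignored when caching is disabled |
| in_place | bool | No (default: True) | If True, the input list is passed to the transformations directly; if False, a new list (shallow copy) is created first. Node objects are not copied in either case |
| store_doc_text | bool | No (default: True) | Whether the docstore stores document text; only used when a docstore is configured for deduplication |
| num_workers | Optional[int] | No | Number of worker processes. If greater than 1, inputs are split into batches and each batch runs through all transformations in a separate process; values above the CPU count are lowered to it. None (default) runs sequentially |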
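The batching behaviour of num_workers can be modelled in a few lines. In this sketch `make_batches`, `run_batch`, and `run_parallel` are hypothetical helpers, and a `ThreadPoolExecutor` replaces the worker processes the library uses, purely so the example stays self-contained. The batch-size rule (integer division, at least one item per batch) is an assumption modelled on the pipeline's internal batcher.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence

Transform = Callable[[List[str]], List[str]]


def make_batches(items: Sequence[str], num_batches: int) -> List[List[str]]:
    """Split items into contiguous batches of len(items) // num_batches (min 1).

    An uneven split can produce one extra, smaller batch.
    """
    size = max(1, len(items) // num_batches)
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


def run_batch(batch: List[str], transformations: Sequence[Transform]) -> List[str]:
    # Each batch goes through the full transformation sequence on its own.
    for transform in transformations:
        batch = transform(batch)
    return batch


def run_parallel(items: Sequence[str], transformations: Sequence[Transform], num_workers: int) -> List[str]:
    batches = make_batches(items, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # map() yields results in submission order, so the output order is stable.
        results = pool.map(lambda b: run_batch(b, transformations), batches)
        return [item for batch in results for item in batch]


words = ["alpha", "beta", "gamma", "delta", "epsilon"]
print(make_batches(words, 2))  # [['alpha', 'beta'], ['gamma', 'delta'], ['epsilon']]
print(run_parallel(words, [lambda b: [w.upper() for w in b]], num_workers=2))
```

Because each batch is transformed independently, a transformation whose output depends on seeing all inputs at once can behave differently when num_workers is greater than 1.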
Outputs
| Name | Type | Description |
|---|---|---|
| return | Sequence[BaseNode] | Processed nodes after all transformations have been applied |
Usage Examples
Basic Pipeline Execution
from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
pipeline = IngestionPipeline(
transformations=[SentenceSplitter(chunk_size=1024)],
)
documents = [Document(text="Long document content here...")]
nodes = pipeline.run(documents=documents, show_progress=True)
Async Execution with Parallel Workers
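import asyncio
from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.openai import OpenAIEmbedding
# OpenAIEmbedding reads the API key from the OPENAI_API_KEY environment variable
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=512),
        OpenAIEmbedding(),
    ],
)
async def ingest(docs):
    # num_workers > 1 splits the inputs into batches handled by separate worker processes
    nodes = await pipeline.arun(
        documents=docs,
        num_workers=4,
        show_progress=True,
    )
    return nodes
# The __main__ guard keeps spawned worker processes from re-running this script on import
if __name__ == "__main__":
    documents = [Document(text="Long document content here...")]
    nodes = asyncio.run(ingest(documents))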
Running with Pre-Parsed Nodes
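from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.schema import TextNode
from llama_index.embeddings.openai import OpenAIEmbedding
# Placeholder nodes; in practice these come from an earlier parsing step
existing_nodes = [TextNode(text="First chunk."), TextNode(text="Second chunk.")]
# Skip splitting, only add embeddings to existing nodes
pipeline = IngestionPipeline(
    transformations=[OpenAIEmbedding()],
)
# Pass pre-parsed nodes directly
embedded_nodes = pipeline.run(nodes=existing_nodes, in_place=False)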
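The example above passes in_place=False. In the current pipeline source, that flag appears only to make the transformation loop work on a new list (`list(nodes)`); the node objects are shared, so a transformation that sets attributes on the nodes it receives (embedding transformations typically assign `node.embedding`) still changes the original nodes. The toy model below contrasts that shallow copy with an explicit deep copy; `Node`, `embed_in_place`, and `toy_run` are hypothetical names, not LlamaIndex APIs.

```python
import copy
from dataclasses import dataclass
from typing import Callable, List, Optional, Sequence


@dataclass
class Node:
    text: str
    embedding: Optional[List[float]] = None


def embed_in_place(nodes: List[Node]) -> List[Node]:
    """Toy embedder that sets an attribute on each node it receives."""
    for n in nodes:
        n.embedding = [float(len(n.text))]
    return nodes


def toy_run(
    nodes: List[Node],
    transformations: Sequence[Callable[[List[Node]], List[Node]]],
    in_place: bool = True,
) -> List[Node]:
    if not in_place:
        nodes = list(nodes)  # new list object, but the same Node instances
    for transform in transformations:
        nodes = transform(nodes)
    return nodes


existing = [Node("abc")]
out = toy_run(existing, [embed_in_place], in_place=False)
print(out is existing, existing[0].embedding)  # False [3.0]

# To leave the originals untouched, deep-copy them before running.
pristine = [Node("xyz")]
out2 = toy_run(copy.deepcopy(pristine), [embed_in_place])
print(pristine[0].embedding, out2[0].embedding)  # None [3.0]
```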