Implementation: run-llama/llama_index IngestionPipeline.run

Domains: Data_Ingestion, RAG, Pipeline_Architecture
Last Updated: 2026-02-11 00:00 GMT

Overview

The IngestionPipeline.run() method applies the pipeline's configured transformations to a set of documents or nodes and returns the processed nodes. It supports transformation caching, docstore-based deduplication, progress bars, and parallel worker processes.

Description

run() is the primary execution method of IngestionPipeline. It gathers the input documents and nodes into one list, deduplicates them against the docstore if one is configured (the exact behavior follows the pipeline's docstore_strategy), applies each transformation in sequence while reusing cached results where available, adds the resulting nodes that carry embeddings to the vector store if one is attached, and returns the final processed nodes.
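To make the order of these steps concrete, here is a simplified, dependency-free model of the control flow. Everything in it (`Node`, `MiniPipeline`, the hash set used as a docstore, the list used as a vector store, and the toy transformations) is hypothetical and is not LlamaIndex code. It models only duplicate skipping, not upserts, and mirrors the sequence described above: deduplicate, transform in order, then store only the nodes that have embeddings.

```python
import hashlib
from dataclasses import dataclass
from typing import Callable, List, Optional

@dataclass
class Node:
    # Hypothetical stand-in for a LlamaIndex node
    text: str
    embedding: Optional[List[float]] = None

    @property
    def hash(self) -> str:
        return hashlib.sha256(self.text.encode("utf-8")).hexdigest()

Transform = Callable[[List[Node]], List[Node]]

class MiniPipeline:
    """Hypothetical sketch of the run() control flow; not the library's code."""

    def __init__(self, transformations: List[Transform]) -> None:
        self.transformations = transformations
        self.docstore: set = set()          # content hashes seen so far
        self.vector_store: List[Node] = []  # stand-in for a vector store

    def run(self, nodes: List[Node]) -> List[Node]:
        # 1. Deduplicate: skip inputs whose hash was already ingested
        to_run: List[Node] = []
        for n in nodes:
            if n.hash not in self.docstore:
                self.docstore.add(n.hash)
                to_run.append(n)
        # 2. Apply each transformation in order
        for transform in self.transformations:
            to_run = transform(to_run)
        # 3. Only nodes that carry an embedding go to the vector store
        self.vector_store.extend(n for n in to_run if n.embedding is not None)
        return to_run

def split_sentences(nodes: List[Node]) -> List[Node]:
    # Toy splitter: one node per sentence
    return [Node(s.strip() + ".") for n in nodes for s in n.text.split(".") if s.strip()]

def fake_embed(nodes: List[Node]) -> List[Node]:
    # Toy "embedding": the text length, written onto each node
    for n in nodes:
        n.embedding = [float(len(n.text))]
    return nodes

pipeline = MiniPipeline([split_sentences, fake_embed])
first = pipeline.run([Node("Alpha beta. Gamma delta.")])
second = pipeline.run([Node("Alpha beta. Gamma delta.")])  # same content again
print(len(first), len(second), len(pipeline.vector_store))  # 2 0 2
```

The second call returns an empty list because its only input matches a stored hash, so no transformation or vector store work is repeated.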

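arun() is the asynchronous counterpart: it takes the same parameters and returns the same kind of result, but awaits the transformations' async implementations, so it can be called from inside an event loop.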
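A minimal sketch of how a sync/async pair can share one contract: the async path runs the same steps in the same order but awaits each one. The transformation classes, their `acall` method, and the `run_steps`/`arun_steps` helpers are hypothetical illustrations, not the library's implementation.

```python
import asyncio
from typing import List

class UpperCase:
    """Hypothetical transformation with a sync and an async entry point."""

    def __call__(self, texts: List[str]) -> List[str]:
        return [t.upper() for t in texts]

    async def acall(self, texts: List[str]) -> List[str]:
        await asyncio.sleep(0)  # stand-in for awaiting I/O, e.g. a remote call
        return self(texts)

class Exclaim:
    """Second hypothetical transformation."""

    def __call__(self, texts: List[str]) -> List[str]:
        return [t + "!" for t in texts]

    async def acall(self, texts: List[str]) -> List[str]:
        await asyncio.sleep(0)
        return self(texts)

def run_steps(texts: List[str], steps) -> List[str]:
    for step in steps:
        texts = step(texts)
    return texts

async def arun_steps(texts: List[str], steps) -> List[str]:
    # Same order and inputs as the sync path; each step is awaited
    for step in steps:
        texts = await step.acall(texts)
    return texts

steps = [UpperCase(), Exclaim()]
sync_result = run_steps(["hello", "world"], steps)
async_result = asyncio.run(arun_steps(["hello", "world"], steps))
print(sync_result == async_result, async_result)  # True ['HELLO!', 'WORLD!']
```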

Usage

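Call run() on a configured IngestionPipeline instance. Pass raw inputs via documents, already-parsed nodes via nodes, or both; the two are combined into a single input list before the transformations run. Passing nodes does not by itself skip any step: to avoid re-splitting, configure the pipeline without a node parser, as in the pre-parsed nodes example.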
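A toy model of how a `documents`/`nodes` pair of parameters can feed one input list. `prepare_inputs`, `split_words`, and `toy_run` are hypothetical helpers, and plain strings stand in for documents and nodes. In this model, whether an item is split depends only on which transformations are configured, not on which parameter it arrived through.

```python
from typing import List, Optional, Sequence

def prepare_inputs(
    documents: Optional[Sequence[str]] = None,
    nodes: Optional[Sequence[str]] = None,
) -> List[str]:
    """Hypothetical helper: concatenate documents and nodes into one input list."""
    inputs: List[str] = []
    if documents is not None:
        inputs.extend(documents)
    if nodes is not None:
        inputs.extend(nodes)
    return inputs

def split_words(items: List[str]) -> List[str]:
    # Toy "node parser": one item per whitespace-separated word
    return [w for item in items for w in item.split()]

def toy_run(transformations, documents=None, nodes=None) -> List[str]:
    items = prepare_inputs(documents, nodes)
    for transform in transformations:
        items = transform(items)
    return items

# With a splitter configured, pre-parsed nodes are split too
print(toy_run([split_words], documents=["a b"], nodes=["c d"]))  # ['a', 'b', 'c', 'd']
# Without a splitter, nodes pass through unchanged
print(toy_run([], nodes=["c d"]))  # ['c d']
```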

Code Reference

Source Location

  • Repository: llama_index
  • File: llama-index-core/llama_index/core/ingestion/pipeline.py
  • Lines: L479-790

Signature

def run(
    self,
    show_progress: bool = False,
    documents: Optional[List[Document]] = None,
    nodes: Optional[List[BaseNode]] = None,
    cache_collection: Optional[str] = None,
    in_place: bool = True,
    store_doc_text: bool = True,
    num_workers: Optional[int] = None,
    **kwargs: Any,
) -> Sequence[BaseNode]:

Async Variant

async def arun(
    self,
    show_progress: bool = False,
    documents: Optional[List[Document]] = None,
    nodes: Optional[List[BaseNode]] = None,
    cache_collection: Optional[str] = None,
    in_place: bool = True,
    store_doc_text: bool = True,
    num_workers: Optional[int] = None,
    **kwargs: Any,
) -> Sequence[BaseNode]:

Import

from llama_index.core.ingestion import IngestionPipeline

# Usage (method call on a configured pipeline instance)
nodes = pipeline.run(documents=documents)

# Async usage (inside an async function)
nodes = await pipeline.arun(documents=documents)

I/O Contract

Inputs

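Name | Type | Required | Description
--- | --- | --- | ---
show_progress | bool | No (default: False) | Display a progress bar while the transformations run
documents | Optional[List[Document]] | No | Input documents; combined with nodes into a single input list for the transformations
nodes | Optional[List[BaseNode]] | No | Already-parsed nodes; combined with documents. Whether they are split again depends on the configured transformations
cache_collection | Optional[str] | No | Name of the cache collection in which transformation results are stored and looked up
in_place | bool | No (default: True) | If True, transformations operate on the list passed in; if False, a new list is created first (a shallow copy: the node objects themselves are not copied)
store_doc_text | bool | No (default: True) | Whether to store document text when inputs are added to the docstore; only relevant when a docstore is configured
num_workers | Optional[int] | No (default: None) | Number of parallel worker processes; None or 1 runs the transformations sequentially in the current process
**kwargs | Any | No | Extra keyword arguments passed through to the transformation calls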
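The `in_place` and `cache_collection` parameters can be illustrated with a small model of a cache-aware transformation loop. `Node`, `run_with_cache`, the toy transformations, and the dict-based cache are hypothetical. The cache key here (collection name plus a hash of the input texts and the transformation's function name) is a simplification; LlamaIndex derives its own key from the nodes and the transformation's configuration.

```python
import hashlib
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple

@dataclass
class Node:
    # Hypothetical stand-in for a LlamaIndex node
    text: str
    tag: str = ""

Transform = Callable[[List[Node]], List[Node]]
Cache = Dict[Tuple[str, str], List[Node]]

def sort_by_text(nodes: List[Node]) -> List[Node]:
    # Reorders the list it receives, in place
    nodes.sort(key=lambda n: n.text)
    return nodes

def tag_nodes(nodes: List[Node]) -> List[Node]:
    # Mutates the node objects, not the list
    for n in nodes:
        n.tag = "seen"
    return nodes

def run_with_cache(
    nodes: List[Node],
    transformations: List[Transform],
    in_place: bool = True,
    cache: Optional[Cache] = None,
    cache_collection: str = "default",
) -> List[Node]:
    if not in_place:
        nodes = list(nodes)  # shallow copy: new list, same node objects
    for transform in transformations:
        if cache is None:
            nodes = transform(nodes)
            continue
        # Simplified key: collection name + hash of input texts and transform name
        payload = "|".join(n.text for n in nodes) + "::" + transform.__name__
        key = (cache_collection, hashlib.sha256(payload.encode("utf-8")).hexdigest())
        if key in cache:
            nodes = cache[key]  # cache hit: the transformation is skipped
        else:
            nodes = transform(nodes)
            cache[key] = nodes
    return nodes

# in_place=False: the caller's list keeps its order, but node objects are shared
original = [Node("b"), Node("a")]
result = run_with_cache(original, [sort_by_text, tag_nodes], in_place=False)
print([n.text for n in original], [n.text for n in result])  # ['b', 'a'] ['a', 'b']
print(original[0].tag)  # seen

# in_place=True: the caller's list itself is reordered
original2 = [Node("b"), Node("a")]
run_with_cache(original2, [sort_by_text], in_place=True)
print([n.text for n in original2])  # ['a', 'b']

# Same inputs + same collection -> cache hit; a new collection -> recompute
calls: List[int] = []

def count_calls(nodes: List[Node]) -> List[Node]:
    calls.append(1)
    return nodes

cache: Cache = {}
for collection in ["docs", "docs", "other"]:
    run_with_cache([Node("x")], [count_calls], cache=cache, cache_collection=collection)
print(len(calls))  # 2
```

The model shows why `in_place=False` protects only the list: a transformation that writes attributes onto nodes still affects the caller's node objects.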

Outputs

Name | Type | Description
--- | --- | ---
return | Sequence[BaseNode] | Nodes produced by the last transformation; empty if deduplication filtered out every input
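When `num_workers` is greater than 1, the inputs are split into batches that are processed in parallel, and the per-batch results are concatenated into the returned sequence. The sketch below models that batch-and-concatenate pattern with hypothetical helpers (`make_batches`, `process_batch`, `run_parallel`) and a thread pool so it runs anywhere. This is a simplification: the pipeline itself uses worker processes, which additionally requires the inputs and transformations to be picklable.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from typing import List

def make_batches(items: List[str], num_batches: int) -> List[List[str]]:
    """Hypothetical batcher: split items into contiguous chunks."""
    size = max(1, -(-len(items) // num_batches))  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]

def process_batch(batch: List[str]) -> List[str]:
    # Stand-in for running the full transformation chain on one batch
    return [text.upper() for text in batch]

def run_parallel(items: List[str], num_workers: int) -> List[str]:
    batches = make_batches(items, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # map() yields results in submission order, so batch order is preserved
        results = list(pool.map(process_batch, batches))
    # Concatenate per-batch outputs into one flat list
    return reduce(lambda acc, batch: acc + batch, results, [])

items = ["a", "b", "c", "d", "e"]
print(make_batches(items, 2))  # [['a', 'b', 'c'], ['d', 'e']]
print(run_parallel(items, 2))  # ['A', 'B', 'C', 'D', 'E']
```

Processes rather than threads are the natural choice for CPU-bound steps such as text splitting, since they avoid contention on the Python GIL; the cost is pickling each batch and the transformations to every worker.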

Usage Examples

Basic Pipeline Execution

from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter

pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(chunk_size=1024)],
)

documents = [Document(text="Long document content here...")]
nodes = pipeline.run(documents=documents, show_progress=True)

Async Execution with Parallel Workers

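import asyncio

from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.openai import OpenAIEmbedding  # needs OPENAI_API_KEY

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=512),
        OpenAIEmbedding(),
    ],
)

async def ingest(docs):
    # num_workers > 1 splits the nodes into batches handled by worker processes
    nodes = await pipeline.arun(
        documents=docs,
        num_workers=4,
        show_progress=True,
    )
    return nodes

if __name__ == "__main__":
    # Guard the entry point: worker processes may re-import this module
    documents = [Document(text="Long document content here...")]
    nodes = asyncio.run(ingest(documents))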

Running with Pre-Parsed Nodes

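from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.schema import TextNode
from llama_index.embeddings.openai import OpenAIEmbedding  # needs OPENAI_API_KEY

# Nodes produced earlier, e.g. by a separate parsing step
existing_nodes = [TextNode(text="First chunk."), TextNode(text="Second chunk.")]

# No node parser in the pipeline: nodes are embedded without further splitting
pipeline = IngestionPipeline(
    transformations=[OpenAIEmbedding()],
)

# in_place=False works on a copy of the input list (node objects are not copied)
embedded_nodes = pipeline.run(nodes=existing_nodes, in_place=False)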
