Workflow:Run llama Llama index Data Ingestion Pipeline

From Leeroopedia
Knowledge Sources
Domains Data_Engineering, LLMs, RAG
Last Updated 2026-02-11 19:00 GMT

Overview

End-to-end process for building a reusable data ingestion pipeline that transforms documents into embedded nodes with caching, deduplication, and optional parallel processing.

Description

This workflow uses the IngestionPipeline class to define a configurable chain of transformations that process raw documents into vector-embedded nodes ready for indexing. Unlike building a VectorStoreIndex directly, the ingestion pipeline provides explicit control over each transformation step, supports caching to avoid re-processing unchanged documents, implements deduplication strategies via a document store, and allows parallel processing with multiple workers. The pipeline can be persisted and reloaded for incremental updates.

Usage

Use this workflow when you need fine-grained control over data processing, particularly in production scenarios with large or frequently updated document collections. Prefer it over the basic RAG pipeline when you need caching, deduplication, parallel processing, or custom transformation chains.

Execution Steps

Step 1: Define Transformations

Configure the ordered list of transformation components that will process documents into nodes. Each transformation receives a sequence of nodes and returns a modified sequence. Common transformations include text splitters (SentenceSplitter, TokenTextSplitter), metadata extractors, and embedding models.

Key considerations:

  • Transformations execute in order; each receives the output of the previous one
  • The embedding model should typically be the last transformation
  • Custom transformations can be created by extending TransformComponent
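The chaining contract described in this step can be sketched without the library. The following is a minimal, library-free illustration: the Node class, the three toy transformations, and run_chain are hypothetical names invented for this sketch and are not the LlamaIndex API. It only shows that each transformation takes a list of nodes, returns a list of nodes, and that the embedding step runs last so it sees the final text.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Node:
    """Hypothetical stand-in for a pipeline node (not the library's class)."""
    text: str
    metadata: Dict[str, str] = field(default_factory=dict)
    embedding: List[float] = field(default_factory=list)


# A transformation is any callable mapping a list of nodes to a list of nodes.
Transformation = Callable[[List[Node]], List[Node]]


def split_sentences(nodes: List[Node]) -> List[Node]:
    """Toy splitter: emit one node per sentence, copying the parent metadata."""
    out: List[Node] = []
    for node in nodes:
        for sentence in node.text.split(". "):
            sentence = sentence.strip().rstrip(".")
            if sentence:
                out.append(Node(text=sentence, metadata=dict(node.metadata)))
    return out


def add_length_metadata(nodes: List[Node]) -> List[Node]:
    """Toy metadata extractor: record the character count of each node."""
    for node in nodes:
        node.metadata["num_chars"] = str(len(node.text))
    return nodes


def toy_embed(nodes: List[Node]) -> List[Node]:
    """Toy embedder (character count, word count); placed last in the chain."""
    for node in nodes:
        node.embedding = [float(len(node.text)), float(len(node.text.split()))]
    return nodes


def run_chain(nodes: List[Node], transformations: List[Transformation]) -> List[Node]:
    """Apply transformations in order; each receives the previous output."""
    for transform in transformations:
        nodes = transform(nodes)
    return nodes


docs = [Node(text="Pipelines split text. Pipelines embed text.",
             metadata={"source": "a.txt"})]
result = run_chain(docs, [split_sentences, add_length_metadata, toy_embed])
for node in result:
    print(node.text, node.metadata, node.embedding)
```

Reordering the list so that toy_embed runs before split_sentences would embed the unsplit document and then discard that embedding, which is why the embedding step normally goes last.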

Step 2: Configure Storage Backends

Set up optional vector store and document store backends. The vector store receives the final embedded nodes, while the document store enables deduplication by tracking document hashes. Choose a deduplication strategy: UPSERTS (re-process documents whose content has changed), DUPLICATES_ONLY (skip documents that exactly match ones already stored), or UPSERTS_AND_DELETE (as UPSERTS, and also remove stored documents that no longer appear in the input).

Key considerations:

  • Without a docstore, no deduplication occurs
  • The docstore strategy controls how changed and removed documents are handled
  • Both in-memory and persistent store backends are supported
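To make the difference between the three strategies concrete, here is a simplified, library-free sketch of hash-based deduplication. The function plan_ingestion, the plain-dict docstore, and the lowercase strategy strings are assumptions made for illustration; the real document store and strategy handling in the library may differ in detail.

```python
import hashlib
from typing import Dict, List, Tuple

STRATEGIES = {"upserts", "duplicates_only", "upserts_and_delete"}


def content_hash(text: str) -> str:
    """Hash document text so changes can be detected without storing the text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def plan_ingestion(
    docs: Dict[str, str], docstore: Dict[str, str], strategy: str
) -> Tuple[List[str], List[str]]:
    """Return (ids_to_process, ids_to_delete) and update the docstore in place.

    docs maps doc_id -> text; docstore maps doc_id -> last stored hash.
    """
    if strategy not in STRATEGIES:
        raise ValueError(f"unknown strategy: {strategy}")

    to_process: List[str] = []
    to_delete: List[str] = []

    if strategy == "duplicates_only":
        # Skip any document whose exact content hash has been seen before.
        seen_hashes = set(docstore.values())
        for doc_id, text in docs.items():
            digest = content_hash(text)
            if digest not in seen_hashes:
                to_process.append(doc_id)
                docstore[doc_id] = digest
                seen_hashes.add(digest)
        return to_process, to_delete

    # upserts: process documents that are new or whose hash changed.
    for doc_id, text in docs.items():
        digest = content_hash(text)
        if docstore.get(doc_id) != digest:
            to_process.append(doc_id)
            docstore[doc_id] = digest

    if strategy == "upserts_and_delete":
        # Also drop stored documents that are absent from the current input.
        to_delete = [doc_id for doc_id in docstore if doc_id not in docs]
        for doc_id in to_delete:
            del docstore[doc_id]

    return to_process, to_delete


docstore: Dict[str, str] = {}
first = plan_ingestion({"a": "alpha", "b": "beta"}, docstore, "upserts")
second = plan_ingestion({"a": "alpha v2", "b": "beta"}, docstore, "upserts")
third = plan_ingestion({"a": "alpha v2"}, docstore, "upserts_and_delete")
print(first, second, third)
```

In this sketch the first run processes both documents, the second processes only the changed one, and the third processes nothing but flags the missing document for deletion.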

Step 3: Create Pipeline

Instantiate the IngestionPipeline with the configured transformations, vector store, document store, and caching settings. The pipeline automatically initializes an IngestionCache that tracks transformation results by hashing node content and transformation configuration.

Key considerations:

  • The cache can be disabled with disable_cache=True for debugging
  • Named cache collections allow separate caches for different pipeline configurations
  • The pipeline is serializable for persistence
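The caching idea in this step, keying results on both node content and transformation configuration, can be illustrated with a small sketch. ToyCache and cache_key are hypothetical names for this example and do not reproduce IngestionCache; the point is only that changing either the input or the configuration yields a new key and therefore a recomputation.

```python
import hashlib
import json
from typing import Any, Callable, Dict, List


def cache_key(node_texts: List[str], transform_name: str,
              transform_config: Dict[str, Any]) -> str:
    """Derive a key from node content plus the transformation's configuration."""
    payload = json.dumps(
        {"nodes": list(node_texts), "transform": transform_name,
         "config": transform_config},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class ToyCache:
    """Hypothetical cache with named collections and a disable switch."""

    def __init__(self, disabled: bool = False) -> None:
        self.disabled = disabled
        self.collections: Dict[str, Dict[str, Any]] = {}
        self.hits = 0

    def get_or_compute(self, key: str, compute: Callable[[], Any],
                       collection: str = "default") -> Any:
        if self.disabled:
            # With caching off, always recompute (useful when debugging).
            return compute()
        store = self.collections.setdefault(collection, {})
        if key in store:
            self.hits += 1
            return store[key]
        store[key] = compute()
        return store[key]


calls: List[int] = []


def chunk() -> List[str]:
    """Stand-in for an expensive transformation; records each invocation."""
    calls.append(1)
    return ["hello", "world"]


cache = ToyCache()
k1 = cache_key(["hello world"], "splitter", {"chunk_size": 5})
k2 = cache_key(["hello world"], "splitter", {"chunk_size": 8})
cache.get_or_compute(k1, chunk)  # computed
cache.get_or_compute(k1, chunk)  # served from the cache
cache.get_or_compute(k2, chunk)  # configuration changed, so recomputed
print(len(calls), cache.hits)
```

Keeping separate collections per pipeline configuration avoids one pipeline evicting or colliding with another's entries when they share a cache backend.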

Step 4: Run Pipeline

Execute the pipeline on a batch of documents or nodes. The pipeline applies each transformation sequentially (with optional parallel batch processing), skipping cached results. If a vector store is configured, embedded nodes are automatically added to it.

Key considerations:

  • Use num_workers for parallel processing of large batches
  • show_progress=True enables progress bars for each transformation
  • in_place=True modifies nodes in place to reduce memory usage
  • The pipeline returns the final list of processed nodes
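The batch-parallel execution mentioned here can be sketched as follows. This is a simplified illustration, not the library's scheduler: the helper names are hypothetical, and a thread pool is used only to keep the example self-contained, so it makes no claim about how the pipeline actually distributes work when num_workers is set.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, Sequence

Transformation = Callable[[List[str]], List[str]]


def make_batches(items: Sequence[str], num_batches: int) -> List[List[str]]:
    """Split items into contiguous, near-equal batches (at most num_batches)."""
    num_batches = max(1, min(num_batches, len(items)))
    size, extra = divmod(len(items), num_batches)
    batches: List[List[str]] = []
    start = 0
    for i in range(num_batches):
        end = start + size + (1 if i < extra else 0)
        batches.append(list(items[start:end]))
        start = end
    return batches


def apply_chain(batch: List[str], transformations: List[Transformation]) -> List[str]:
    """Run every transformation, in order, over a single batch."""
    for transform in transformations:
        batch = transform(batch)
    return batch


def run(items: Sequence[str], transformations: List[Transformation],
        num_workers: Optional[int] = None) -> List[str]:
    """Process items serially, or in parallel batches when num_workers > 1."""
    if not num_workers or num_workers <= 1:
        return apply_chain(list(items), transformations)
    batches = make_batches(items, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # map preserves batch order, so the flattened output keeps input order.
        results = list(pool.map(lambda b: apply_chain(b, transformations), batches))
    return [item for batch in results for item in batch]


def upper(batch: List[str]) -> List[str]:
    return [s.upper() for s in batch]


def exclaim(batch: List[str]) -> List[str]:
    return [s + "!" for s in batch]


serial = run(["a", "b", "c", "d", "e"], [upper, exclaim])
parallel = run(["a", "b", "c", "d", "e"], [upper, exclaim], num_workers=2)
print(serial == parallel)
```

Batching like this only pays off when the per-batch work outweighs the coordination overhead, so small inputs are usually better run serially.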

Step 5: Persist Pipeline State

Save the pipeline cache and document store to disk for reuse. On subsequent runs, the pipeline loads cached results and only processes new or changed documents, significantly reducing processing time for incremental updates.

Key considerations:

  • The cache and docstore are saved as separate JSON files
  • Custom file systems (S3, GCS) are supported via the fs parameter
  • Reloading is done by calling load() on an IngestionPipeline instance, typically one constructed with the same transformations
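The persist-and-reload cycle can be illustrated with a small, library-free sketch that writes a cache and a docstore to two separate JSON files and reads them back. The function names and the file names cache.json and docstore.json are assumptions for this example, not the names the library necessarily uses.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List, Tuple


def persist_state(persist_dir: str, cache: Dict[str, List[str]],
                  docstore: Dict[str, str],
                  cache_name: str = "cache.json",
                  docstore_name: str = "docstore.json") -> None:
    """Write the cache and the docstore as two separate JSON files."""
    path = Path(persist_dir)
    path.mkdir(parents=True, exist_ok=True)
    (path / cache_name).write_text(json.dumps(cache), encoding="utf-8")
    (path / docstore_name).write_text(json.dumps(docstore), encoding="utf-8")


def load_state(persist_dir: str,
               cache_name: str = "cache.json",
               docstore_name: str = "docstore.json"
               ) -> Tuple[Dict[str, List[str]], Dict[str, str]]:
    """Read both files back into plain dictionaries."""
    path = Path(persist_dir)
    cache = json.loads((path / cache_name).read_text(encoding="utf-8"))
    docstore = json.loads((path / docstore_name).read_text(encoding="utf-8"))
    return cache, docstore


with tempfile.TemporaryDirectory() as tmp:
    # First run: save what was computed.
    persist_state(tmp, {"key1": ["chunk a", "chunk b"]}, {"doc-1": "hash-1"})
    saved_files = sorted(p.name for p in Path(tmp).iterdir())

    # Later run: reload, then process only new or changed documents.
    cache, docstore = load_state(tmp)
    incoming = {"doc-1": "hash-1", "doc-2": "hash-2"}
    pending = [doc_id for doc_id, digest in incoming.items()
               if docstore.get(doc_id) != digest]

print(saved_files, pending)
```

Because the unchanged document's hash is already present after reloading, only the new document remains pending, which is the source of the time savings on incremental runs.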
