
Implementation:Deepset ai Haystack Pipeline Run

From Leeroopedia
Knowledge Sources
Domains Software_Architecture, Workflow_Orchestration
Last Updated 2026-02-11 00:00 GMT

Overview

A concrete tool, provided by the Haystack framework, for building and executing pipelines of components.

Description

The Pipeline class is Haystack's synchronous orchestration engine. It manages a directed graph of components (via NetworkX), validates connections between component input and output sockets, resolves execution order with a priority queue, and runs components one at a time. It also supports breakpoints for debugging, snapshot and resume for long-running pipelines, and OpenTelemetry tracing. Pipeline.run() accepts a dictionary that maps each component name to that component's input parameters.
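To make the graph-plus-priority-queue idea concrete, here is a minimal standard-library sketch. It is not Haystack's implementation: ToyPipeline, its methods, and the "component.socket" connection strings are hypothetical names chosen for illustration, and the sketch assumes an acyclic graph in which every connected input must arrive before a component runs.

```python
from __future__ import annotations

import heapq
from typing import Any, Callable


class ToyPipeline:
    """Toy stand-in for a component pipeline; NOT Haystack's implementation."""

    def __init__(self) -> None:
        self.components: dict[str, Callable[..., dict[str, Any]]] = {}
        # Each edge is (sender, output_name, receiver, input_name).
        self.edges: list[tuple[str, str, str, str]] = []

    def add_component(self, name: str, fn: Callable[..., dict[str, Any]]) -> None:
        if name in self.components:
            raise ValueError(f"duplicate component name: {name}")
        self.components[name] = fn

    def connect(self, sender: str, receiver: str) -> None:
        # Both sides use the hypothetical "component.socket" form.
        s_name, s_out = sender.split(".")
        r_name, r_in = receiver.split(".")
        for name in (s_name, r_name):
            if name not in self.components:
                raise ValueError(f"unknown component: {name}")
        self.edges.append((s_name, s_out, r_name, r_in))

    def run(self, data: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]:
        inputs = {name: dict(data.get(name, {})) for name in self.components}
        # Count the incoming connections each component is still waiting on.
        waiting = {name: 0 for name in self.components}
        for _, _, receiver, _ in self.edges:
            waiting[receiver] += 1
        # Priority = registration order, so ties resolve deterministically.
        order = {name: i for i, name in enumerate(self.components)}
        ready = [(order[n], n) for n, count in waiting.items() if count == 0]
        heapq.heapify(ready)

        outputs: dict[str, dict[str, Any]] = {}
        while ready:
            _, name = heapq.heappop(ready)
            outputs[name] = self.components[name](**inputs[name])
            # Forward outputs along outgoing edges and wake up receivers.
            for sender, out, receiver, inp in self.edges:
                if sender == name:
                    inputs[receiver][inp] = outputs[name][out]
                    waiting[receiver] -= 1
                    if waiting[receiver] == 0:
                        heapq.heappush(ready, (order[receiver], receiver))

        # Return only leaf components (those that send nothing onward).
        senders = {sender for sender, _, _, _ in self.edges}
        return {n: o for n, o in outputs.items() if n not in senders}


pipe = ToyPipeline()
pipe.add_component("upper", lambda text: {"text": text.upper()})
pipe.add_component("exclaim", lambda text: {"text": text + "!"})
pipe.connect("upper.text", "exclaim.text")
result = pipe.run({"upper": {"text": "hello"}})
print(result)
```

The sketch omits socket type validation, loops, breakpoints, snapshots, and tracing, all of which the real class handles.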

Usage

Import this class when building any Haystack pipeline: RAG, document indexing, hybrid search, evaluation, or preprocessing. Use add_component to register components, connect to wire outputs to inputs, and run to execute the pipeline with data.

Code Reference

Source Location

  • Repository: haystack
  • File: haystack/core/pipeline/pipeline.py
  • Lines: L34-447 (Pipeline class with run method at L109-447)

Signature

class Pipeline(PipelineBase):
    def run(
        self,
        data: dict[str, Any],
        include_outputs_from: set[str] | None = None,
        *,
        break_point: Breakpoint | AgentBreakpoint | None = None,
        pipeline_snapshot: PipelineSnapshot | None = None,
        snapshot_callback: SnapshotCallback | None = None,
    ) -> dict[str, Any]:
        """
        Args:
            data: Component-keyed input dict, e.g., {"retriever": {"query": q}}.
            include_outputs_from: Set of component names whose intermediate outputs to include.
            break_point: Optional breakpoint for debugging.
            pipeline_snapshot: Snapshot to resume from.
            snapshot_callback: Callback for custom snapshot handling.
        Returns:
            Dict of component outputs for leaf components (or included intermediates).
        """

Import

from haystack import Pipeline

I/O Contract

Inputs

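Name                  Type                                  Required  Description
data                  dict[str, Any]                        Yes       Component-keyed input dictionary
include_outputs_from  set[str] or None                      No        Components whose outputs to include beyond leaf nodes
break_point           Breakpoint, AgentBreakpoint, or None  No        Optional breakpoint for debugging (keyword-only)
pipeline_snapshot     PipelineSnapshot or None              No        Snapshot to resume from (keyword-only)
snapshot_callback     SnapshotCallback or None              No        Callback for custom snapshot handling (keyword-only)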

Outputs

Name    Type            Description
result  dict[str, Any]  Maps each component name to its output dict, for leaf components and any components named in include_outputs_from
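The effect of include_outputs_from on the returned dictionary can be illustrated without running a real pipeline. The helper below is hypothetical (select_outputs and its leaf_components argument are not part of Haystack); it only mirrors the contract described above, under the assumption that results are filtered by component name.

```python
from __future__ import annotations

from typing import Any


def select_outputs(
    all_outputs: dict[str, dict[str, Any]],
    leaf_components: set[str],
    include_outputs_from: set[str] | None = None,
) -> dict[str, dict[str, Any]]:
    """Keep leaf outputs plus any explicitly requested intermediates."""
    wanted = leaf_components | (include_outputs_from or set())
    return {name: out for name, out in all_outputs.items() if name in wanted}


# Illustrative outputs shaped like those of a three-component RAG pipeline.
all_outputs = {
    "retriever": {"documents": ["doc-1", "doc-2"]},
    "prompt_builder": {"prompt": "Given these documents, answer the question."},
    "llm": {"replies": ["Jean"]},
}

default_result = select_outputs(all_outputs, leaf_components={"llm"})
debug_result = select_outputs(
    all_outputs, leaf_components={"llm"}, include_outputs_from={"retriever"}
)
print(sorted(default_result))
print(sorted(debug_result))
```

Requesting intermediate outputs this way is mainly useful for inspecting what a retriever or prompt builder produced before the final component ran.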

Usage Examples

Basic RAG Pipeline

from haystack import Pipeline, Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.prompt_builder import PromptBuilder

# Set up document store
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
])

# Define prompt template
template = """Given these documents, answer the question.
Documents:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:"""

# Build pipeline
pipeline = Pipeline()
pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
pipeline.add_component("prompt_builder", PromptBuilder(template=template))
# OpenAIGenerator reads its API key from the OPENAI_API_KEY environment variable by default
pipeline.add_component("llm", OpenAIGenerator())
pipeline.connect("retriever", "prompt_builder.documents")
pipeline.connect("prompt_builder", "llm")

# Run pipeline
question = "Who lives in Paris?"
results = pipeline.run({
    "retriever": {"query": question},
    "prompt_builder": {"question": question},
})
print(results["llm"]["replies"])
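Because the same question is passed to two components and the reply is nested two levels deep, small helpers can keep calling code tidy. The following is a sketch only: build_rag_inputs and first_reply are hypothetical helpers, not Haystack functions, and they assume the component names used in the example above ("retriever", "prompt_builder", "llm").

```python
from __future__ import annotations

from typing import Any


def build_rag_inputs(question: str) -> dict[str, dict[str, str]]:
    """Build the component-keyed input dict for the example pipeline."""
    return {
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
    }


def first_reply(results: dict[str, dict[str, Any]], component: str = "llm") -> str | None:
    """Return the first reply of a generator component, or None if absent."""
    replies = results.get(component, {}).get("replies", [])
    return replies[0] if replies else None


inputs = build_rag_inputs("Who lives in Paris?")
# A hand-written results dict standing in for the output of pipeline.run(inputs).
fake_results = {"llm": {"replies": ["Jean lives in Paris."]}}
print(first_reply(fake_results))
print(first_reply({}))
```

With a real pipeline, the call would be first_reply(pipeline.run(build_rag_inputs(question))).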
