Implementation:Deepset ai Haystack Pipeline Run
| Knowledge Sources | |
|---|---|
| Domains | Software_Architecture, Workflow_Orchestration |
| Last Updated | 2026-02-11 00:00 GMT |
Overview
Concrete tool, provided by the Haystack framework, for building and executing component pipelines.
Description
The Pipeline class is the synchronous orchestration engine for Haystack. It manages a directed graph of components (via NetworkX), validates connections between component input and output sockets, resolves execution order using a priority queue, and runs components one at a time. It supports breakpoints for debugging, snapshot/resume for long-running pipelines, and OpenTelemetry tracing integration. Pipeline.run() accepts a dictionary keyed by component name, where each value maps that component's input names to their values.
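The scheduling idea described above can be illustrated with a small standard-library model. This is a simplified sketch, not Haystack's implementation: `ToyPipeline`, its methods, and the `splitter`/`counter` components are hypothetical names invented for illustration, and the priority used here (depth in the graph, then name) is an assumption chosen only to make the ordering deterministic.

```python
import heapq
from typing import Any, Callable


class ToyPipeline:
    """Simplified stand-in for a component pipeline (not Haystack's code)."""

    def __init__(self) -> None:
        self.components: dict[str, Callable[..., dict[str, Any]]] = {}
        # edges[sender] -> list of (output_socket, receiver, input_socket)
        self.edges: dict[str, list[tuple[str, str, str]]] = {}

    def add_component(self, name: str, fn: Callable[..., dict[str, Any]]) -> None:
        if name in self.components:
            raise ValueError(f"duplicate component name: {name}")
        self.components[name] = fn
        self.edges[name] = []

    def connect(self, sender: str, receiver: str) -> None:
        # Both endpoints use the "component.socket" form in this sketch.
        src, out_socket = sender.split(".")
        dst, in_socket = receiver.split(".")
        for name in (src, dst):
            if name not in self.components:
                raise ValueError(f"unknown component: {name}")
        self.edges[src].append((out_socket, dst, in_socket))

    def run(self, data: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]:
        # Start each component with the inputs supplied by the caller.
        inputs = {name: dict(data.get(name, {})) for name in self.components}
        # Count the incoming connections each component still waits on.
        pending = {name: 0 for name in self.components}
        for targets in self.edges.values():
            for _, dst, _ in targets:
                pending[dst] += 1
        # Priority queue of runnable components: (depth, name).
        queue = [(0, name) for name, count in pending.items() if count == 0]
        heapq.heapify(queue)
        results: dict[str, dict[str, Any]] = {}
        while queue:
            depth, name = heapq.heappop(queue)
            output = self.components[name](**inputs[name])
            if not self.edges[name]:
                results[name] = output  # leaf component: nothing consumes it
            for out_socket, dst, in_socket in self.edges[name]:
                inputs[dst][in_socket] = output[out_socket]
                pending[dst] -= 1
                if pending[dst] == 0:
                    heapq.heappush(queue, (depth + 1, dst))
        return results


pipe = ToyPipeline()
pipe.add_component("splitter", lambda text: {"words": text.split()})
pipe.add_component("counter", lambda words: {"count": len(words)})
pipe.connect("splitter.words", "counter.words")
result = pipe.run({"splitter": {"text": "who lives in Paris"}})
print(result)
```

The model keeps the same shape as the real API: inputs are keyed by component name, and only components with no outgoing connections appear in the result. It omits cycle handling, type validation of sockets, breakpoints, and tracing.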
Usage
Import this class when building any Haystack pipeline: RAG, document indexing, hybrid search, evaluation, or preprocessing. Call add_component() to register components, connect() to wire outputs to inputs, and run() to execute the pipeline with data.
Code Reference
Source Location
- Repository: haystack
- File: haystack/core/pipeline/pipeline.py
- Lines: L34-447 (Pipeline class with run method at L109-447)
Signature
class Pipeline(PipelineBase):
    def run(
        self,
        data: dict[str, Any],
        include_outputs_from: set[str] | None = None,
        *,
        break_point: Breakpoint | AgentBreakpoint | None = None,
        pipeline_snapshot: PipelineSnapshot | None = None,
        snapshot_callback: SnapshotCallback | None = None,
    ) -> dict[str, Any]:
        """
        Args:
            data: Component-keyed input dict, e.g., {"retriever": {"query": q}}.
            include_outputs_from: Set of component names whose intermediate outputs to include.
            break_point: Optional breakpoint for debugging.
            pipeline_snapshot: Snapshot to resume from.
            snapshot_callback: Callback for custom snapshot handling.
        Returns:
            Dict of component outputs for leaf components (or included intermediates).
        """
Import
from haystack import Pipeline
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
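| data | dict[str, Any] | Yes | Component-keyed input dictionary, e.g., {"retriever": {"query": q}} |
| include_outputs_from | set[str] or None | No | Components whose outputs to include beyond leaf nodes |
| break_point | Breakpoint, AgentBreakpoint, or None | No | Breakpoint for debugging (keyword-only) |
| pipeline_snapshot | PipelineSnapshot or None | No | Snapshot to resume from (keyword-only) |
| snapshot_callback | SnapshotCallback or None | No | Callback for custom snapshot handling (keyword-only) |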
Outputs
| Name | Type | Description |
|---|---|---|
| result | dict[str, Any] | Dictionary mapping component name to its output dict, for leaf components and for any components named in include_outputs_from |
Usage Examples
Basic RAG Pipeline
from haystack import Pipeline, Document
from haystack.utils import Secret
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.prompt_builder import PromptBuilder
# Set up document store
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
])
# Define prompt template
template = """Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:"""
# Build pipeline
pipeline = Pipeline()
pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
pipeline.add_component("prompt_builder", PromptBuilder(template=template))
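# The generator reads the API key from the OPENAI_API_KEY environment variable (its default, made explicit here)
pipeline.add_component("llm", OpenAIGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY")))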
pipeline.connect("retriever", "prompt_builder.documents")
pipeline.connect("prompt_builder", "llm")
# Run pipeline
question = "Who lives in Paris?"
results = pipeline.run({
    "retriever": {"query": question},
    "prompt_builder": {"question": question},
})
print(results["llm"]["replies"])