Implementation:Datahub_project_Datahub_Pipeline_Create_And_Run
| Property | Value |
|---|---|
| Page Type | Implementation (API Doc) |
| Workflow | Metadata_Ingestion_Pipeline |
| API | Pipeline.create(config_dict, ...) -> Pipeline, Pipeline.run() -> None, Pipeline.pretty_print_summary(...) -> int |
| Source File | metadata-ingestion/src/datahub/ingestion/run/pipeline.py (L149-795) |
| Repository | https://github.com/datahub-project/datahub |
| Implements | Principle:Datahub_project_Datahub_Pipeline_Execution |
| Last Updated | 2026-02-09 17:00 GMT |
Overview
Description
The Pipeline class is the central orchestrator of the DataHub metadata ingestion framework. It manages the complete lifecycle of an ingestion run: parsing the configuration, resolving plugins from registries, establishing connections to data sources and sinks, iterating over work units, applying transformers, writing records to the sink, processing stateful ingestion commits, and generating summary reports.
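For illustration, this whole lifecycle can be exercised end to end with the built-in file source and sink, so no external services are required. This is a minimal sketch: the config keys assume a recent DataHub version, and the file paths are placeholders.
from datahub.ingestion.run.pipeline import Pipeline

recipe = {
    "source": {"type": "file", "config": {"path": "./input_mces.json"}},
    "sink": {"type": "file", "config": {"filename": "./output_mces.json"}},
}

pipeline = Pipeline.create(recipe)  # parse config, resolve plugins, wire components
pipeline.run()                      # iterate work units, transform, write to sink
pipeline.pretty_print_summary()     # print the summary report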
The class exposes three primary methods that form the standard execution sequence:
- Pipeline.create(): A class method that accepts a raw recipe dictionary, validates it into a PipelineConfig, and constructs a fully initialized Pipeline instance with all components (source, sink, transformers, extractor, reporters) wired up.
- Pipeline.run(): Executes the pipeline by iterating over the source's work units, extracting records, passing them through the transformer chain, and writing them to the sink. Manages error handling, progress reporting, and final commit processing.
- Pipeline.pretty_print_summary(): Prints a color-coded summary of the pipeline run to the terminal, including CLI metrics, the source report, the sink report, global warnings, and an overall status line. Returns an integer exit code (0 for success, 1 for failures).
Usage
The standard usage pattern is:
from datahub.ingestion.run.pipeline import Pipeline
pipeline = Pipeline.create(config_dict, dry_run=False)
pipeline.run()
exit_code = pipeline.pretty_print_summary(warnings_as_failure=False)
This pattern is used by both the CLI (ingest_cli.py) and programmatic callers.
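For example, a thin wrapper in the CLI style might look like this (run_recipe is an illustrative name, not part of the API):
import sys

from datahub.ingestion.run.pipeline import Pipeline

def run_recipe(config_dict: dict) -> None:
    # Create, run, and exit with the code returned by the summary printer.
    pipeline = Pipeline.create(config_dict, dry_run=False)
    pipeline.run()
    sys.exit(pipeline.pretty_print_summary(warnings_as_failure=False))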
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| metadata-ingestion/src/datahub/ingestion/run/pipeline.py | L149-280 | Pipeline.__init__() - component initialization with exit stacks |
| metadata-ingestion/src/datahub/ingestion/run/pipeline.py | L418-437 | Pipeline.create() - class method factory |
| metadata-ingestion/src/datahub/ingestion/run/pipeline.py | L461-571 | Pipeline.run() - main execution loop |
| metadata-ingestion/src/datahub/ingestion/run/pipeline.py | L572-581 | Pipeline.transform() - transformer chain application |
| metadata-ingestion/src/datahub/ingestion/run/pipeline.py | L583-618 | Pipeline.process_commits() - stateful ingestion commit handling |
| metadata-ingestion/src/datahub/ingestion/run/pipeline.py | L696-770 | Pipeline.pretty_print_summary() - terminal report output |
Signature
class Pipeline:
    config: PipelineConfig
    ctx: PipelineContext
    source: Source
    extractor: Extractor
    sink_type: str
    sink: Sink[ConfigModel, SinkReport]
    transformers: List[Transformer]

    @classmethod
    def create(
        cls,
        config_dict: dict,
        dry_run: bool = False,
        preview_mode: bool = False,
        preview_workunits: int = 10,
        report_to: Optional[str] = "datahub",
        no_progress: bool = False,
        raw_config: Optional[dict] = None,
    ) -> "Pipeline":
        config = PipelineConfig.from_dict(config_dict, raw_config)
        return cls(
            config,
            dry_run=dry_run,
            preview_mode=preview_mode,
            preview_workunits=preview_workunits,
            report_to=report_to,
            no_progress=no_progress,
        )

    def run(self) -> None:
        # Iterates source work units, extracts records, transforms, writes to sink
        ...

    def pretty_print_summary(
        self, warnings_as_failure: bool = False, currently_running: bool = False
    ) -> int:
        # Returns 0 on success, 1 on failure
        ...
Import
from datahub.ingestion.run.pipeline import Pipeline
I/O Contract
Pipeline.create()
| Direction | Type | Description |
|---|---|---|
| Input | config_dict: dict | Recipe dictionary (resolved) |
| Input | dry_run: bool | Skip sink writes when True (default: False) |
| Input | preview_mode: bool | Limit work units when True (default: False) |
| Input | preview_workunits: int | Max work units in preview mode (default: 10) |
| Input | report_to: Optional[str] | Report destination: "datahub" for the server, a file path, or None (default: "datahub") |
| Input | no_progress: bool | Suppress intermediate progress reports (default: False) |
| Input | raw_config: Optional[dict] | Unexpanded recipe, kept for safe reporting |
| Output | Pipeline | Fully initialized pipeline instance |
| Raises | PipelineInitError | If any initialization step fails (connection, plugin lookup, configuration) |
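Initialization failures can be caught explicitly. A minimal sketch, assuming PipelineInitError is importable from the same pipeline module listed above and using a placeholder file-source recipe:
from datahub.ingestion.run.pipeline import Pipeline, PipelineInitError

recipe = {"source": {"type": "file", "config": {"path": "./mces.json"}}}

try:
    # create() wires up all components; per the contract above it raises
    # PipelineInitError when connection, plugin lookup, or configuration fails.
    pipeline = Pipeline.create(recipe, dry_run=True)  # dry_run: run() skips sink writes
except PipelineInitError as e:
    print(f"Initialization failed: {e}")
    raise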
Pipeline.run()
| Direction | Type | Description |
|---|---|---|
| Input | (none - uses instance state) | — |
| Output | None | Sets self.final_status to COMPLETED, ERROR, or CANCELLED |
| Side Effect | Metadata written to sink | Unless dry_run=True |
| Side Effect | Reporter notifications | Start and completion events sent to registered reporters |
| Side Effect | Stateful commits | Committables processed based on their commit policies |
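Since run() returns None, callers inspect the outcome afterwards through the component reports. A minimal sketch, assuming the get_report() accessors and as_string() rendering of DataHub's Source/Sink report classes:
pipeline.run()

# Inspect the outcome through the component reports; as_string() renders the
# same information that pretty_print_summary() prints (assumed API surface).
print(pipeline.source.get_report().as_string())
print(pipeline.sink.get_report().as_string())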
Pipeline.pretty_print_summary()
| Direction | Type | Description |
|---|---|---|
| Input | warnings_as_failure: bool | Treat warnings as failure (non-zero exit) (default: False) |
| Input | currently_running: bool | Mark the summary as an in-progress run (default: False) |
| Output | int | 0 on success, 1 on failure (or on warnings in strict mode) |
| Side Effect | Terminal output | Prints CLI report, source report, sink report, global warnings, and status line |
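A short sketch of strict mode, where warnings also produce a non-zero exit code:
import sys

exit_code = pipeline.pretty_print_summary(warnings_as_failure=True)
sys.exit(exit_code)  # 1 if the run failed or, in strict mode, produced warnings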
Usage Examples
Example 1: Standard CLI-driven ingestion
from datahub.configuration.config_loader import load_config_file
from datahub.ingestion.run.pipeline import Pipeline
# Load the recipe and squirrel away the unexpanded original under
# "__raw_config" so it can be passed to create() for safe reporting.
pipeline_config = load_config_file(
"recipe.yml",
squirrel_original_config=True,
squirrel_field="__raw_config",
resolve_env_vars=True,
)
raw_config = pipeline_config.pop("__raw_config")
pipeline = Pipeline.create(
pipeline_config,
dry_run=False,
report_to="datahub",
raw_config=raw_config,
)
pipeline.run()
exit_code = pipeline.pretty_print_summary(warnings_as_failure=False)
Example 2: Programmatic pipeline with preview mode
from datahub.ingestion.run.pipeline import Pipeline
recipe = {
"source": {
"type": "snowflake",
"config": {
"account_id": "myaccount",
"username": "user",
"password": "pass",
},
},
"transformers": [
{
"type": "simple_add_dataset_ownership",
"config": {"owner_urns": ["urn:li:corpuser:admin"]},
}
],
}
# Preview mode caps the run at preview_workunits work units (5 here).
pipeline = Pipeline.create(recipe, preview_mode=True, preview_workunits=5)
pipeline.run()
pipeline.pretty_print_summary()
Example 3: Pipeline with error checking
from datahub.ingestion.run.pipeline import Pipeline
from datahub.configuration.common import PipelineExecutionError
pipeline = Pipeline.create(recipe)  # recipe as defined in Example 2
pipeline.run()
try:
    # Raises if the run recorded failures, or warnings when raise_warnings=True.
    pipeline.raise_from_status(raise_warnings=True)
except PipelineExecutionError as e:
print(f"Pipeline failed: {e}")
Related Pages
- Principle:Datahub_project_Datahub_Pipeline_Execution
- Implementation:Datahub_project_Datahub_PipelineConfig_From_Recipe
- Implementation:Datahub_project_Datahub_Datahub_Ingest_Dry_Run
- Implementation:Datahub_project_Datahub_DatahubRestSink_Write_Record
- Environment:Datahub_project_Datahub_Python_Ingestion