
Implementation:Datahub project Datahub Pipeline Create And Run

From Leeroopedia


Property Value
Page Type Implementation (API Doc)
Workflow Metadata_Ingestion_Pipeline
API Pipeline.create(config_dict, ...) -> Pipeline, Pipeline.run() -> None, Pipeline.pretty_print_summary(...) -> int
Source File metadata-ingestion/src/datahub/ingestion/run/pipeline.py (L149-795)
Repository https://github.com/datahub-project/datahub
Implements Principle:Datahub_project_Datahub_Pipeline_Execution
Last Updated 2026-02-09 17:00 GMT

Overview

Description

The Pipeline class is the central orchestrator of the DataHub metadata ingestion framework. It manages the complete lifecycle of an ingestion run: parsing the configuration, resolving plugins from registries, establishing connections to data sources and sinks, iterating over work units, applying transformers, writing records to the sink, processing stateful ingestion commits, and generating summary reports.

The class exposes three primary methods that form the standard execution sequence:

  1. Pipeline.create(): A class method that accepts a raw recipe dictionary, validates it into a PipelineConfig, and constructs a fully initialized Pipeline instance with all components (source, sink, transformers, extractor, reporters) wired up.
  2. Pipeline.run(): Executes the pipeline by iterating over the source's work units, extracting records, passing them through the transformer chain, and writing them to the sink. Manages error handling, progress reporting, and final commit processing.
  3. Pipeline.pretty_print_summary(): Prints a color-coded summary of the pipeline run to the terminal, including CLI metrics, source report, sink report, global warnings, and an overall status line. Returns an integer exit code (0 for success, 1 for failures).
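The three-step sequence above can be sketched with a minimal, library-free stand-in. MiniPipeline and all of its names are hypothetical illustrations of the lifecycle, not part of the DataHub API:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class MiniPipeline:
    """Hypothetical stand-in mirroring the create / run / summary lifecycle."""

    config: Dict[str, Any]
    dry_run: bool = False
    records: List[Any] = field(default_factory=list)
    failures: int = 0

    @classmethod
    def create(cls, config_dict: Dict[str, Any], dry_run: bool = False) -> "MiniPipeline":
        # Mirrors Pipeline.create(): validate the raw recipe, then construct.
        if "source" not in config_dict:
            raise ValueError("recipe must declare a source")
        return cls(config=config_dict, dry_run=dry_run)

    def run(self) -> None:
        # Mirrors Pipeline.run(): iterate work units and write to the sink,
        # unless this is a dry run.
        for unit in self.config["source"].get("units", []):
            if not self.dry_run:
                self.records.append(unit)

    def pretty_print_summary(self, warnings_as_failure: bool = False) -> int:
        # Mirrors the exit-code contract: 0 on success, 1 on failure.
        print(f"sank {len(self.records)} records")
        return 1 if self.failures else 0


pipeline = MiniPipeline.create({"source": {"units": [1, 2, 3]}})
pipeline.run()
exit_code = pipeline.pretty_print_summary()
```

The real class wires up plugins, reporters, and exit stacks at each of these steps; the sketch only shows the call shape and the success/failure contract.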

Usage

The standard usage pattern is:

from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(config_dict, dry_run=False)
pipeline.run()
exit_code = pipeline.pretty_print_summary(warnings_as_failure=False)

This pattern is used by both the CLI (ingest_cli.py) and programmatic callers.

Code Reference

Source Location

File Lines Description
metadata-ingestion/src/datahub/ingestion/run/pipeline.py L149-280 Pipeline.__init__() - component initialization with exit stacks
metadata-ingestion/src/datahub/ingestion/run/pipeline.py L418-437 Pipeline.create() - class method factory
metadata-ingestion/src/datahub/ingestion/run/pipeline.py L461-571 Pipeline.run() - main execution loop
metadata-ingestion/src/datahub/ingestion/run/pipeline.py L572-581 Pipeline.transform() - transformer chain application
metadata-ingestion/src/datahub/ingestion/run/pipeline.py L583-618 Pipeline.process_commits() - stateful ingestion commit handling
metadata-ingestion/src/datahub/ingestion/run/pipeline.py L696-770 Pipeline.pretty_print_summary() - terminal report output
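The transform() step listed above passes every record through the configured transformers in declaration order. Conceptually it is a fold over the record stream; the sketch below uses illustrative names, not the actual DataHub signatures:

```python
from typing import Callable, Iterable, List

# A transformer maps a record stream to a record stream; chaining them
# threads the stream through each transformer in order.
Transformer = Callable[[Iterable[dict]], Iterable[dict]]


def apply_transformers(
    records: Iterable[dict], transformers: List[Transformer]
) -> Iterable[dict]:
    for transform in transformers:
        records = transform(records)
    return records


def add_owner(records: Iterable[dict]) -> Iterable[dict]:
    # Illustrative transformer, loosely modeled on the
    # simple_add_dataset_ownership transformer from the recipe examples below.
    for record in records:
        yield {**record, "owner": "urn:li:corpuser:admin"}


out = list(apply_transformers([{"urn": "a"}, {"urn": "b"}], [add_owner]))
```

Because each transformer receives the previous one's output, transformer order in the recipe is significant.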

Signature

class Pipeline:
    config: PipelineConfig
    ctx: PipelineContext
    source: Source
    extractor: Extractor
    sink_type: str
    sink: Sink[ConfigModel, SinkReport]
    transformers: List[Transformer]

    @classmethod
    def create(
        cls,
        config_dict: dict,
        dry_run: bool = False,
        preview_mode: bool = False,
        preview_workunits: int = 10,
        report_to: Optional[str] = "datahub",
        no_progress: bool = False,
        raw_config: Optional[dict] = None,
    ) -> "Pipeline":
        config = PipelineConfig.from_dict(config_dict, raw_config)
        return cls(
            config,
            dry_run=dry_run,
            preview_mode=preview_mode,
            preview_workunits=preview_workunits,
            report_to=report_to,
            no_progress=no_progress,
        )

    def run(self) -> None:
        # Iterates source work units, extracts records, transforms, writes to sink
        ...

    def pretty_print_summary(
        self, warnings_as_failure: bool = False, currently_running: bool = False
    ) -> int:
        # Returns 0 on success, 1 on failure
        ...

Import

from datahub.ingestion.run.pipeline import Pipeline

I/O Contract

Pipeline.create()

Direction Type Description
Input config_dict: dict Recipe dictionary (resolved)
Input dry_run: bool Skip sink writes when True (default: False)
Input preview_mode: bool Limit work units when True (default: False)
Input preview_workunits: int Max work units in preview mode (default: 10)
Input report_to: Optional[str] Report destination; "datahub" for server, file path, or None (default: "datahub")
Input no_progress: bool Suppress intermediate progress reports (default: False)
Input raw_config: Optional[dict] Unexpanded recipe for safe reporting
Output Pipeline Fully initialized pipeline instance
Raises PipelineInitError If any initialization step fails (connection, plugin lookup, configuration)

Pipeline.run()

Direction Type Description
Input (none - uses instance state)
Output None Sets self.final_status to COMPLETED, ERROR, or CANCELLED
Side Effect Metadata written to sink Unless dry_run=True
Side Effect Reporter notifications Start and completion events sent to registered reporters
Side Effect Stateful commits Committables processed based on their commit policies
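The stateful-commit side effect can be illustrated with a small decision function. The three policy names below are an assumption modeled on DataHub's stateful ingestion concepts, not a verified copy of its enum:

```python
from enum import Enum


class CommitPolicy(Enum):
    # Assumed policy set for illustration; check the DataHub source for
    # the authoritative definitions.
    ALWAYS = "always"
    ON_NO_ERRORS = "on_no_errors"
    ON_NO_WARNINGS = "on_no_warnings"


def should_commit(policy: CommitPolicy, errors: int, warnings: int) -> bool:
    # A committable is committed only if the run outcome satisfies its policy.
    if policy is CommitPolicy.ALWAYS:
        return True
    if policy is CommitPolicy.ON_NO_ERRORS:
        return errors == 0
    return errors == 0 and warnings == 0
```

Under this model, process_commits() would evaluate each committable's policy against the run's error and warning counts before persisting state.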

Pipeline.pretty_print_summary()

Direction Type Description
Input warnings_as_failure: bool Treat warnings as failure (non-zero exit) (default: False)
Output int 0 on success; 1 on failure, or on warnings when warnings_as_failure=True
Side Effect Terminal output Prints CLI report, source report, sink report, global warnings, and status line
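The exit-code contract in the table above reduces to a small pure function. This is a sketch of the documented behavior, not the actual implementation:

```python
def summary_exit_code(failures: int, warnings: int, warnings_as_failure: bool = False) -> int:
    # 1 if the run failed, or if warnings are escalated to failures; else 0.
    if failures > 0:
        return 1
    if warnings_as_failure and warnings > 0:
        return 1
    return 0
```

This is why CLI callers can pass the return value straight to sys.exit(), as in the standard usage pattern shown earlier.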

Usage Examples

Example 1: Standard CLI-driven ingestion

from datahub.configuration.config_loader import load_config_file
from datahub.ingestion.run.pipeline import Pipeline

pipeline_config = load_config_file(
    "recipe.yml",
    squirrel_original_config=True,
    squirrel_field="__raw_config",
    resolve_env_vars=True,
)
raw_config = pipeline_config.pop("__raw_config")

pipeline = Pipeline.create(
    pipeline_config,
    dry_run=False,
    report_to="datahub",
    raw_config=raw_config,
)
pipeline.run()
exit_code = pipeline.pretty_print_summary(warnings_as_failure=False)

Example 2: Programmatic pipeline with preview mode

from datahub.ingestion.run.pipeline import Pipeline

recipe = {
    "source": {
        "type": "snowflake",
        "config": {
            "account_id": "myaccount",
            "username": "user",
            "password": "pass",
        },
    },
    "transformers": [
        {
            "type": "simple_add_dataset_ownership",
            "config": {"owner_urns": ["urn:li:corpuser:admin"]},
        }
    ],
}

pipeline = Pipeline.create(recipe, preview_mode=True, preview_workunits=5)
pipeline.run()
pipeline.pretty_print_summary()

Example 3: Pipeline with error checking

from datahub.ingestion.run.pipeline import Pipeline
from datahub.configuration.common import PipelineExecutionError

pipeline = Pipeline.create(recipe)
pipeline.run()

try:
    pipeline.raise_from_status(raise_warnings=True)
except PipelineExecutionError as e:
    print(f"Pipeline failed: {e}")

Related Pages

Principle:Datahub_project_Datahub_Pipeline_Execution (implemented by this page)