Principle:Datahub project Datahub Pipeline Execution

From Leeroopedia


Property        Value
Page Type       Principle
Workflow        Metadata_Ingestion_Pipeline
Concept         Orchestrated metadata extraction, transformation, and loading pipeline
Repository      https://github.com/datahub-project/datahub
Implemented By  Implementation:Datahub_project_Datahub_Pipeline_Create_And_Run
Last Updated    2026-02-09 17:00 GMT

Overview

Description

The Pipeline Execution principle governs how DataHub orchestrates the end-to-end flow of metadata from a data source through optional transformations and into a destination sink. This is the core runtime operation of the ingestion framework: a source-transform-sink pipeline with pluggable components at each stage.

The pipeline follows an ETL (Extract-Transform-Load) pattern adapted for metadata:

  1. Extract: A Source plugin connects to a data system (e.g., Snowflake, MySQL, Kafka) and produces a stream of work units, each containing one or more metadata records.
  2. Transform: Zero or more Transformer plugins process the metadata stream in sequence. Transformers can add, modify, or filter metadata records. The transformer chain is applied as a series of iterable transformations, preserving the streaming nature of the pipeline.
  3. Load: A Sink plugin delivers the transformed metadata records to a destination, typically a DataHub instance via REST API or Kafka.

The pipeline is designed for streaming execution: records flow from source through transformers to sink without accumulating the entire dataset in memory. The source produces work units lazily via Python generators, and the pipeline processes them one at a time (or in bounded batches for the batch sink mode). This architecture enables ingestion of metadata from very large data systems.
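
The streaming behaviour described above can be illustrated with plain Python generators. This is a minimal sketch, not DataHub code: the source, transformer, and sink names are hypothetical stand-ins, and records are plain dicts rather than real work units. It shows that chaining transformers only wraps the iterator, so a record is pulled from the source only when the sink asks for it.

```python
from typing import Callable, Iterable, Iterator, List

pulled: List[int] = []  # tracks which work units the source has actually produced


def source(n: int) -> Iterator[dict]:
    """Hypothetical source: yields work units lazily, one per table."""
    for i in range(n):
        pulled.append(i)
        yield {"id": i, "name": f"table_{i}"}


def add_owner(records: Iterable[dict]) -> Iterator[dict]:
    """Hypothetical transformer that adds a field to every record."""
    for record in records:
        yield {**record, "owner": "data-team"}


def drop_odd(records: Iterable[dict]) -> Iterator[dict]:
    """Hypothetical transformer that filters records out of the stream."""
    for record in records:
        if record["id"] % 2 == 0:
            yield record


def run(stream: Iterable[dict], transformers: List[Callable], sink: List[dict]) -> None:
    """Apply the transformer chain, then drain the stream into the sink."""
    for transform in transformers:
        stream = transform(stream)  # wraps the iterator; no record is processed yet
    for record in stream:
        sink.append(record)  # each record flows through the whole chain on demand


# Laziness: a source of one million work units produces only what is consumed.
chained = drop_odd(add_owner(source(1_000_000)))
first = next(chained)
pulled_after_first = list(pulled)

# Full run over a small source.
sink: List[dict] = []
run(source(5), [add_owner, drop_odd], sink)
sink_ids = [record["id"] for record in sink]
```

After the first `next` call, only one work unit has left the source, even though the source could produce a million. Memory use therefore depends on what is in flight, not on the size of the source.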

The pipeline also manages its own lifecycle, including error handling, commit policies for stateful ingestion, progress reporting, telemetry, and resource cleanup via context managers and exit stacks.

Usage

Pipeline Execution is the central operation in every DataHub ingestion workflow:

  • CLI invocation: datahub ingest run -c recipe.yml creates and runs a pipeline from a recipe file.
  • Programmatic API: Python code calls Pipeline.create(config_dict) to instantiate a pipeline, then pipeline.run() to execute it, and pipeline.pretty_print_summary() to report results.
  • Preview mode: The pipeline can be run with a bounded number of work units (via preview_workunits) to sample the output without processing the entire source.
  • Dry-run mode: The pipeline executes extraction and transformation but skips sink writes.
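
A programmatic invocation might look like the sketch below. The three calls (`Pipeline.create`, `run`, `pretty_print_summary`) are the ones named above; the import path, the recipe keys, and the connection values are assumptions based on a typical MySQL-to-REST recipe and should be checked against the installed DataHub version. All hosts and credentials are placeholders.

```python
# Recipe as a plain dict, mirroring the structure of a recipe.yml file.
# Host, credentials, and server URL are placeholders, not real values.
recipe = {
    "source": {
        "type": "mysql",
        "config": {
            "host_port": "localhost:3306",
            "username": "datahub",
            "password": "example",
        },
    },
    "sink": {
        "type": "datahub-rest",
        "config": {"server": "http://localhost:8080"},
    },
}


def run_recipe(config: dict) -> None:
    """Create, run, and summarize a pipeline (needs the DataHub package installed)."""
    # Imported lazily so this module loads even where DataHub is not installed.
    # The import path is an assumption; verify it for your version.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(config)
    pipeline.run()
    pipeline.pretty_print_summary()


# Call run_recipe(recipe) against a real environment to execute the ingestion.
```

The dict has the same shape as the YAML recipe passed to the CLI, so a recipe can be moved between the two entry points without restructuring.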

The Pipeline class coordinates all phases:

  1. Resolves the source, sink, extractor, and transformers from the plugin registries
  2. Establishes a PipelineContext with the run ID, graph client, and flags
  3. Iterates over work units from the source
  4. Extracts record envelopes via the extractor
  5. Passes records through the transformer chain
  6. Writes records to the sink asynchronously (with a write callback for success/failure tracking)
  7. Processes commits for stateful ingestion after all work units are consumed
  8. Prints a summary report with failure/warning counts and timing

Theoretical Basis

The Pipeline Execution principle is grounded in several foundational patterns:

ETL pipeline architecture. The Extract-Transform-Load pattern is a well-established approach to data integration. DataHub's adaptation replaces traditional data records with metadata records (MCPs/MCEs) and replaces the data warehouse sink with a metadata service. The fundamental three-stage decomposition provides clean separation of concerns and enables independent development and testing of each stage.

Plugin-based extensibility. Each pipeline stage (source, transformer, sink, extractor) is resolved from a registry at construction time. The registries use setuptools entry points, allowing new plugins to be installed via pip without modifying the core framework. This follows the Strategy pattern: the pipeline's behavior is determined by the concrete strategy (plugin) selected at configuration time.
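
The registry lookup can be sketched as follows. This toy registry is populated with a decorator rather than with setuptools entry points, and `PluginRegistry`, `ConsoleSink`, `FileSink`, and `build_sink` are hypothetical names. It only illustrates how a type string in the configuration selects the concrete strategy.

```python
from typing import Callable, Dict


class PluginRegistry:
    """Toy registry mapping a type name from the recipe to a plugin class."""

    def __init__(self) -> None:
        self._plugins: Dict[str, type] = {}

    def register(self, name: str) -> Callable[[type], type]:
        """Class decorator that registers the class under the given name."""
        def decorator(cls: type) -> type:
            self._plugins[name] = cls
            return cls
        return decorator

    def get(self, name: str) -> type:
        try:
            return self._plugins[name]
        except KeyError:
            raise KeyError(f"unknown plugin type: {name!r}") from None


sink_registry = PluginRegistry()


@sink_registry.register("console")
class ConsoleSink:
    def write(self, record: str) -> str:
        return f"console:{record}"


@sink_registry.register("file")
class FileSink:
    def write(self, record: str) -> str:
        return f"file:{record}"


def build_sink(recipe: dict):
    """Resolve and instantiate the sink named in the recipe."""
    return sink_registry.get(recipe["sink"]["type"])()


sink = build_sink({"sink": {"type": "file"}})
result = sink.write("r1")
```

The calling code never names a concrete sink class, so adding a new sink means registering one more class and leaving `build_sink` untouched.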

Streaming processing with backpressure. The pipeline processes work units as a lazy iterator rather than materializing the full dataset. The sink's asynchronous write path includes a max_pending_requests limit that provides implicit backpressure: when the sink is overwhelmed, the main thread blocks until pending requests drop below the threshold. This prevents unbounded memory growth and ensures stable execution for large sources.
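
One way to get this kind of backpressure is a bounded semaphore in front of a worker pool. The sketch below is an assumption about the general technique, not a copy of the DataHub sink: `BoundedAsyncSink` and its attributes are hypothetical, and only the `max_pending_requests` name comes from the text above.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class BoundedAsyncSink:
    """Toy async sink that caps the number of in-flight writes."""

    def __init__(self, max_pending_requests: int, workers: int = 4) -> None:
        self._slots = threading.BoundedSemaphore(max_pending_requests)
        self._pool = ThreadPoolExecutor(max_workers=workers)
        self._lock = threading.Lock()
        self._pending = 0
        self.peak_pending = 0
        self.written = 0

    def _send(self, record: int) -> None:
        try:
            time.sleep(0.001)  # simulated network latency
            with self._lock:
                self.written += 1
        finally:
            with self._lock:
                self._pending -= 1
            self._slots.release()  # free a slot so the producer can continue

    def write_async(self, record: int) -> None:
        self._slots.acquire()  # blocks the caller while too many writes are pending
        with self._lock:
            self._pending += 1
            self.peak_pending = max(self.peak_pending, self._pending)
        self._pool.submit(self._send, record)

    def close(self) -> None:
        self._pool.shutdown(wait=True)  # wait for all pending writes to finish


sink = BoundedAsyncSink(max_pending_requests=3)
for record in range(50):
    sink.write_async(record)
sink.close()
```

The producer loop has no explicit throttling logic. It slows down only because `write_async` blocks, which is what makes the backpressure implicit.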

Lifecycle management via context managers. The pipeline uses Python's contextlib.ExitStack to manage the lifetimes of the source, sink, graph client, and other resources. If any initialization step fails, previously initialized resources are cleaned up automatically. The source and sink are managed by separate exit stacks so that the source can be closed before stateful ingestion commits are processed.
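
The two-stack arrangement can be sketched with `contextlib.ExitStack` from the standard library. The `resource` helper and the event log are hypothetical and exist only to make the open and close order visible.

```python
from contextlib import ExitStack, contextmanager
from typing import Iterator, List

events: List[str] = []


@contextmanager
def resource(name: str, fail: bool = False) -> Iterator[str]:
    """Hypothetical resource that logs when it is opened and closed."""
    if fail:
        raise RuntimeError(f"{name} failed to initialize")
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")


def run_pipeline() -> None:
    with ExitStack() as outer:  # outer stack: graph client and sink
        outer.enter_context(resource("graph"))
        outer.enter_context(resource("sink"))
        with ExitStack() as inner:  # inner stack: source only
            inner.enter_context(resource("source"))
            events.append("ingest")
        # The source is closed here while the sink is still open for commits.
        events.append("commit")


run_pipeline()
normal_run = list(events)

# Failed initialization: resources opened earlier are still cleaned up.
events.clear()
try:
    with ExitStack() as stack:
        stack.enter_context(resource("graph"))
        stack.enter_context(resource("sink", fail=True))
except RuntimeError:
    pass
failed_init = list(events)
```

In the normal run the source closes before the commit step and the sink closes after it. In the failed run the graph client is closed even though the sink never opened.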

Commit policies for stateful ingestion. After all work units are consumed, the pipeline evaluates the commit policy of each committable registered in the context. A committable (such as a stateful ingestion state store) declares whether it should commit whenever the run finishes without errors (ON_NO_ERRORS) or only when the run finishes with neither errors nor warnings (ON_NO_ERRORS_AND_NO_WARNINGS). This prevents a failed pipeline run from corrupting persistent state.
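
The policy evaluation can be sketched as a small decision function. This is a toy model under stated assumptions: the enum carries only the two policies named above, and `Checkpoint`, `should_commit`, and `process_commits` are hypothetical names rather than DataHub APIs.

```python
from enum import Enum, auto
from typing import List


class CommitPolicy(Enum):
    ON_NO_ERRORS = auto()
    ON_NO_ERRORS_AND_NO_WARNINGS = auto()


def should_commit(policy: CommitPolicy, has_errors: bool, has_warnings: bool) -> bool:
    """Decide whether a committable may persist its state after the run."""
    if has_errors:
        return False  # neither policy commits after an error
    if policy is CommitPolicy.ON_NO_ERRORS_AND_NO_WARNINGS and has_warnings:
        return False  # the strict policy also refuses runs with warnings
    return True


class Checkpoint:
    """Hypothetical committable, for example a stateful ingestion state store."""

    def __init__(self, name: str, policy: CommitPolicy) -> None:
        self.name = name
        self.policy = policy
        self.committed = False

    def commit(self) -> None:
        self.committed = True


def process_commits(
    committables: List[Checkpoint], has_errors: bool, has_warnings: bool
) -> List[str]:
    """Commit every committable whose policy allows it; return their names."""
    committed = []
    for item in committables:
        if should_commit(item.policy, has_errors, has_warnings):
            item.commit()
            committed.append(item.name)
    return committed


lenient = Checkpoint("lenient", CommitPolicy.ON_NO_ERRORS)
strict = Checkpoint("strict", CommitPolicy.ON_NO_ERRORS_AND_NO_WARNINGS)

# A run that produced warnings but no errors.
committed_names = process_commits([lenient, strict], has_errors=False, has_warnings=True)
```

In this run only the lenient checkpoint commits; the strict one keeps its previous state until a run completes without warnings.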

Observability and reporting. The pipeline tracks its status (UNKNOWN, COMPLETED, ERROR, CANCELLED), collects CLI-level resource metrics (memory, disk, thread counts), notifies registered reporters at start and completion, and emits telemetry with ingestion statistics. The pretty_print_summary method produces a color-coded terminal summary with failure and warning counts.
