
Implementation:Datahub project Datahub PipelineConfig From Recipe

From Leeroopedia


Property Value
Page Type Implementation (API Doc)
Workflow Metadata_Ingestion_Pipeline
API PipelineConfig.from_dict(resolved_dict: dict, raw_dict: Optional[dict] = None) -> PipelineConfig
Source File metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py
Repository https://github.com/datahub-project/datahub
Implements Principle:Datahub_project_Datahub_Recipe_Configuration
Last Updated 2026-02-09 17:00 GMT

Overview

Description

The PipelineConfig.from_dict() class method is the primary entry point for converting a parsed recipe dictionary into a validated PipelineConfig object. It accepts a resolved dictionary (with environment variables expanded) and an optional raw dictionary (preserving the original template with unexpanded variables), validates the structure against Pydantic models, auto-generates a semantic run_id when none is provided, and returns a fully initialized configuration object ready for pipeline construction.

The PipelineConfig class is a Pydantic ConfigModel that defines the complete schema for an ingestion recipe. It encapsulates the source configuration, optional sink configuration, transformer chain, experimental feature flags, reporting settings, run ID, DataHub API client configuration, pipeline name, and failure logging settings.

Usage

PipelineConfig.from_dict() is called in two primary contexts:

  1. From the CLI: The ingest run command loads a YAML recipe file via load_config_file(), then passes the resulting dictionary to Pipeline.create(), which internally calls PipelineConfig.from_dict().
  2. From programmatic code: A Python script constructs a recipe dictionary and calls PipelineConfig.from_dict() directly, or passes it through Pipeline.create().
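Both contexts depend on the same resolved-versus-raw split described above: environment variables are expanded before validation, while the raw template can be kept aside for reporting. The snippet below is a minimal stdlib-only sketch of that distinction (the real loader is DataHub's load_config_file, not this helper):

```python
# Sketch of resolved vs. raw recipe dictionaries (stdlib only; the real
# resolution logic lives in datahub.configuration.config_loader).
import os
from string import Template

os.environ["SNOWFLAKE_USER"] = "svc_datahub"

raw = {"source": {"type": "snowflake",
                  "config": {"username": "${SNOWFLAKE_USER}"}}}

def resolve(node):
    # Recursively expand ${VAR} placeholders from the environment.
    if isinstance(node, dict):
        return {k: resolve(v) for k, v in node.items()}
    if isinstance(node, str):
        return Template(node).safe_substitute(os.environ)
    return node

resolved = resolve(raw)
print(resolved["source"]["config"]["username"])  # svc_datahub
print(raw["source"]["config"]["username"])       # ${SNOWFLAKE_USER}
```

The resolved dictionary is what from_dict() validates; the raw one is what it stashes on _raw_dict so secrets-bearing placeholders are never materialized in reports.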

Code Reference

Source Location

File Lines Description
metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py L87-131 PipelineConfig class definition and from_dict() method
metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py L22-25 SourceConfig class (extends DynamicTypedConfig)
metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py L42-76 FlagsConfig class with experimental feature flags
metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py L78-84 _generate_run_id() helper function

Signature

class PipelineConfig(ConfigModel):
    source: SourceConfig
    sink: Optional[DynamicTypedConfig] = None
    transformers: Optional[List[DynamicTypedConfig]] = None
    flags: HiddenFromDocs[FlagsConfig] = FlagsConfig()
    reporting: List[ReporterConfig] = []
    run_id: str = DEFAULT_RUN_ID
    datahub_api: Optional[DatahubClientConfig] = None
    pipeline_name: Optional[str] = None
    failure_log: FailureLoggingConfig = FailureLoggingConfig()
    recording: Optional[RecordingConfig] = Field(
        default=None,
        description="Recording configuration for debugging ingestion runs.",
    )

    _raw_dict: Optional[dict] = None

    @model_validator(mode="after")
    def run_id_should_be_semantic(self) -> "PipelineConfig":
        if self.run_id == DEFAULT_RUN_ID:
            source_type = None
            if hasattr(self.source, "type"):
                source_type = self.source.type
            self.run_id = _generate_run_id(source_type)
        else:
            assert self.run_id is not None
        return self

    @classmethod
    def from_dict(
        cls, resolved_dict: dict, raw_dict: Optional[dict] = None
    ) -> "PipelineConfig":
        config = cls.model_validate(resolved_dict)
        config._raw_dict = raw_dict
        return config
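The run_id_should_be_semantic validator above delegates to _generate_run_id() when the default sentinel is still in place. A hedged sketch of what such a helper plausibly does, based on the run-id format documented below (the function name and fallback prefix are assumptions, not the actual DataHub implementation):

```python
# Hypothetical sketch of semantic run-id generation in the documented
# format {source_type}-{YYYY_MM_DD}-{HH_MM_SS}-{random6}.
import random
import string
from datetime import datetime, timezone

def generate_run_id(source_type=None):
    # Assumed fallback prefix when the source type is unavailable.
    prefix = source_type or "ingestion"
    ts = datetime.now(timezone.utc).strftime("%Y_%m_%d-%H_%M_%S")
    suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
    return f"{prefix}-{ts}-{suffix}"

print(generate_run_id("mysql"))  # e.g. mysql-2026_02_09-17_00_00-a1b2c3
```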

Import

from datahub.ingestion.run.pipeline_config import PipelineConfig

I/O Contract

Direction Type Description
Input resolved_dict: dict The recipe dictionary with environment variables resolved and directives processed
Input raw_dict: Optional[dict] The original recipe dictionary before variable expansion (preserved for reporting)
Output PipelineConfig A fully validated configuration object with all fields populated and defaults applied
Raises pydantic.ValidationError If the dictionary does not conform to the expected schema
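The ValidationError path can be illustrated with a simplified Pydantic v2 model that mirrors the contract above (these classes are stand-ins for illustration, not the real DataHub models):

```python
# Simplified sketch of the validation contract: a missing required "source"
# key raises pydantic.ValidationError, just as from_dict() does.
from typing import Optional
from pydantic import BaseModel, ValidationError

class SourceConfigSketch(BaseModel):
    type: str
    config: dict = {}

class PipelineConfigSketch(BaseModel):
    source: SourceConfigSketch          # required, like the real schema
    pipeline_name: Optional[str] = None

try:
    # No "source" key -> schema violation.
    PipelineConfigSketch.model_validate({"sink": {"type": "console"}})
except ValidationError as e:
    print("invalid recipe:", len(e.errors()), "error(s)")
```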

Key fields on the returned PipelineConfig:

Field Type Default Description
source SourceConfig (required) Source plugin type, config dict, and extractor type (default: "generic")
sink Optional[DynamicTypedConfig] None Sink plugin type and config; when None, defaults to datahub-rest at pipeline construction time
transformers Optional[List[DynamicTypedConfig]] None Ordered list of transformer plugin types and configs
flags FlagsConfig FlagsConfig() Experimental flags including generate_browse_path_v2 and set_system_metadata
run_id str Auto-generated Semantic run ID in format {source_type}-{YYYY_MM_DD}-{HH_MM_SS}-{random6}
datahub_api Optional[DatahubClientConfig] None Connection settings for the DataHub GMS server
pipeline_name Optional[str] None Human-readable pipeline name for reporting and system metadata
failure_log FailureLoggingConfig FailureLoggingConfig() Dead-letter queue configuration for failed records
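Note that sink defaulting happens at pipeline construction time, not inside from_dict(): a recipe with no sink validates to sink=None, and the fallback is applied later. A minimal sketch of that two-stage behavior (the default server URL shown is an assumption; the real fallback logic lives in Pipeline, not PipelineConfig):

```python
# Sketch of deferred sink defaulting: from_dict() leaves sink as None,
# and the pipeline substitutes datahub-rest only when it is constructed.
DEFAULT_SINK = {"type": "datahub-rest",
                "config": {"server": "http://localhost:8080"}}  # assumed default

def effective_sink(recipe: dict) -> dict:
    # Mirrors the documented fallback: explicit sink wins, else datahub-rest.
    return recipe.get("sink") or DEFAULT_SINK

print(effective_sink({"source": {"type": "mysql"}})["type"])   # datahub-rest
print(effective_sink({"source": {"type": "mysql"},
                      "sink": {"type": "console"}})["type"])   # console
```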

Usage Examples

Example 1: Programmatic recipe construction

from datahub.ingestion.run.pipeline_config import PipelineConfig

recipe = {
    "source": {
        "type": "mysql",
        "config": {
            "host_port": "localhost:3306",
            "database": "my_database",
            "username": "datahub",
            "password": "datahub",
        },
    },
    "sink": {
        "type": "datahub-rest",
        "config": {
            "server": "http://localhost:8080",
        },
    },
    "pipeline_name": "mysql-prod-ingestion",
}

config = PipelineConfig.from_dict(recipe)
print(config.run_id)        # e.g., "mysql-2026_02_09-17_00_00-a1b2c3"
print(config.source.type)   # "mysql"

Example 2: Recipe with transformers and flags

recipe = {
    "source": {
        "type": "snowflake",
        "config": {
            "account_id": "myaccount",
            "username": "${SNOWFLAKE_USER}",
            "password": "${SNOWFLAKE_PASS}",
        },
    },
    "transformers": [
        {
            "type": "simple_add_dataset_ownership",
            "config": {
                "owner_urns": ["urn:li:corpuser:datateam"],
            },
        }
    ],
    "flags": {
        "generate_browse_path_v2": True,
    },
}

config = PipelineConfig.from_dict(recipe)
print(len(config.transformers))  # 1
print(config.flags.generate_browse_path_v2)  # True

Example 3: Preserving the raw config for reporting

import os
from datahub.configuration.config_loader import load_config_file

pipeline_config = load_config_file(
    "recipe.yml",
    squirrel_original_config=True,
    squirrel_field="__raw_config",
    resolve_env_vars=True,
)
raw_config = pipeline_config.pop("__raw_config")

config = PipelineConfig.from_dict(pipeline_config, raw_dict=raw_config)
# config._raw_dict preserves ${ENV_VAR} placeholders for safe reporting
