
Implementation:Datahub project Datahub PipelineConfig

From Leeroopedia


Implementation Name: PipelineConfig
Overview: Concrete tool for validating and parsing ingestion recipe YAML into a structured pipeline configuration using pydantic.
Type: API Doc
Implements: Datahub_project_Datahub_Recipe_Configuration
Status: Active
Domains: Data_Integration, Metadata_Management
Source: DataHub Repository -- metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py (lines 87-131)
Last Updated: 2026-02-10
Knowledge Sources: DataHub Repository

Description

PipelineConfig is a pydantic ConfigModel that represents the fully validated configuration for a DataHub ingestion pipeline. It parses YAML recipe files into strongly typed Python objects, ensuring correctness before execution begins. It is the central data structure that the Pipeline class uses to initialize all pipeline components.

Class Signature

from datahub.ingestion.run.pipeline_config import PipelineConfig

class PipelineConfig(ConfigModel):
    source: SourceConfig
    sink: Optional[DynamicTypedConfig] = None
    transformers: Optional[List[DynamicTypedConfig]] = None
    flags: HiddenFromDocs[FlagsConfig] = FlagsConfig()
    reporting: List[ReporterConfig] = []
    run_id: str = DEFAULT_RUN_ID
    datahub_api: Optional[DatahubClientConfig] = None
    pipeline_name: Optional[str] = None
    failure_log: FailureLoggingConfig = FailureLoggingConfig()
    recording: Optional[RecordingConfig] = Field(
        default=None,
        description="Recording configuration for debugging ingestion runs.",
    )

Source file: metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py, lines 87-131.
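For orientation, a recipe YAML that exercises most of these fields might look like the following. The connection values and the transformer choice are illustrative, not taken from the source above:

```yaml
source:
  type: snowflake
  config:
    account_id: myaccount
    username: user
    password: pass
transformers:
  - type: simple_add_dataset_tags
    config:
      tag_urns:
        - "urn:li:tag:example"
sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"
pipeline_name: my_snowflake_pipeline
```

Each top-level key maps directly onto one field of PipelineConfig; omitted keys (run_id, flags, reporting, and so on) take the defaults shown in the class signature.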

Key Fields

source (SourceConfig, required): The source connector configuration. Contains type (string), config (dict), and extractor (defaults to "generic").
sink (Optional[DynamicTypedConfig], default None): The sink configuration. If None, defaults to datahub-rest using the default graph connection.
transformers (Optional[List[DynamicTypedConfig]], default None): A list of transformer configurations. Each transformer has a type and optional config.
flags (FlagsConfig, default FlagsConfig()): Experimental pipeline flags (e.g., generate_browse_path_v2, set_system_metadata).
reporting (List[ReporterConfig], default []): Reporting provider configurations for ingestion run summaries.
run_id (str, auto-generated): A unique identifier for this ingestion run. Auto-generated as {source_type}-{timestamp}-{random} if not specified.
datahub_api (Optional[DatahubClientConfig], default None): Connection configuration for the DataHub GMS server.
pipeline_name (Optional[str], default None): A human-readable name for the pipeline.
failure_log (FailureLoggingConfig, default FailureLoggingConfig()): Configuration for logging failed records to disk.
recording (Optional[RecordingConfig], default None): Recording configuration for debugging ingestion runs.

Supporting Classes

SourceConfig

class SourceConfig(DynamicTypedConfig):
    extractor: str = "generic"
    extractor_config: dict = Field(default_factory=dict)

Extends DynamicTypedConfig, which provides a type field (the source plugin name) and a config dict. The extractor field defaults to "generic".
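This layering is easy to reproduce with a minimal pydantic sketch. The class names mirror DataHub's, but the bodies are simplified assumptions for illustration, not the library's actual definitions:

```python
from typing import Optional

from pydantic import BaseModel, Field


class DynamicTypedConfig(BaseModel):
    """Simplified stand-in: a plugin name plus an arbitrary config blob."""
    type: str
    config: Optional[dict] = None


class SourceConfig(DynamicTypedConfig):
    """Adds the extractor layer on top of the dynamic type/config pair."""
    extractor: str = "generic"
    extractor_config: dict = Field(default_factory=dict)


# Parsing a recipe's `source` section fills in the extractor defaults.
src = SourceConfig.model_validate({"type": "snowflake", "config": {"account_id": "x"}})
print(src.type, src.extractor)  # snowflake generic
```

Because SourceConfig inherits from DynamicTypedConfig, any recipe source block that validates as a type/config pair also picks up extractor and extractor_config without the recipe author having to spell them out.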

ReporterConfig

class ReporterConfig(DynamicTypedConfig):
    required: bool = Field(
        False,
        description="Whether the reporter is a required reporter or not.",
    )

Key Methods

from_dict

@classmethod
def from_dict(
    cls, resolved_dict: dict, raw_dict: Optional[dict] = None
) -> "PipelineConfig":
    config = cls.model_validate(resolved_dict)
    config._raw_dict = raw_dict
    return config

Factory method that creates a PipelineConfig from a dictionary (typically loaded from a YAML recipe file). Optionally stores the raw (pre-resolution) dictionary for later reference.

run_id_should_be_semantic (model_validator)

When run_id is not explicitly provided, automatically generates a semantic run ID in the format {source_type}-{YYYY_MM_DD}-{HH_MM_SS}-{random_suffix}.
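The generated ID can be approximated with the standard library alone. The exact random-suffix scheme below is an assumption; DataHub's validator may differ in detail:

```python
import uuid
from datetime import datetime


def make_run_id(source_type: str) -> str:
    """Build a semantic run ID: {source_type}-{YYYY_MM_DD}-{HH_MM_SS}-{suffix}."""
    stamp = datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
    suffix = uuid.uuid4().hex[:6]  # short random tail to keep concurrent runs distinct
    return f"{source_type}-{stamp}-{suffix}"


print(make_run_id("snowflake"))  # e.g. snowflake-2026_02_10-14_30_00-a1b2c3
```

Embedding the source type and timestamp in the ID makes ingestion runs self-describing in logs and in the DataHub UI, which is the point of making the run_id "semantic" rather than a bare UUID.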

Import

from datahub.ingestion.run.pipeline_config import PipelineConfig

I/O Contract

Inputs

  • A Python dictionary representing the parsed YAML recipe (with environment variables already resolved)
  • Optionally, the raw (unresolved) dictionary for debugging
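Environment-variable resolution happens before the dictionary reaches PipelineConfig. A minimal sketch of that step follows; DataHub's own resolver handles more syntax (such as fallback defaults) than this assumed ${VAR} substitution:

```python
import os
import re


def resolve_env(obj):
    """Recursively substitute ${VAR} references in strings with os.environ values."""
    if isinstance(obj, dict):
        return {k: resolve_env(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [resolve_env(v) for v in obj]
    if isinstance(obj, str):
        return re.sub(r"\$\{(\w+)\}", lambda m: os.environ.get(m.group(1), ""), obj)
    return obj


os.environ["SNOW_PASS"] = "secret"
raw = {"source": {"config": {"password": "${SNOW_PASS}"}}}
print(resolve_env(raw))  # {'source': {'config': {'password': 'secret'}}}
```

Keeping the raw (unresolved) dictionary alongside the resolved one lets debugging output show ${SNOW_PASS} instead of the secret it expanded to.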

Outputs

  • A fully validated PipelineConfig instance with all fields populated and defaults applied
  • Raises pydantic.ValidationError if the configuration is invalid

Usage Example

from datahub.ingestion.run.pipeline_config import PipelineConfig

config_dict = {
    "source": {
        "type": "snowflake",
        "config": {
            "account_id": "myaccount",
            "username": "user",
            "password": "pass",
        },
    },
    "sink": {
        "type": "datahub-rest",
        "config": {
            "server": "http://localhost:8080",
        },
    },
}

pipeline_config = PipelineConfig.from_dict(config_dict)
print(pipeline_config.source.type)  # "snowflake"
print(pipeline_config.run_id)       # auto-generated, e.g. "snowflake-2026_02_10-14_30_00-abc123"
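In practice the dictionary usually comes from a YAML recipe file rather than being written inline. Assuming PyYAML is available, the loading step looks roughly like this; the resulting dict is what from_dict expects:

```python
import yaml  # PyYAML, assumed installed

RECIPE = """
source:
  type: snowflake
  config:
    account_id: myaccount
    username: user
    password: pass
sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"
"""

# yaml.safe_load turns the recipe text into a plain dict,
# ready to pass to PipelineConfig.from_dict(resolved).
resolved = yaml.safe_load(RECIPE)
print(resolved["source"]["type"])  # snowflake
```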
