Implementation:Datahub project Datahub PipelineConfig From Recipe
| Property | Value |
|---|---|
| Page Type | Implementation (API Doc) |
| Workflow | Metadata_Ingestion_Pipeline |
| API | `PipelineConfig.from_dict(resolved_dict: dict, raw_dict: Optional[dict] = None) -> PipelineConfig` |
| Source File | `metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py` |
| Repository | https://github.com/datahub-project/datahub |
| Implements | Principle:Datahub_project_Datahub_Recipe_Configuration |
| Last Updated | 2026-02-09 17:00 GMT |
Overview
Description
The PipelineConfig.from_dict() class method is the primary entry point for converting a parsed recipe dictionary into a validated PipelineConfig object. It accepts a resolved dictionary (with environment variables expanded) and an optional raw dictionary (preserving the original template with unexpanded variables), validates the structure against Pydantic models, auto-generates a semantic run_id when none is provided, and returns a fully initialized configuration object ready for pipeline construction.
The PipelineConfig class is a Pydantic ConfigModel that defines the complete schema for an ingestion recipe. It encapsulates the source configuration, optional sink configuration, transformer chain, experimental feature flags, reporting settings, run ID, DataHub API client configuration, pipeline name, and failure logging settings.
Usage
PipelineConfig.from_dict() is called in two primary contexts:
- From the CLI: The `ingest run` command loads a YAML recipe file via `load_config_file()`, then passes the resulting dictionary to `Pipeline.create()`, which internally calls `PipelineConfig.from_dict()`.
- From programmatic code: A Python script constructs a recipe dictionary and calls `PipelineConfig.from_dict()` directly, or passes it through `Pipeline.create()`.
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| `metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py` | L87-131 | `PipelineConfig` class definition and `from_dict()` method |
| `metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py` | L22-25 | `SourceConfig` class (extends `DynamicTypedConfig`) |
| `metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py` | L42-76 | `FlagsConfig` class with experimental feature flags |
| `metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py` | L78-84 | `_generate_run_id()` helper function |
Signature
```python
class PipelineConfig(ConfigModel):
    source: SourceConfig
    sink: Optional[DynamicTypedConfig] = None
    transformers: Optional[List[DynamicTypedConfig]] = None
    flags: HiddenFromDocs[FlagsConfig] = FlagsConfig()
    reporting: List[ReporterConfig] = []
    run_id: str = DEFAULT_RUN_ID
    datahub_api: Optional[DatahubClientConfig] = None
    pipeline_name: Optional[str] = None
    failure_log: FailureLoggingConfig = FailureLoggingConfig()
    recording: Optional[RecordingConfig] = Field(
        default=None,
        description="Recording configuration for debugging ingestion runs.",
    )

    _raw_dict: Optional[dict] = None

    @model_validator(mode="after")
    def run_id_should_be_semantic(self) -> "PipelineConfig":
        if self.run_id == DEFAULT_RUN_ID:
            source_type = None
            if hasattr(self.source, "type"):
                source_type = self.source.type
            self.run_id = _generate_run_id(source_type)
        else:
            assert self.run_id is not None
        return self

    @classmethod
    def from_dict(
        cls, resolved_dict: dict, raw_dict: Optional[dict] = None
    ) -> "PipelineConfig":
        config = cls.model_validate(resolved_dict)
        config._raw_dict = raw_dict
        return config
```
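The `run_id_should_be_semantic` validator replaces the sentinel `DEFAULT_RUN_ID` with a semantic identifier built from the source type and a timestamp. A minimal sketch of what `_generate_run_id()` plausibly does, reconstructed from the documented `{source_type}-{YYYY_MM_DD}-{HH_MM_SS}-{random6}` format (the actual helper in `pipeline_config.py` may differ in detail; the `"pipeline"` fallback name is an assumption):

```python
import random
import string
from datetime import datetime, timezone
from typing import Optional


def generate_run_id(source_type: Optional[str] = None) -> str:
    """Illustrative re-implementation of _generate_run_id(); not the real code."""
    # Assumption: fall back to a generic prefix when the source type is unknown.
    prefix = source_type or "pipeline"
    # Timestamp in the documented YYYY_MM_DD-HH_MM_SS layout.
    timestamp = datetime.now(timezone.utc).strftime("%Y_%m_%d-%H_%M_%S")
    # Six random lowercase alphanumeric characters for uniqueness.
    suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
    return f"{prefix}-{timestamp}-{suffix}"


print(generate_run_id("mysql"))  # e.g. "mysql-2026_02_09-17_00_00-a1b2c3"
```

Because the run ID embeds both the source type and a wall-clock timestamp, concurrent runs of different sources remain distinguishable in reporting.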
Import
```python
from datahub.ingestion.run.pipeline_config import PipelineConfig
```
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | `resolved_dict: dict` | The recipe dictionary with environment variables resolved and directives processed |
| Input | `raw_dict: Optional[dict]` | The original recipe dictionary before variable expansion (preserved for reporting) |
| Output | `PipelineConfig` | A fully validated configuration object with all fields populated and defaults applied |
| Raises | `pydantic.ValidationError` | Raised if the dictionary does not conform to the expected schema |
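The validate-then-attach contract can be illustrated without a DataHub install. The toy class below is a hypothetical stand-in for `PipelineConfig`, built only on the standard library; the real method delegates all validation to Pydantic's `model_validate`, which enforces far more than the single check shown here:

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class MiniPipelineConfig:
    """Toy stand-in for PipelineConfig; illustrative only, not DataHub code."""
    source: dict
    sink: Optional[dict] = None
    _raw_dict: Optional[dict] = field(default=None, repr=False)

    @classmethod
    def from_dict(cls, resolved_dict: dict, raw_dict: Optional[dict] = None):
        # Pydantic would validate every field; here we enforce only the one
        # hard requirement of a recipe: a "source" section must exist.
        if "source" not in resolved_dict:
            raise ValueError("recipe must define a 'source' section")
        config = cls(source=resolved_dict["source"], sink=resolved_dict.get("sink"))
        # Attach the unexpanded original after validation, as the real method does.
        config._raw_dict = raw_dict
        return config


cfg = MiniPipelineConfig.from_dict(
    {"source": {"type": "mysql"}},
    raw_dict={"source": {"type": "mysql"}},
)
```

The key point mirrored from the real implementation: validation runs on the *resolved* dictionary, while the *raw* dictionary is stored afterwards as private state and never validated.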
Key fields on the returned PipelineConfig:
| Field | Type | Default | Description |
|---|---|---|---|
| `source` | `SourceConfig` | (required) | Source plugin type, config dict, and extractor type (default: `"generic"`) |
| `sink` | `Optional[DynamicTypedConfig]` | `None` | Sink plugin type and config; when `None`, defaults to `datahub-rest` at pipeline construction time |
| `transformers` | `Optional[List[DynamicTypedConfig]]` | `None` | Ordered list of transformer plugin types and configs |
| `flags` | `FlagsConfig` | `FlagsConfig()` | Experimental flags, including `generate_browse_path_v2` and `set_system_metadata` |
| `run_id` | `str` | Auto-generated | Semantic run ID in the format `{source_type}-{YYYY_MM_DD}-{HH_MM_SS}-{random6}` |
| `datahub_api` | `Optional[DatahubClientConfig]` | `None` | Connection settings for the DataHub GMS server |
| `pipeline_name` | `Optional[str]` | `None` | Human-readable pipeline name for reporting and system metadata |
| `failure_log` | `FailureLoggingConfig` | `FailureLoggingConfig()` | Dead-letter queue configuration for failed records |
Usage Examples
Example 1: Programmatic recipe construction
```python
from datahub.ingestion.run.pipeline_config import PipelineConfig

recipe = {
    "source": {
        "type": "mysql",
        "config": {
            "host_port": "localhost:3306",
            "database": "my_database",
            "username": "datahub",
            "password": "datahub",
        },
    },
    "sink": {
        "type": "datahub-rest",
        "config": {
            "server": "http://localhost:8080",
        },
    },
    "pipeline_name": "mysql-prod-ingestion",
}

config = PipelineConfig.from_dict(recipe)
print(config.run_id)       # e.g., "mysql-2026_02_09-17_00_00-a1b2c3"
print(config.source.type)  # "mysql"
```
Example 2: Recipe with transformers and flags
```python
recipe = {
    "source": {
        "type": "snowflake",
        "config": {
            "account_id": "myaccount",
            "username": "${SNOWFLAKE_USER}",
            "password": "${SNOWFLAKE_PASS}",
        },
    },
    "transformers": [
        {
            "type": "simple_add_dataset_ownership",
            "config": {
                "owner_urns": ["urn:li:corpuser:datateam"],
            },
        }
    ],
    "flags": {
        "generate_browse_path_v2": True,
    },
}

config = PipelineConfig.from_dict(recipe)
print(len(config.transformers))             # 1
print(config.flags.generate_browse_path_v2)  # True
```
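Example 2 passes `${SNOWFLAKE_USER}`-style placeholders straight through; in the CLI flow, `load_config_file()` expands them before `from_dict()` ever sees the dictionary. The function below is a rough sketch of that resolution step, assuming simple `${VAR}` syntax only (DataHub's actual config loader is more capable, so treat this as illustrative, not equivalent):

```python
import os
import re


def resolve_env_vars(node):
    """Recursively expand ${VAR} placeholders in a recipe structure.

    Illustrative sketch only; unresolved variables are left as-is here,
    which may differ from the real loader's behaviour.
    """
    if isinstance(node, dict):
        return {key: resolve_env_vars(value) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve_env_vars(value) for value in node]
    if isinstance(node, str):
        # Replace each ${NAME} with os.environ["NAME"] when it is set.
        return re.sub(
            r"\$\{(\w+)\}",
            lambda m: os.environ.get(m.group(1), m.group(0)),
            node,
        )
    return node


os.environ["SNOWFLAKE_USER"] = "ingest_bot"
resolved = resolve_env_vars({"config": {"username": "${SNOWFLAKE_USER}"}})
print(resolved["config"]["username"])  # "ingest_bot"
```

This is why `from_dict()` takes two dictionaries: the resolved one (post-expansion) is validated, while the raw one (pre-expansion) keeps secrets out of reports.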
Example 3: Preserving the raw config for reporting
```python
import os

from datahub.configuration.config_loader import load_config_file

pipeline_config = load_config_file(
    "recipe.yml",
    squirrel_original_config=True,
    squirrel_field="__raw_config",
    resolve_env_vars=True,
)
raw_config = pipeline_config.pop("__raw_config")

config = PipelineConfig.from_dict(pipeline_config, raw_dict=raw_config)
# config._raw_dict preserves ${ENV_VAR} placeholders for safe reporting
```