Implementation:Datahub project Datahub PipelineConfig
| Field | Value |
|---|---|
| Implementation Name | PipelineConfig |
| Overview | Concrete tool for validating and parsing ingestion recipe YAML into a structured pipeline configuration using pydantic. |
| Type | API Doc |
| Implements | Datahub_project_Datahub_Recipe_Configuration |
| Status | Active |
| Domains | Data_Integration, Metadata_Management |
| Source | DataHub Repository -- metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py (lines 87-131) |
| Last Updated | 2026-02-10 |
| Knowledge Sources | DataHub Repository |
Description
PipelineConfig is a pydantic ConfigModel that represents the fully validated configuration for a DataHub ingestion pipeline. It parses YAML recipe files into strongly typed Python objects, ensuring correctness before execution begins. It is the central data structure that the Pipeline class uses to initialize all pipeline components.
Class Signature
```python
from datahub.ingestion.run.pipeline_config import PipelineConfig

class PipelineConfig(ConfigModel):
    source: SourceConfig
    sink: Optional[DynamicTypedConfig] = None
    transformers: Optional[List[DynamicTypedConfig]] = None
    flags: HiddenFromDocs[FlagsConfig] = FlagsConfig()
    reporting: List[ReporterConfig] = []
    run_id: str = DEFAULT_RUN_ID
    datahub_api: Optional[DatahubClientConfig] = None
    pipeline_name: Optional[str] = None
    failure_log: FailureLoggingConfig = FailureLoggingConfig()
    recording: Optional[RecordingConfig] = Field(
        default=None,
        description="Recording configuration for debugging ingestion runs.",
    )
```
Source file: metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py, lines 87-131.
Key Fields
| Field | Type | Default | Description |
|---|---|---|---|
| `source` | `SourceConfig` | (required) | The source connector configuration. Contains `type` (string), `config` (dict), and `extractor` (defaults to `"generic"`). |
| `sink` | `Optional[DynamicTypedConfig]` | `None` | The sink configuration. If `None`, defaults to `datahub-rest` using the default graph connection. |
| `transformers` | `Optional[List[DynamicTypedConfig]]` | `None` | A list of transformer configurations. Each transformer has a `type` and optional `config`. |
| `flags` | `FlagsConfig` | `FlagsConfig()` | Experimental pipeline flags (e.g., `generate_browse_path_v2`, `set_system_metadata`). |
| `reporting` | `List[ReporterConfig]` | `[]` | Reporting provider configurations for ingestion run summaries. |
| `run_id` | `str` | auto-generated | A unique identifier for this ingestion run. Auto-generated as `{source_type}-{timestamp}-{random}` if not specified. |
| `datahub_api` | `Optional[DatahubClientConfig]` | `None` | Connection configuration for the DataHub GMS server. |
| `pipeline_name` | `Optional[str]` | `None` | A human-readable name for the pipeline. |
| `failure_log` | `FailureLoggingConfig` | `FailureLoggingConfig()` | Configuration for logging failed records to disk. |
| `recording` | `Optional[RecordingConfig]` | `None` | Recording configuration for debugging ingestion runs. |
Supporting Classes
SourceConfig
```python
class SourceConfig(DynamicTypedConfig):
    extractor: str = "generic"
    extractor_config: dict = Field(default_factory=dict)
```
Extends DynamicTypedConfig, which provides a type field (the source plugin name) and a config dict. The extractor field defaults to "generic".
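The "dynamic typed config" pattern (a plugin name plus an opaque options dict) can be sketched with plain dataclasses. The `Sketch` classes below are illustrative stand-ins, not DataHub's actual pydantic models:

```python
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class DynamicTypedConfigSketch:
    """Illustrative stand-in for DynamicTypedConfig: a plugin name plus raw options."""
    type: str
    config: Dict[str, Any] = field(default_factory=dict)

@dataclass
class SourceConfigSketch(DynamicTypedConfigSketch):
    """Adds the extractor fields that SourceConfig layers on top of the base."""
    extractor: str = "generic"
    extractor_config: Dict[str, Any] = field(default_factory=dict)

# The source block of a recipe maps directly onto this shape.
src = SourceConfigSketch(type="snowflake", config={"account_id": "myaccount"})
```

This mirrors the shape of the real classes only; the actual models add pydantic validation on top.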
ReporterConfig
```python
class ReporterConfig(DynamicTypedConfig):
    required: bool = Field(
        False,
        description="Whether the reporter is a required reporter or not.",
    )
```
Key Methods
from_dict
```python
@classmethod
def from_dict(
    cls, resolved_dict: dict, raw_dict: Optional[dict] = None
) -> "PipelineConfig":
    config = cls.model_validate(resolved_dict)
    config._raw_dict = raw_dict
    return config
```
Factory method that creates a PipelineConfig from a dictionary (typically loaded from a YAML recipe file). Optionally stores the raw (pre-resolution) dictionary for later reference.
run_id_should_be_semantic (model_validator)
When run_id is not explicitly provided, automatically generates a semantic run ID in the format {source_type}-{YYYY_MM_DD}-{HH_MM_SS}-{random_suffix}.
Import
```python
from datahub.ingestion.run.pipeline_config import PipelineConfig
```
I/O Contract
Inputs
- A Python dictionary representing the parsed YAML recipe (with environment variables already resolved)
- Optionally, the raw (unresolved) dictionary for debugging
Outputs
- A fully validated `PipelineConfig` instance with all fields populated and defaults applied
- Raises `pydantic.ValidationError` if the configuration is invalid
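The inputs above assume environment variables are already resolved before validation. A minimal stand-in for that pre-processing step (illustrative only, not DataHub's actual recipe loader) might look like:

```python
import os
import re

def resolve_env_vars(obj):
    """Recursively substitute ${VAR} references in strings.

    Illustrative stand-in for the recipe loader's env-var resolution step;
    unset variables are left as-is here, which may differ from DataHub's behavior.
    """
    if isinstance(obj, dict):
        return {k: resolve_env_vars(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [resolve_env_vars(v) for v in obj]
    if isinstance(obj, str):
        return re.sub(
            r"\$\{(\w+)\}",
            lambda m: os.environ.get(m.group(1), m.group(0)),
            obj,
        )
    return obj
```

The resolved dictionary is what would then be handed to `PipelineConfig.from_dict`, optionally alongside the raw, unresolved dictionary.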
Usage Example
```python
from datahub.ingestion.run.pipeline_config import PipelineConfig

config_dict = {
    "source": {
        "type": "snowflake",
        "config": {
            "account_id": "myaccount",
            "username": "user",
            "password": "pass",
        },
    },
    "sink": {
        "type": "datahub-rest",
        "config": {
            "server": "http://localhost:8080",
        },
    },
}

pipeline_config = PipelineConfig.from_dict(config_dict)
print(pipeline_config.source.type)  # "snowflake"
print(pipeline_config.run_id)  # auto-generated, e.g. "snowflake-2026_02_10-14_30_00-abc123"
```
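The same configuration expressed as a YAML recipe, the form typically passed to the ingestion CLI (e.g. `datahub ingest -c recipe.yaml`):

```yaml
source:
  type: snowflake
  config:
    account_id: myaccount
    username: user
    password: pass
sink:
  type: datahub-rest
  config:
    server: http://localhost:8080
```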
Related Pages
- Implements: Datahub_project_Datahub_Recipe_Configuration
- Related: Datahub_project_Datahub_DatahubClientConfig
- Related: Datahub_project_Datahub_Ingest_CLI_Run
- Related: Datahub_project_Datahub_Pip_Install_Acryl_Datahub
- Environment: Environment:Datahub_project_Datahub_Python_3_10_Ingestion_Environment
- Heuristic: Heuristic:Datahub_project_Datahub_Secret_Handling_And_Deprecation_Patterns