Implementation:Datahub project Datahub Actions PipelineConfig
Metadata
| Field | Value |
|---|---|
| Implementation ID | I-DHACT-002 |
| Title | Actions PipelineConfig |
| Type | API Doc |
| Status | Active |
| Last Updated | 2026-02-10 |
| Repository | Datahub_project_Datahub |
| Source File | datahub-actions/src/datahub_actions/pipeline/pipeline_config.py (Lines 59-74) |
| Knowledge Sources | GitHub - datahub-project/datahub, DataHub Documentation |
| Domains | Event_Processing, Automation, Metadata_Management |
Overview
The PipelineConfig Pydantic model defines the complete configuration schema for a DataHub Actions pipeline. It maps directly to the YAML configuration file structure, with supporting models for each pipeline stage: SourceConfig, FilterConfig, TransformConfig, ActionConfig, and PipelineOptions.
Code Reference
PipelineConfig Class (pipeline_config.py, Lines 59-74)
```python
class PipelineConfig(ConfigModel):
    """
    Configuration required to create a new Actions Pipeline.
    This exactly matches the structure of the YAML file used
    to configure a Pipeline.
    """

    name: str
    enabled: bool = True
    source: SourceConfig
    filter: Optional[FilterConfig] = None
    transform: Optional[List[TransformConfig]] = None
    action: ActionConfig
    datahub: Optional[DatahubClientConfig] = None
    options: Optional[PipelineOptions] = None
```
Supporting Configuration Models (Lines 24-57)
```python
class FailureMode(ConfigEnum):
    THROW = "THROW"  # Stop pipeline on failure
    CONTINUE = "CONTINUE"  # Log failure and continue processing


class SourceConfig(ConfigModel):
    type: str
    config: Optional[Dict[str, Any]] = Field(default=None)


class TransformConfig(ConfigModel):
    type: str
    config: Optional[Dict[str, Any]] = Field(default=None)


class FilterConfig(ConfigModel):
    event_type: Union[str, List[str]]
    event: Optional[Dict[str, Any]] = Field(default=None)


class ActionConfig(ConfigModel):
    type: str
    config: Optional[Dict[str, Any]] = Field(default=None)


class PipelineOptions(BaseModel):
    retry_count: Optional[int] = Field(default=None)
    failure_mode: Optional[FailureMode] = Field(default=None)
    failed_events_dir: Optional[str] = Field(
        default=None, description="The path where failed events should be logged."
    )
```
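FailureMode governs what happens when event handling raises: THROW stops the pipeline on the first failure, while CONTINUE records the failed event and keeps consuming. The sketch below is a hypothetical, stdlib-only illustration of that contract; `process_events` and its signature are invented for this example and are not the actual pipeline loop from datahub-actions.

```python
from enum import Enum


class FailureMode(str, Enum):
    THROW = "THROW"
    CONTINUE = "CONTINUE"


def process_events(events, handler, failure_mode=FailureMode.CONTINUE):
    """Illustrative loop: apply handler to each event, honoring failure_mode."""
    failed = []
    for event in events:
        try:
            handler(event)
        except Exception:
            if failure_mode is FailureMode.THROW:
                raise  # stop the pipeline on the first failure
            failed.append(event)  # record the event and keep consuming
    return failed


# The handler raises ZeroDivisionError for event 2; with CONTINUE the
# pipeline survives and collects the failed event.
bad = process_events([1, 2, 3], lambda e: 1 / (e - 2))
print(bad)  # [2]
```

With `failure_mode=FailureMode.THROW`, the same call would re-raise the ZeroDivisionError and halt processing.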
Imports
```python
from datahub_actions.pipeline.pipeline_config import PipelineConfig
from datahub_actions.pipeline.pipeline_config import (
    SourceConfig,
    FilterConfig,
    TransformConfig,
    ActionConfig,
    PipelineOptions,
    FailureMode,
)
```
I/O Contract
| Direction | Description |
|---|---|
| Input | A Python dict (typically loaded from YAML) containing the pipeline configuration |
| Output | A validated PipelineConfig instance with typed fields for each pipeline stage |
| Validation | Pydantic validates types at construction via PipelineConfig.model_validate(config_dict). Required fields (name, source, action) must be present. Optional fields default to None. |
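Because the required fields have no defaults, a dict missing name, source, or action fails validation immediately. The sketch below demonstrates this with minimal replica models, assuming Pydantic v2 is available; the real classes live in datahub_actions.pipeline.pipeline_config, and these stripped-down stand-ins exist only to make the example self-contained.

```python
from typing import Any, Dict, Optional

from pydantic import BaseModel, ValidationError


# Replica models for illustration only (the real ones extend ConfigModel)
class SourceConfig(BaseModel):
    type: str
    config: Optional[Dict[str, Any]] = None


class ActionConfig(BaseModel):
    type: str
    config: Optional[Dict[str, Any]] = None


class PipelineConfig(BaseModel):
    name: str
    enabled: bool = True
    source: SourceConfig
    action: ActionConfig


# Omitting the required source/action fields raises a ValidationError
try:
    PipelineConfig.model_validate({"name": "p"})
except ValidationError as exc:
    missing = {err["loc"][0] for err in exc.errors()}
    print(sorted(missing))  # ['action', 'source']
```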
Field Specifications
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| name | str | Yes | -- | Pipeline name, used as Kafka consumer group ID |
| enabled | bool | No | True | Whether the pipeline is active |
| source | SourceConfig | Yes | -- | Event source (type + config dict) |
| filter | FilterConfig | No | None | Event type and body filter |
| transform | List[TransformConfig] | No | None | Ordered list of transforms |
| action | ActionConfig | Yes | -- | Action plugin (type + config dict) |
| datahub | DatahubClientConfig | No | None | DataHub server connection |
| options | PipelineOptions | No | None | Retry count, failure mode, failed events dir |
Usage Examples
Minimal YAML Configuration
```yaml
name: my_slack_notifications
source:
  type: kafka
  config:
    connection:
      bootstrap: "localhost:9092"
      schema_registry_url: "http://localhost:8081"
action:
  type: slack
  config:
    webhook_url: "${SLACK_WEBHOOK_URL}"
```
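The `${SLACK_WEBHOOK_URL}` placeholder is resolved from the environment when the configuration is loaded, which keeps secrets out of the YAML file. The sketch below shows how such `${VAR}` interpolation can work using only the stdlib; `expand_env` is a hypothetical helper for illustration, not the actual loader code from datahub-actions.

```python
import os
import re

_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value):
    """Recursively replace ${VAR} with the environment value, if set."""
    if isinstance(value, str):
        # Leave the placeholder untouched when the variable is not set
        return _VAR.sub(lambda m: os.environ.get(m.group(1), m.group(0)), value)
    if isinstance(value, dict):
        return {k: expand_env(v) for k, v in value.items()}
    if isinstance(value, list):
        return [expand_env(v) for v in value]
    return value


os.environ["SLACK_WEBHOOK_URL"] = "https://hooks.slack.com/services/T000/B000/XXX"
cfg = {"action": {"type": "slack", "config": {"webhook_url": "${SLACK_WEBHOOK_URL}"}}}
print(expand_env(cfg)["action"]["config"]["webhook_url"])
```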
Full YAML Configuration with All Stages
```yaml
name: tag_propagation_pipeline
enabled: true
source:
  type: kafka
  config:
    connection:
      bootstrap: "localhost:9092"
      schema_registry_url: "http://localhost:8081"
filter:
  event_type: "EntityChangeEvent_v1"
  event:
    category: "TAG"
    operation: "ADD"
action:
  type: tag_propagation
datahub:
  server: "http://localhost:8080"
  token: "${DATAHUB_TOKEN}"
options:
  retry_count: 3
  failure_mode: "CONTINUE"
  failed_events_dir: "/var/log/datahub/actions"
```
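The filter block above admits only EntityChangeEvent_v1 events whose body carries category TAG and operation ADD. A hypothetical, stdlib-only sketch of that matching logic follows; the `matches` function is invented for this example (the real matching lives in the datahub-actions filter transformer), but it mirrors the FilterConfig shape: an event_type that may be a string or a list, and an optional dict of body fields that must all match.

```python
def matches(filter_cfg, event_type, event_body):
    """Illustrative FilterConfig-style match: type check, then body fields."""
    wanted = filter_cfg["event_type"]
    # event_type may be a single string or a list of strings
    wanted = [wanted] if isinstance(wanted, str) else wanted
    if event_type not in wanted:
        return False
    # Every key in the optional event dict must match the event body
    for key, expected in (filter_cfg.get("event") or {}).items():
        if event_body.get(key) != expected:
            return False
    return True


flt = {
    "event_type": "EntityChangeEvent_v1",
    "event": {"category": "TAG", "operation": "ADD"},
}
print(matches(flt, "EntityChangeEvent_v1", {"category": "TAG", "operation": "ADD"}))     # True
print(matches(flt, "EntityChangeEvent_v1", {"category": "TAG", "operation": "REMOVE"}))  # False
```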
Programmatic Usage
```python
from datahub_actions.pipeline.pipeline_config import PipelineConfig

config_dict = {
    "name": "my_pipeline",
    "source": {"type": "kafka", "config": {"connection": {...}}},
    "action": {"type": "slack", "config": {"webhook_url": "https://..."}},
}

config = PipelineConfig.model_validate(config_dict)
print(config.name)         # "my_pipeline"
print(config.enabled)      # True
print(config.source.type)  # "kafka"
print(config.action.type)  # "slack"
print(config.filter)       # None
print(config.options)      # None
```
Related
- Implements: Datahub_project_Datahub_Action_Pipeline_Configuration
- Related implementations: Datahub_project_Datahub_FilterTransformer_Transform, Datahub_project_Datahub_Actions_CLI_Run
- Environment: Environment:Datahub_project_Datahub_Python_3_10_Ingestion_Environment