Implementation:Datahub project Datahub Actions PipelineConfig

From Leeroopedia


Metadata

Field Value
Implementation ID I-DHACT-002
Title Actions PipelineConfig
Type API Doc
Status Active
Last Updated 2026-02-10
Repository Datahub_project_Datahub
Source File datahub-actions/src/datahub_actions/pipeline/pipeline_config.py (Lines 59-74)
Knowledge Sources GitHub - datahub-project/datahub, DataHub Documentation
Domains Event_Processing, Automation, Metadata_Management

Overview

The PipelineConfig Pydantic model defines the complete configuration schema for a DataHub Actions pipeline. It maps directly to the YAML configuration file structure, with supporting models for each pipeline stage: SourceConfig, FilterConfig, TransformConfig, ActionConfig, and PipelineOptions.
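
Because the model mirrors the YAML file one-to-one, loading a configuration is a two-step affair: parse the YAML, then validate the resulting dict. A minimal sketch, assuming PyYAML is available and an illustrative file path (in practice the Actions framework loads this file for you):

import yaml

from datahub_actions.pipeline.pipeline_config import PipelineConfig

# Parse the pipeline YAML (path is illustrative), then let Pydantic
# validate the raw dict into a typed PipelineConfig instance.
with open("my_pipeline.yaml") as f:
    raw = yaml.safe_load(f)

config = PipelineConfig.model_validate(raw)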

Code Reference

PipelineConfig Class (pipeline_config.py, Lines 59-74)

class PipelineConfig(ConfigModel):
    """
    Configuration required to create a new Actions Pipeline.

    This exactly matches the structure of the YAML file used
    to configure a Pipeline.
    """

    name: str
    enabled: bool = True
    source: SourceConfig
    filter: Optional[FilterConfig] = None
    transform: Optional[List[TransformConfig]] = None
    action: ActionConfig
    datahub: Optional[DatahubClientConfig] = None
    options: Optional[PipelineOptions] = None

Supporting Configuration Models (Lines 24-57)

class FailureMode(ConfigEnum):
    THROW = "THROW"      # Stop pipeline on failure
    CONTINUE = "CONTINUE" # Log failure and continue processing

class SourceConfig(ConfigModel):
    type: str
    config: Optional[Dict[str, Any]] = Field(default=None)

class TransformConfig(ConfigModel):
    type: str
    config: Optional[Dict[str, Any]] = Field(default=None)

class FilterConfig(ConfigModel):
    event_type: Union[str, List[str]]
    event: Optional[Dict[str, Any]] = Field(default=None)

class ActionConfig(ConfigModel):
    type: str
    config: Optional[Dict[str, Any]] = Field(default=None)

class PipelineOptions(BaseModel):
    retry_count: Optional[int] = Field(default=None)
    failure_mode: Optional[FailureMode] = Field(default=None)
    failed_events_dir: Optional[str] = Field(
        default=None, description="The path where failed events should be logged."
    )
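
A short sketch of the failure-handling options, using the fields defined above (the directory path is illustrative):

from datahub_actions.pipeline.pipeline_config import FailureMode, PipelineOptions

# CONTINUE logs a failed event and keeps consuming; THROW stops the pipeline.
options = PipelineOptions(
    retry_count=3,
    failure_mode=FailureMode.CONTINUE,
    failed_events_dir="/var/log/datahub/actions",
)
assert options.failure_mode == FailureMode.CONTINUE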

Imports

from datahub_actions.pipeline.pipeline_config import (
    PipelineConfig,
    SourceConfig,
    FilterConfig,
    TransformConfig,
    ActionConfig,
    PipelineOptions,
    FailureMode,
)

I/O Contract

Aspect Description
Input A Python dict (typically loaded from YAML) containing the pipeline configuration
Output A validated PipelineConfig instance with typed fields for each pipeline stage
Validation Pydantic validates types at construction via PipelineConfig.model_validate(config_dict). Required fields (name, source, action) must be present. Optional fields default to None.
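
The failure path is worth seeing once. A minimal sketch, assuming Pydantic v2 (implied by the model_validate call): omitting a required field raises a ValidationError before any pipeline machinery runs.

from pydantic import ValidationError

from datahub_actions.pipeline.pipeline_config import PipelineConfig

try:
    # "action" is required, so this fails validation.
    PipelineConfig.model_validate({"name": "broken", "source": {"type": "kafka"}})
except ValidationError as e:
    print(e)  # reports the missing "action" field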

Field Specifications

Field Type Required Default Description
name str Yes -- Pipeline name, used as Kafka consumer group ID
enabled bool No True Whether the pipeline is active
source SourceConfig Yes -- Event source (type + config dict)
filter FilterConfig No None Event type and body filter
transform List[TransformConfig] No None Ordered list of transforms
action ActionConfig Yes -- Action plugin (type + config dict)
datahub DatahubClientConfig No None DataHub server connection
options PipelineOptions No None Retry count, failure mode, failed events dir
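
Since FilterConfig.event_type is Union[str, List[str]], a filter can target one event type or several. A small sketch (both names are standard DataHub event types):

from datahub_actions.pipeline.pipeline_config import FilterConfig

# A single event type validates...
single = FilterConfig.model_validate({"event_type": "EntityChangeEvent_v1"})

# ...and so does a list of event types, against the same field.
several = FilterConfig.model_validate(
    {"event_type": ["EntityChangeEvent_v1", "MetadataChangeLogEvent_v1"]}
)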

Usage Examples

Minimal YAML Configuration

name: my_slack_notifications
source:
  type: kafka
  config:
    connection:
      bootstrap: "localhost:9092"
      schema_registry_url: "http://localhost:8081"
action:
  type: slack
  config:
    webhook_url: "${SLACK_WEBHOOK_URL}"

Full YAML Configuration with Filter, DataHub Connection, and Options

name: tag_propagation_pipeline
enabled: true
source:
  type: kafka
  config:
    connection:
      bootstrap: "localhost:9092"
      schema_registry_url: "http://localhost:8081"
filter:
  event_type: "EntityChangeEvent_v1"
  event:
    category: "TAG"
    operation: "ADD"
action:
  type: tag_propagation
datahub:
  server: "http://localhost:8080"
  token: "${DATAHUB_TOKEN}"
options:
  retry_count: 3
  failure_mode: "CONTINUE"
  failed_events_dir: "/var/log/datahub/actions"
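
The transform stage is the one optional stage not exercised above. A sketch of supplying the ordered transform list, where "my_transformer" and its config keys are hypothetical, not a shipped plugin:

from datahub_actions.pipeline.pipeline_config import PipelineConfig

config = PipelineConfig.model_validate(
    {
        "name": "with_transforms",
        "source": {"type": "kafka", "config": {"connection": {"bootstrap": "localhost:9092"}}},
        # Hypothetical transformer; entries run in list order.
        "transform": [{"type": "my_transformer", "config": {"drop_nulls": True}}],
        "action": {"type": "slack", "config": {"webhook_url": "${SLACK_WEBHOOK_URL}"}},
    }
)
assert config.transform is not None
assert config.transform[0].type == "my_transformer"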

Programmatic Usage

from datahub_actions.pipeline.pipeline_config import PipelineConfig

config_dict = {
    "name": "my_pipeline",
    "source": {"type": "kafka", "config": {"connection": {...}}},
    "action": {"type": "slack", "config": {"webhook_url": "https://..."}},
}

config = PipelineConfig.model_validate(config_dict)
print(config.name)           # "my_pipeline"
print(config.enabled)        # True
print(config.source.type)    # "kafka"
print(config.action.type)    # "slack"
print(config.filter)         # None
print(config.options)        # None
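
Continuing the example, and again assuming Pydantic v2, the validated model dumps back to a plain dict, which is handy for logging the effective configuration:

dumped = config.model_dump(exclude_none=True)
# Defaults are materialized: "enabled" is True even though the
# input dict never set it; None-valued optionals are dropped.
assert dumped["enabled"] is True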
