
Implementation:Datahub project Datahub Actions CLI Run

From Leeroopedia


Metadata

| Field | Value |
|---|---|
| Implementation ID | I-DHACT-005 |
| Title | Actions CLI Run |
| Type | API Doc |
| Status | Active |
| Last Updated | 2026-02-10 |
| Repository | Datahub_project_Datahub |
| Source Files | datahub-actions/src/datahub_actions/cli/actions.py (Lines 93-183), datahub-actions/src/datahub_actions/pipeline/pipeline.py (Lines 112-176), datahub-actions/src/datahub_actions/pipeline/pipeline_manager.py (Lines 51-105) |
| Knowledge Sources | GitHub - datahub-project/datahub, DataHub Documentation |
| Domains | Event_Processing, Automation, Metadata_Management |

Overview

The datahub-actions CLI run command loads one or more YAML pipeline configuration files, creates Pipeline instances, and starts them as daemon threads via the PipelineManager. Each pipeline runs a blocking event loop consuming from Kafka topics (MetadataChangeLog_Versioned_v1, MetadataChangeLog_Timeseries_v1, PlatformEvent_v1) and processing events through the configured source, filter, transform, and action stages.

Code Reference

CLI Run Command (actions.py, Lines 93-183)

@actions.command(
    name="run",
    context_settings=dict(
        ignore_unknown_options=True,
        allow_extra_args=True,
    ),
)
@click.option("-c", "--config", required=True, type=str, multiple=True)
@click.option("--debug/--no-debug", default=False)
@click.pass_context
def run(ctx: Any, config: List[str], debug: bool) -> None:
    """Execute one or more Actions Pipelines"""

    logger.info("DataHub Actions version: %s", actions_version.nice_version_name())

    if debug:
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        logging.getLogger().setLevel(logging.INFO)

    pipelines: List[Pipeline] = []
    logger.debug("Creating Actions Pipelines...")

    # Phase 1: Initial validation of configs
    valid_configs = []
    for pipeline_config in config:
        pipeline_config_file = pathlib.Path(pipeline_config)
        try:
            raw_config = load_raw_config_file(pipeline_config_file)
            if not is_pipeline_enabled(raw_config):
                logger.warning(
                    f"Skipping pipeline {raw_config.get('name') or pipeline_config} "
                    f"as it is not enabled"
                )
                continue
            valid_configs.append(pipeline_config_file)
        except Exception as e:
            if len(config) == 1:
                raise Exception(
                    f"Failed to load raw configuration file {pipeline_config_file}"
                ) from e
            logger.warning(
                f"Failed to load pipeline configuration! "
                f"Skipping action config file {pipeline_config_file}...: {e}"
            )

    # Phase 2: Full config loading and pipeline creation
    for pipeline_config_file in valid_configs:
        try:
            pipeline_config_dict = load_config_file(pipeline_config_file)
            if not is_pipeline_enabled(pipeline_config_dict):
                continue
            pipelines.append(pipeline_config_to_pipeline(pipeline_config_dict))
        except UnboundVariable as e:
            if len(valid_configs) == 1:
                raise Exception(
                    "Failed to load action configuration. "
                    "Unbound variable(s) provided in config YAML."
                ) from e
            logger.warning(
                f"Failed to resolve variables in config file "
                f"{pipeline_config_file}...: {e}"
            )
            continue

    if not pipelines:
        logger.error(
            f"No valid pipelines were started from {len(config)} config(s). "
            "Check that at least one pipeline is enabled and all required "
            "environment variables are set."
        )
        sys.exit(1)

    # Start each pipeline
    for p in pipelines:
        pipeline_manager.start_pipeline(p.name, p)
        logger.info(f"Action Pipeline with name '{p.name}' is now running.")

    # Run forever
    while True:
        time.sleep(5)
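The Phase 1 enabled check above defaults to treating a pipeline as enabled when the `enabled` key is absent. A minimal sketch of that behavior (the helper name and string handling here are illustrative assumptions, not the actual `is_pipeline_enabled` implementation):

```python
from typing import Any, Dict


def is_pipeline_enabled_sketch(raw_config: Dict[str, Any]) -> bool:
    """Treat a pipeline as enabled unless the config explicitly disables it.

    A missing `enabled` key defaults to True; common falsy spellings
    coming from YAML or env-substituted strings disable the pipeline.
    """
    enabled = raw_config.get("enabled", True)
    if isinstance(enabled, str):
        return enabled.strip().lower() not in ("false", "no", "0")
    return bool(enabled)


# {"name": "p"}                     -> enabled (default)
# {"name": "p", "enabled": False}   -> disabled
# {"name": "p", "enabled": "false"} -> disabled
```

Checking this before full config resolution lets the CLI skip disabled pipelines without forcing their environment variables to be set.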

Pipeline.create() and Pipeline.run() (pipeline.py, Lines 112-176)

@classmethod
def create(cls, config_dict: dict) -> "Pipeline":
    # Bind config
    config = PipelineConfig.model_validate(config_dict)

    if not config.enabled:
        raise Exception(
            "Pipeline is disabled, but create method was called unexpectedly."
        )

    # Create Context
    ctx = create_action_context(config.name, config.datahub)

    # Create Event Source
    event_source = create_event_source(config.source, ctx)

    # Create Transforms
    transforms = []
    if config.filter is not None:
        transforms.append(create_filter_transformer(config.filter, ctx))
    if config.transform is not None:
        for transform_config in config.transform:
            transforms.append(create_transformer(transform_config, ctx))

    # Create Action
    action = create_action(config.action, ctx)

    # Finally, create Pipeline.
    return cls(
        config.name,
        event_source,
        transforms,
        action,
        config.options.retry_count if config.options else None,
        config.options.failure_mode if config.options else None,
        config.options.failed_events_dir if config.options else None,
    )

def run(self) -> None:
    """
    Run the action pipeline synchronously. This method is blocking.
    Raises an instance of PipelineException if an unrecoverable
    pipeline failure occurs.
    """
    self._stats.mark_start()

    # First, source the events.
    enveloped_events = self.source.events()
    for enveloped_event in enveloped_events:
        # Then, process the event.
        retval = self._process_event(enveloped_event)

        # For legacy users w/o selective ack support, convert
        # None to True, i.e. always commit.
        if retval is None:
            retval = True

        # Finally, ack the event.
        self._ack_event(enveloped_event, retval)
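The None-to-True conversion in `run()` encodes the selective-ack contract: processors that predate selective ack return nothing, and their events are always committed. A self-contained sketch of just that convention (function and variable names here are illustrative, not part of the Pipeline API):

```python
from typing import Callable, List, Optional, Tuple


def process_and_ack(
    events: List[str],
    process: Callable[[str], Optional[bool]],
) -> List[Tuple[str, bool]]:
    """Apply the legacy selective-ack convention: a processor returning
    None (no opinion) is treated as success, so the event is committed."""
    acks = []
    for event in events:
        retval = process(event)
        if retval is None:
            retval = True  # legacy processors without selective ack: always commit
        acks.append((event, retval))
    return acks


# A processor that explicitly rejects "b" and is silent about the rest:
acks = process_and_ack(["a", "b"], lambda e: False if e == "b" else None)
# -> [("a", True), ("b", False)]
```

Returning `False` is the only way to withhold an ack; silence commits, which keeps old processors working unchanged.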

PipelineManager (pipeline_manager.py, Lines 51-105)

class PipelineManager:
    # A catalog of all the currently executing Action Pipelines.
    pipeline_registry: Dict[str, PipelineSpec] = {}

    def __init__(self) -> None:
        pass

    def start_pipeline(self, name: str, pipeline: Pipeline) -> None:
        logger.debug(f"Attempting to start pipeline with name {name}...")
        if name not in self.pipeline_registry:
            thread = Thread(target=run_pipeline, args=([pipeline]))
            thread.start()
            spec = PipelineSpec(name, pipeline, thread)
            self.pipeline_registry[name] = spec
            logger.debug(f"Started pipeline with name {name}.")
        else:
            raise Exception(f"Pipeline with name {name} is already running.")

    def stop_pipeline(self, name: str) -> None:
        logger.debug(f"Attempting to stop pipeline with name {name}...")
        if name in self.pipeline_registry:
            try:
                pipeline_spec = self.pipeline_registry[name]
                pipeline_spec.pipeline.stop()
                pipeline_spec.thread.join()
                logger.info(f"Actions Pipeline with name '{name}' has been stopped.")
                pipeline_spec.pipeline.stats().pretty_print_summary(name)
                del self.pipeline_registry[name]
            except Exception as e:
                logger.error(
                    f"Caught exception while attempting to stop pipeline "
                    f"with name {name}: {traceback.format_exc(limit=3)}"
                )
                raise Exception(
                    f"Caught exception while attempting to stop pipeline "
                    f"with name {name}."
                ) from e
        else:
            raise Exception(f"No pipeline with name {name} found.")

    def stop_all(self) -> None:
        logger.debug("Attempting to stop all running pipelines...")
        names = list(self.pipeline_registry.keys()).copy()
        for name in names:
            self.stop_pipeline(name)
        logger.debug("Successfully stop all running pipelines.")

I/O Contract

CLI Interface

| Parameter | Type | Required | Description |
|---|---|---|---|
| -c / --config | str (multiple) | Yes | Path(s) to YAML pipeline configuration files |
| --debug / --no-debug | bool | No | Enable debug logging (default: false) |

Kafka Topics Consumed

| Route Key | Default Topic Name | Content |
|---|---|---|
| mcl | MetadataChangeLog_Versioned_v1 | Versioned aspect changes |
| mcl_timeseries | MetadataChangeLog_Timeseries_v1 | Timeseries aspect changes |
| pe | PlatformEvent_v1 | Platform events (EntityChangeEvent, etc.) |
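The route-to-topic mapping above can be modeled as a default lookup with per-route overrides. A sketch of that shape (the function and constant names are illustrative; the real kafka source config keys may differ):

```python
from typing import Dict, Optional

# Default topic name per route key, as listed in the table above.
DEFAULT_TOPIC_ROUTES: Dict[str, str] = {
    "mcl": "MetadataChangeLog_Versioned_v1",
    "mcl_timeseries": "MetadataChangeLog_Timeseries_v1",
    "pe": "PlatformEvent_v1",
}


def resolve_topic_routes(overrides: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Merge user-supplied topic overrides onto the default route map."""
    routes = dict(DEFAULT_TOPIC_ROUTES)
    if overrides:
        routes.update(overrides)
    return routes


# Override only the platform-event topic, keep the MCL defaults:
routes = resolve_topic_routes({"pe": "PlatformEvent_custom"})
```

Deployments that rename topics (e.g. with a cluster-wide prefix) only need to override the affected routes.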

Process Behavior

| Aspect | Behavior |
|---|---|
| Blocking | Main thread sleeps in an infinite loop (while True: time.sleep(5)) |
| Threading | One daemon thread per pipeline via PipelineManager.start_pipeline() |
| Shutdown | SIGINT handler calls PipelineManager.stop_all() then sys.exit(1) |
| Failed events | Written to /tmp/logs/datahub/actions/<pipeline_name>/failed_events.log (configurable) |
| Error, single config | Raises an exception immediately if the only config fails to load |
| Error, multiple configs | Logs a warning and skips failed configs; exits if no valid pipelines remain |
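The shutdown row can be sketched as a SIGINT handler that stops every pipeline before exiting; the wiring below is a hypothetical illustration of that contract, not the handler in actions.py:

```python
import signal
import sys
from typing import Any, Callable


def make_sigint_handler(manager: Any) -> Callable[[int, Any], None]:
    """Build a handler that stops all running pipelines, then exits with code 1."""

    def handle_sigint(signum: int, frame: Any) -> None:
        manager.stop_all()  # joins every pipeline thread before exiting
        sys.exit(1)

    return handle_sigint


# Wiring (assuming a PipelineManager-like object with stop_all()):
# signal.signal(signal.SIGINT, make_sigint_handler(pipeline_manager))
```

Because `stop_all()` joins each pipeline thread before `sys.exit(1)` runs, Ctrl-C lets in-flight events finish and per-pipeline stats print before the process terminates.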

Usage Examples

Single Pipeline

datahub-actions -c /path/to/slack_notifications.yml

Multiple Pipelines

datahub-actions -c /path/to/slack.yml -c /path/to/tag_propagation.yml -c /path/to/term_propagation.yml

Debug Mode

datahub-actions -c /path/to/pipeline.yml --debug

With Monitoring Enabled

datahub-actions --enable-monitoring --monitoring-port 9090 actions run -c /path/to/pipeline.yml

Example Pipeline Configuration File

name: slack_notifications
enabled: true
source:
  type: kafka
  config:
    connection:
      bootstrap: "${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}"
      schema_registry_url: "${SCHEMA_REGISTRY_URL:-http://localhost:8081}"
filter:
  event_type: "EntityChangeEvent_v1"
  event:
    category: "TAG"
    operation: "ADD"
action:
  type: slack
  config:
    webhook_url: "${SLACK_WEBHOOK_URL}"
datahub:
  server: "${DATAHUB_GMS_URL:-http://localhost:8080}"
  token: "${DATAHUB_TOKEN}"
options:
  retry_count: 3
  failure_mode: "CONTINUE"
  failed_events_dir: "/var/log/datahub/actions"
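The `${VAR:-default}` references in this config are resolved against the environment at load time; a variable that is unset and has no default produces the UnboundVariable error handled in Phase 2 of the run command. A minimal regex-based sketch of that expansion (illustrative only; the actual resolver lives in the config-loading utilities and the exception class here is hypothetical):

```python
import os
import re
from typing import Dict, Optional

_VAR_PATTERN = re.compile(
    r"\$\{(?P<name>[A-Za-z_][A-Za-z0-9_]*)(?::-(?P<default>[^}]*))?\}"
)


class UnboundVariableSketch(Exception):
    """Raised when a referenced variable is unset and has no default."""


def expand_variables(text: str, env: Optional[Dict[str, str]] = None) -> str:
    """Expand ${VAR} and ${VAR:-default} references against env."""
    resolved = os.environ if env is None else env

    def replace(match: "re.Match") -> str:
        name = match.group("name")
        if name in resolved:
            return resolved[name]
        default = match.group("default")
        if default is not None:
            return default
        raise UnboundVariableSketch(f"Variable {name} is not set")

    return _VAR_PATTERN.sub(replace, text)


# With KAFKA_BOOTSTRAP_SERVER unset, the default applies:
expand_variables("${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}", env={})
# -> "localhost:9092"
```

Under this model, `${SLACK_WEBHOOK_URL}` above has no default, so running the pipeline without that variable set is exactly the unbound-variable case that aborts a single-config run.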

Programmatic Pipeline Creation

from datahub_actions.pipeline.pipeline import Pipeline
from datahub_actions.pipeline.pipeline_manager import PipelineManager

# Create pipeline from config dict
config_dict = {
    "name": "my_pipeline",
    "source": {"type": "kafka", "config": {...}},
    "action": {"type": "slack", "config": {...}},
}
pipeline = Pipeline.create(config_dict)

# Start via manager
manager = PipelineManager()
manager.start_pipeline("my_pipeline", pipeline)

# Later, stop
manager.stop_all()
