Implementation: Datahub_project_Datahub Actions CLI Run
Metadata
| Field | Value |
|---|---|
| Implementation ID | I-DHACT-005 |
| Title | Actions CLI Run |
| Type | API Doc |
| Status | Active |
| Last Updated | 2026-02-10 |
| Repository | Datahub_project_Datahub |
| Source Files | datahub-actions/src/datahub_actions/cli/actions.py (Lines 93-183), datahub-actions/src/datahub_actions/pipeline/pipeline.py (Lines 112-176), datahub-actions/src/datahub_actions/pipeline/pipeline_manager.py (Lines 51-105) |
| Knowledge Sources | GitHub - datahub-project/datahub, DataHub Documentation |
| Domains | Event_Processing, Automation, Metadata_Management |
Overview
The `datahub-actions` CLI `run` command loads one or more YAML pipeline configuration files, creates `Pipeline` instances, and starts each one in a dedicated thread via the `PipelineManager`. Each pipeline runs a blocking event loop that consumes from Kafka topics (`MetadataChangeLog_Versioned_v1`, `MetadataChangeLog_Timeseries_v1`, `PlatformEvent_v1`) and processes events through the configured source, filter, transform, and action stages.
Code Reference
CLI Run Command (actions.py, Lines 93-183)
```python
@actions.command(
    name="run",
    context_settings=dict(
        ignore_unknown_options=True,
        allow_extra_args=True,
    ),
)
@click.option("-c", "--config", required=True, type=str, multiple=True)
@click.option("--debug/--no-debug", default=False)
@click.pass_context
def run(ctx: Any, config: List[str], debug: bool) -> None:
    """Execute one or more Actions Pipelines"""
    logger.info("DataHub Actions version: %s", actions_version.nice_version_name())
    if debug:
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        logging.getLogger().setLevel(logging.INFO)

    pipelines: List[Pipeline] = []
    logger.debug("Creating Actions Pipelines...")

    # Phase 1: Initial validation of configs
    valid_configs = []
    for pipeline_config in config:
        pipeline_config_file = pathlib.Path(pipeline_config)
        try:
            raw_config = load_raw_config_file(pipeline_config_file)
            if not is_pipeline_enabled(raw_config):
                logger.warning(
                    f"Skipping pipeline {raw_config.get('name') or pipeline_config} "
                    f"as it is not enabled"
                )
                continue
            valid_configs.append(pipeline_config_file)
        except Exception as e:
            if len(config) == 1:
                raise Exception(
                    f"Failed to load raw configuration file {pipeline_config_file}"
                ) from e
            logger.warning(
                f"Failed to load pipeline configuration! "
                f"Skipping action config file {pipeline_config_file}...: {e}"
            )

    # Phase 2: Full config loading and pipeline creation
    for pipeline_config_file in valid_configs:
        try:
            pipeline_config_dict = load_config_file(pipeline_config_file)
            if not is_pipeline_enabled(pipeline_config_dict):
                continue
            pipelines.append(pipeline_config_to_pipeline(pipeline_config_dict))
        except UnboundVariable as e:
            if len(valid_configs) == 1:
                raise Exception(
                    "Failed to load action configuration. "
                    "Unbound variable(s) provided in config YAML."
                ) from e
            logger.warning(
                f"Failed to resolve variables in config file "
                f"{pipeline_config_file}...: {e}"
            )
            continue

    if not pipelines:
        logger.error(
            f"No valid pipelines were started from {len(config)} config(s). "
            "Check that at least one pipeline is enabled and all required "
            "environment variables are set."
        )
        sys.exit(1)

    # Start each pipeline
    for p in pipelines:
        pipeline_manager.start_pipeline(p.name, p)
        logger.info(f"Action Pipeline with name '{p.name}' is now running.")

    # Run forever
    while True:
        time.sleep(5)
```
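The single-vs-multiple config error policy above can be isolated into a small standalone sketch. Here `load_configs` and `load_one` are hypothetical stand-ins for the CLI's `load_raw_config_file` / `load_config_file` calls, illustrating the fail-fast versus warn-and-skip behavior:

```python
# Sketch of the error-handling policy in the run command: with a single
# config, a load failure aborts; with multiple configs, bad ones are
# skipped so the remaining pipelines can still start.
def load_configs(paths, load_one):
    loaded, skipped = [], []
    for path in paths:
        try:
            loaded.append(load_one(path))
        except Exception:
            if len(paths) == 1:
                raise  # sole config: fail fast, as the CLI does
            skipped.append(path)  # several configs: warn and skip
    return loaded, skipped

def load_one(path):
    # Hypothetical loader that fails on a malformed file.
    if "broken" in path:
        raise ValueError(f"bad yaml in {path}")
    return {"name": path}

loaded, skipped = load_configs(["ok.yml", "broken.yml"], load_one)

single_failed = False
try:
    load_configs(["broken.yml"], load_one)
except ValueError:
    single_failed = True
```

The same idea appears twice in the real command: once for raw file loading (Phase 1) and once for variable resolution (Phase 2), where only `UnboundVariable` is caught.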
Pipeline.create() and Pipeline.run() (pipeline.py, Lines 112-176)
```python
@classmethod
def create(cls, config_dict: dict) -> "Pipeline":
    # Bind config
    config = PipelineConfig.model_validate(config_dict)
    if not config.enabled:
        raise Exception(
            "Pipeline is disabled, but create method was called unexpectedly."
        )
    # Create Context
    ctx = create_action_context(config.name, config.datahub)
    # Create Event Source
    event_source = create_event_source(config.source, ctx)
    # Create Transforms
    transforms = []
    if config.filter is not None:
        transforms.append(create_filter_transformer(config.filter, ctx))
    if config.transform is not None:
        for transform_config in config.transform:
            transforms.append(create_transformer(transform_config, ctx))
    # Create Action
    action = create_action(config.action, ctx)
    # Finally, create Pipeline.
    return cls(
        config.name,
        event_source,
        transforms,
        action,
        config.options.retry_count if config.options else None,
        config.options.failure_mode if config.options else None,
        config.options.failed_events_dir if config.options else None,
    )

def run(self) -> None:
    """
    Run the action pipeline synchronously. This method is blocking.
    Raises an instance of PipelineException if an unrecoverable
    pipeline failure occurs.
    """
    self._stats.mark_start()
    # First, source the events.
    enveloped_events = self.source.events()
    for enveloped_event in enveloped_events:
        # Then, process the event.
        retval = self._process_event(enveloped_event)
        # For legacy users w/o selective ack support, convert
        # None to True, i.e. always commit.
        if retval is None:
            retval = True
        # Finally, ack the event.
        self._ack_event(enveloped_event, retval)
```
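The selective-ack convention in `run()` can be demonstrated in isolation. The sketch below uses lambdas as illustrative stand-ins for `_process_event` and `_ack_event` (not real `datahub_actions` APIs): a `None` return is treated as success, so legacy actions that return nothing always commit their offsets.

```python
# Sketch of the run() loop contract: iterate a source, process each event,
# convert a legacy None result to True, then ack with the final verdict.
def run_loop(events, process_event, ack_event):
    for ev in events:
        retval = process_event(ev)
        if retval is None:  # legacy actions without selective ack support
            retval = True
        ack_event(ev, retval)

acked = []
run_loop(
    events=["e1", "e2", "e3"],
    # e1 simulates a legacy action (returns None); e3 simulates a failure.
    process_event=lambda ev: None if ev == "e1" else ev != "e3",
    ack_event=lambda ev, ok: acked.append((ev, ok)),
)
```

Only `e3` is acked with `False` here; a source implementation can use that flag to decide whether to commit the underlying Kafka offset.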
PipelineManager (pipeline_manager.py, Lines 51-105)
```python
class PipelineManager:
    # A catalog of all the currently executing Action Pipelines.
    pipeline_registry: Dict[str, PipelineSpec] = {}

    def __init__(self) -> None:
        pass

    def start_pipeline(self, name: str, pipeline: Pipeline) -> None:
        logger.debug(f"Attempting to start pipeline with name {name}...")
        if name not in self.pipeline_registry:
            thread = Thread(target=run_pipeline, args=([pipeline]))
            thread.start()
            spec = PipelineSpec(name, pipeline, thread)
            self.pipeline_registry[name] = spec
            logger.debug(f"Started pipeline with name {name}.")
        else:
            raise Exception(f"Pipeline with name {name} is already running.")

    def stop_pipeline(self, name: str) -> None:
        logger.debug(f"Attempting to stop pipeline with name {name}...")
        if name in self.pipeline_registry:
            try:
                pipeline_spec = self.pipeline_registry[name]
                pipeline_spec.pipeline.stop()
                pipeline_spec.thread.join()
                logger.info(f"Actions Pipeline with name '{name}' has been stopped.")
                pipeline_spec.pipeline.stats().pretty_print_summary(name)
                del self.pipeline_registry[name]
            except Exception as e:
                logger.error(
                    f"Caught exception while attempting to stop pipeline "
                    f"with name {name}: {traceback.format_exc(limit=3)}"
                )
                raise Exception(
                    f"Caught exception while attempting to stop pipeline "
                    f"with name {name}."
                ) from e
        else:
            raise Exception(f"No pipeline with name {name} found.")

    def stop_all(self) -> None:
        logger.debug("Attempting to stop all running pipelines...")
        names = list(self.pipeline_registry.keys()).copy()
        for name in names:
            self.stop_pipeline(name)
        logger.debug("Successfully stopped all running pipelines.")
I/O Contract
CLI Interface
| Parameter | Type | Required | Description |
|---|---|---|---|
| `-c` / `--config` | str (multiple) | Yes | Path(s) to YAML pipeline configuration files |
| `--debug` / `--no-debug` | bool | No | Enable debug logging (default: false) |
Kafka Topics Consumed
| Route Key | Default Topic Name | Content |
|---|---|---|
| `mcl` | `MetadataChangeLog_Versioned_v1` | Versioned aspect changes |
| `mcl_timeseries` | `MetadataChangeLog_Timeseries_v1` | Timeseries aspect changes |
| `pe` | `PlatformEvent_v1` | Platform events (EntityChangeEvent, etc.) |
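Deployments with non-default topic names can typically remap these route keys on the Kafka event source. A hedged sketch of such an override (the `topic_routes` block is based on the datahub-actions Kafka source config; verify the exact field names against your version):

```yaml
source:
  type: kafka
  config:
    connection:
      bootstrap: "localhost:9092"
      schema_registry_url: "http://localhost:8081"
    # Map route keys to custom topic names
    topic_routes:
      mcl: "MetadataChangeLog_Versioned_v1"
      mcl_timeseries: "MetadataChangeLog_Timeseries_v1"
      pe: "PlatformEvent_v1"
```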
Process Behavior
| Aspect | Behavior |
|---|---|
| Blocking | Main thread sleeps in an infinite loop (`while True: time.sleep(5)`) |
| Threading | One thread per pipeline via `PipelineManager.start_pipeline()` |
| Shutdown | SIGINT handler calls `PipelineManager.stop_all()`, then `sys.exit(1)` |
| Failed events | Written to `/tmp/logs/datahub/actions/<pipeline_name>/failed_events.log` (configurable via `failed_events_dir`) |
| Error on single config | Raises an exception immediately if the only config fails to load |
| Error on multiple configs | Logs a warning and skips failed configs; exits if no valid pipelines remain |
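The shutdown row above can be sketched as a SIGINT handler that stops all pipelines before exiting. The wiring below is illustrative (a stub manager rather than the real `pipeline_manager` instance), but follows the same stop-all-then-exit sequence:

```python
import signal

stopped = []

class StubManager:
    """Stand-in for PipelineManager with a stop_all() method."""

    def stop_all(self):
        stopped.append("all")

manager = StubManager()

def handle_sigint(signum, frame):
    # Stop every running pipeline, then exit with status 1,
    # matching the documented shutdown behavior.
    manager.stop_all()
    raise SystemExit(1)

signal.signal(signal.SIGINT, handle_sigint)

# Simulate delivery of SIGINT by invoking the handler directly.
exit_code = None
try:
    handle_sigint(signal.SIGINT, None)
except SystemExit as e:
    exit_code = e.code
```

Because `stop_pipeline()` joins each pipeline thread, the handler blocks until every event loop has drained its in-flight event before the process exits.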
Usage Examples
Single Pipeline

```shell
datahub-actions -c /path/to/slack_notifications.yml
```

Multiple Pipelines

```shell
datahub-actions -c /path/to/slack.yml -c /path/to/tag_propagation.yml -c /path/to/term_propagation.yml
```

Debug Mode

```shell
datahub-actions -c /path/to/pipeline.yml --debug
```

With Monitoring Enabled

```shell
datahub-actions --enable-monitoring --monitoring-port 9090 actions run -c /path/to/pipeline.yml
```
Example Pipeline Configuration File
```yaml
name: slack_notifications
enabled: true
source:
  type: kafka
  config:
    connection:
      bootstrap: "${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}"
      schema_registry_url: "${SCHEMA_REGISTRY_URL:-http://localhost:8081}"
filter:
  event_type: "EntityChangeEvent_v1"
  event:
    category: "TAG"
    operation: "ADD"
action:
  type: slack
  config:
    webhook_url: "${SLACK_WEBHOOK_URL}"
datahub:
  server: "${DATAHUB_GMS_URL:-http://localhost:8080}"
  token: "${DATAHUB_TOKEN}"
options:
  retry_count: 3
  failure_mode: "CONTINUE"
  failed_events_dir: "/var/log/datahub/actions"
```
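The `${VAR:-default}` placeholders above are resolved when the config is loaded; the CLI catches `UnboundVariable` (raised by the expandvars library) when a variable without a default is missing. A simplified regex-based sketch of the same substitution semantics (illustrative, not the actual loader):

```python
import os
import re

# Matches ${NAME} and ${NAME:-default}
_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")

def expand(value: str) -> str:
    def repl(m):
        name, default = m.group(1), m.group(2)
        if name in os.environ:
            return os.environ[name]
        if default is not None:
            return default
        # Mirrors the UnboundVariable failure mode for missing variables.
        raise KeyError(f"Unbound variable: {name}")
    return _PATTERN.sub(repl, value)

os.environ.pop("KAFKA_BOOTSTRAP_SERVER", None)
bootstrap = expand("${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}")  # falls back

os.environ["DATAHUB_TOKEN"] = "abc123"
token = expand("${DATAHUB_TOKEN}")  # taken from the environment
```

Note that `${SLACK_WEBHOOK_URL}` and `${DATAHUB_TOKEN}` in the example config have no defaults, so the pipeline fails to load unless those variables are set.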
Programmatic Pipeline Creation
```python
from datahub_actions.pipeline.pipeline import Pipeline
from datahub_actions.pipeline.pipeline_manager import PipelineManager

# Create pipeline from config dict
config_dict = {
    "name": "my_pipeline",
    "source": {"type": "kafka", "config": {...}},
    "action": {"type": "slack", "config": {...}},
}
pipeline = Pipeline.create(config_dict)

# Start via manager
manager = PipelineManager()
manager.start_pipeline("my_pipeline", pipeline)

# Later, stop
manager.stop_all()
```
Related
- Implements: Datahub_project_Datahub_Actions_Deployment
- Related implementations: Datahub_project_Datahub_Actions_PipelineConfig, Datahub_project_Datahub_Pip_Install_Datahub_Actions
- Environment: Environment:Datahub_project_Datahub_Python_3_10_Ingestion_Environment
- Heuristic: Heuristic:Datahub_project_Datahub_Batch_Size_And_Timeout_Tuning