Implementation:Datahub project Datahub Action Act Interface
Metadata
| Field | Value |
|---|---|
| Implementation ID | I-DHACT-004 |
| Title | Action Act Interface |
| Type | API Doc / Pattern Doc |
| Status | Active |
| Last Updated | 2026-02-10 |
| Repository | Datahub_project_Datahub |
| Source Files | datahub-actions/src/datahub_actions/action/action.py (Lines 22-41), datahub-actions/src/datahub_actions/plugin/action/hello_world/hello_world.py (Lines 34-52) |
| Knowledge Sources | GitHub - datahub-project/datahub, DataHub Documentation |
| Domains | Event_Processing, Automation, Metadata_Management |
Overview
The Action abstract base class defines the interface that every DataHub action plugin must implement. It declares two abstract methods: create(), a class-method factory that builds an instance from a configuration dictionary, and act(), which processes a single event. The class also extends Closeable, so implementations can release resources in close(). This page documents both the abstract interface and the HelloWorldAction reference implementation.
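The effect of declaring create() and act() as abstract can be illustrated with a minimal sketch. The Closeable and Action classes below are local stand-ins written for this example (they are not imported from datahub_actions), and IncompleteAction and try_create are hypothetical names. The sketch only shows the standard ABCMeta behaviour: a subclass that leaves act() unimplemented cannot be instantiated.

```python
from abc import ABCMeta, abstractmethod


class Closeable:
    """Stand-in for DataHub's Closeable; only close() is modelled here."""

    def close(self) -> None:
        pass


class Action(Closeable, metaclass=ABCMeta):
    """Stand-in mirroring the two abstract methods of the Action class."""

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, ctx: object) -> "Action":
        ...

    @abstractmethod
    def act(self, event: object) -> None:
        ...


class IncompleteAction(Action):
    """Hypothetical plugin that implements create() but forgets act()."""

    @classmethod
    def create(cls, config_dict: dict, ctx: object) -> "Action":
        return cls()


def try_create() -> str:
    """Report whether the incomplete plugin could be instantiated."""
    try:
        IncompleteAction.create({}, ctx=None)
    except TypeError as exc:
        # ABCMeta refuses to instantiate a class with abstract methods left.
        return f"rejected: {type(exc).__name__}"
    return "created"
```

Because the check happens at instantiation, a plugin with a missing method fails when create() runs rather than when the first event arrives.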
Code Reference
Action Abstract Base Class (action.py, Lines 22-41)
class Action(Closeable, metaclass=ABCMeta):
    """
    The base class for all DataHub Actions.
    A DataHub action is a component capable of performing a specific action
    (notification, auditing, synchronization, & more) when important events
    occur on DataHub.
    Each Action may provide its own semantics, configurations, compatibility
    and guarantees.
    """

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
        """Factory method to create an instance of an Action"""
        pass

    @abstractmethod
    def act(self, event: EventEnvelope) -> None:
        """Take Action on DataHub events, provided an instance of a DataHub event."""
        pass
HelloWorldAction Reference Implementation (hello_world.py, Lines 27-52)
class HelloWorldConfig(BaseModel):
    # Whether to print the message in upper case.
    to_upper: Optional[bool] = None


class HelloWorldAction(Action):
    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
        action_config = HelloWorldConfig.model_validate(config_dict or {})
        return cls(action_config, ctx)

    def __init__(self, config: HelloWorldConfig, ctx: PipelineContext):
        self.config = config

    def act(self, event: EventEnvelope) -> None:
        print("Hello world! Received event:")
        message = json.dumps(json.loads(event.as_json()), indent=4)
        if self.config.to_upper:
            print(message.upper())
        else:
            print(message)

    def close(self) -> None:
        pass
Imports
from datahub_actions.action.action import Action
from datahub_actions.event.event_envelope import EventEnvelope
from datahub_actions.pipeline.pipeline_context import PipelineContext
I/O Contract
Action.create()
| Direction | Description |
|---|---|
| Input | config_dict: dict -- raw configuration dictionary from YAML; ctx: PipelineContext -- pipeline context with DataHub graph access |
| Output | An initialized Action instance ready to process events |
| Raises | Any exception from configuration validation will prevent pipeline startup |
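The create() contract can be sketched without any DataHub dependency. In the example below, WebhookConfig, WebhookAction and startup_error are hypothetical names, and a plain dataclass stands in for the pydantic model used elsewhere on this page. The sketch assumes only what the table states: validation runs inside create(), so a bad configuration surfaces as an exception before any event is processed.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WebhookConfig:
    """Hypothetical config; plays the role of a pydantic model."""

    webhook_url: str
    timeout: Optional[int] = 30

    @classmethod
    def from_dict(cls, raw: Optional[dict]) -> "WebhookConfig":
        raw = raw or {}
        if "webhook_url" not in raw:
            raise ValueError("webhook_url is required")
        return cls(webhook_url=raw["webhook_url"], timeout=raw.get("timeout", 30))


class WebhookAction:
    """Hypothetical action; only the create() path is shown."""

    def __init__(self, config: WebhookConfig, ctx: object) -> None:
        self.config = config
        self.ctx = ctx

    @classmethod
    def create(cls, config_dict: dict, ctx: object) -> "WebhookAction":
        # Validate first, so a bad config raises here instead of
        # producing a half-built action.
        return cls(WebhookConfig.from_dict(config_dict), ctx)


def startup_error(config_dict: dict) -> Optional[str]:
    """Return the validation message, or None if create() succeeded."""
    try:
        WebhookAction.create(config_dict, ctx=None)
    except ValueError as exc:
        return str(exc)
    return None
```

Calling startup_error({}) returns the validation message, while a dictionary containing webhook_url yields None.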
Action.act()
| Direction | Description |
|---|---|
| Input | event: EventEnvelope -- the DataHub event to process |
| Output | None (or optionally bool for selective ack support) |
| Side Effects | Implementation-defined (send notification, write metadata, call external API, etc.) |
| Error Handling | Exceptions raised from act() are caught by the pipeline and subject to the configured retry count and failure mode |
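The error-handling row can be made concrete with a small sketch of a retry loop around act(). This is not the DataHub pipeline's code: act_with_retries and FlakyAction are hypothetical, and the failure-mode names "CONTINUE" and "THROW" are assumptions made for illustration. It only shows the general shape of "retry a fixed number of times, then either skip the event or fail".

```python
from typing import Callable, Optional


def act_with_retries(
    act: Callable[[object], None],
    event: object,
    retry_count: int,
    failure_mode: str = "CONTINUE",  # assumed mode names: "CONTINUE" / "THROW"
) -> bool:
    """Call act() once plus up to retry_count retries.

    Returns True if an attempt succeeded. Once attempts are exhausted,
    "THROW" re-raises the last exception; any other mode returns False.
    """
    attempts = max(1, retry_count + 1)
    last_exc: Optional[Exception] = None
    for _ in range(attempts):
        try:
            act(event)
            return True
        except Exception as exc:
            last_exc = exc
    if failure_mode == "THROW" and last_exc is not None:
        raise last_exc
    return False


class FlakyAction:
    """Hypothetical action that fails a fixed number of times, then succeeds."""

    def __init__(self, failures: int) -> None:
        self.remaining = failures
        self.calls = 0

    def act(self, event: object) -> None:
        self.calls += 1
        if self.remaining > 0:
            self.remaining -= 1
            raise RuntimeError("transient failure")
```

Under this sketch, an act() that may be retried should be safe to run more than once for the same event, which is worth keeping in mind for actions with external side effects.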
Action.close() (from Closeable)
| Direction | Description |
|---|---|
| Input | None |
| Output | None |
| Purpose | Release resources (network connections, file handles, etc.) when the pipeline shuts down |
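The close() contract pairs naturally with Python's context-manager tooling. The sketch below uses contextlib.closing from the standard library with a hypothetical RecordingAction; it does not depend on how the DataHub pipeline itself invokes close(), and simply illustrates that cleanup runs even when act() raises.

```python
from contextlib import closing


class RecordingAction:
    """Hypothetical action that records events and tracks an 'open' flag."""

    def __init__(self) -> None:
        self.open = True
        self.seen = []

    def act(self, event: object) -> None:
        if not self.open:
            raise RuntimeError("action already closed")
        self.seen.append(event)

    def close(self) -> None:
        # Idempotent: calling close() twice is harmless.
        self.open = False


def run(events: list) -> RecordingAction:
    """Feed events to the action and guarantee close() is called afterwards."""
    action = RecordingAction()
    # closing() calls action.close() on exit, even if act() raises.
    with closing(action):
        for event in events:
            action.act(event)
    return action
```

Keeping close() idempotent, as in this sketch, avoids errors if shutdown logic happens to call it more than once.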
Built-in Action Implementations
| Plugin Name | Class | Module |
|---|---|---|
| executor | ExecutorAction | datahub_actions.plugin.action.execution.executor_action |
| slack | SlackNotificationAction | datahub_actions.plugin.action.slack.slack |
| teams | TeamsNotificationAction | datahub_actions.plugin.action.teams.teams |
| metadata_change_sync | MetadataChangeSyncAction | datahub_actions.plugin.action.metadata_change_sync.metadata_change_sync |
| tag_propagation | TagPropagationAction | datahub_actions.plugin.action.tag.tag_propagation_action |
| term_propagation | TermPropagationAction | datahub_actions.plugin.action.term.term_propagation_action |
| snowflake_tag_propagation | SnowflakeTagPropagatorAction | datahub_actions.plugin.action.snowflake.tag_propagator |
| doc_propagation | DocPropagationAction | datahub_actions.plugin.action.propagation.docs.propagation_action |
Usage Examples
Implementing a Custom Action
import logging
from typing import Optional

from pydantic import BaseModel

from datahub_actions.action.action import Action
from datahub_actions.event.event_envelope import EventEnvelope
from datahub_actions.pipeline.pipeline_context import PipelineContext

logger = logging.getLogger(__name__)


class MyActionConfig(BaseModel):
    webhook_url: str
    timeout: Optional[int] = 30


class MyCustomAction(Action):
    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
        config = MyActionConfig.model_validate(config_dict or {})
        return cls(config, ctx)

    def __init__(self, config: MyActionConfig, ctx: PipelineContext):
        self.config = config
        self.ctx = ctx

    def act(self, event: EventEnvelope) -> None:
        logger.info(f"Processing event: {event.event_type}")
        # Custom logic here: send webhook, update external system, etc.

    def close(self) -> None:
        # Clean up resources
        pass
Registering a Custom Action via Entry Points
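# Passed to setup() in setup.py; pyproject.toml declares the same group
# under a [project.entry-points."datahub_actions.action.plugins"] table.
entry_points = {
    "datahub_actions.action.plugins": [
        "my_custom = my_package.my_module:MyCustomAction",
    ],
}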
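To check that a package's entry point is visible after installation, the standard library's importlib.metadata can list what is registered under the group. The helper below, discover_actions, is a hypothetical name, and this is a diagnostic sketch rather than the mechanism DataHub itself uses to load plugins. It handles both the older dictionary return value and the newer selectable API of entry_points().

```python
from importlib.metadata import entry_points


def discover_actions(group: str = "datahub_actions.action.plugins") -> dict:
    """Map each registered plugin name to its 'module:Class' target string."""
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+ returns an object supporting select(group=...).
        selected = eps.select(group=group)
    else:
        # Python 3.8/3.9 return a plain dict keyed by group name.
        selected = eps.get(group, [])
    return {ep.name: ep.value for ep in selected}
```

With the registration above installed, the returned dictionary would be expected to contain a "my_custom" key pointing at "my_package.my_module:MyCustomAction"; in an environment with no action plugins installed it is simply empty.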
Using the Custom Action in a Pipeline
name: my_custom_pipeline
source:
  type: kafka
  config:
    connection:
      bootstrap: "localhost:9092"
      schema_registry_url: "http://localhost:8081"
filter:
  event_type: "EntityChangeEvent_v1"
action:
  type: my_custom
  config:
    webhook_url: "https://example.com/webhook"
    timeout: 60
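How the action section of such a file relates to create() can be sketched as follows. The dictionary mirrors what a YAML parser would produce for the action block; REGISTRY, EchoAction and build_action are hypothetical names for this example, and the lookup is an assumed simplification of plugin resolution, not DataHub's actual loader.

```python
from typing import Any, Dict, Optional

# What a YAML parser would yield for the relevant part of the pipeline file.
pipeline_config: Dict[str, Any] = {
    "name": "my_custom_pipeline",
    "action": {
        "type": "my_custom",
        "config": {"webhook_url": "https://example.com/webhook", "timeout": 60},
    },
}


class EchoAction:
    """Hypothetical action that just keeps the config it was created with."""

    def __init__(self, config_dict: dict, ctx: Optional[object]) -> None:
        self.config_dict = config_dict
        self.ctx = ctx

    @classmethod
    def create(cls, config_dict: dict, ctx: Optional[object]) -> "EchoAction":
        return cls(config_dict, ctx)


# Hypothetical registry: plugin name -> action class.
REGISTRY = {"my_custom": EchoAction}


def build_action(config: Dict[str, Any], ctx: Optional[object] = None) -> EchoAction:
    """Resolve action.type to a class and pass action.config to create()."""
    section = config["action"]
    action_cls = REGISTRY[section["type"]]
    return action_cls.create(section.get("config", {}), ctx)
```

The point of the sketch is the division of labour: the type key selects the class, and everything under config reaches the plugin untouched as config_dict.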
Related
- Implements: Datahub_project_Datahub_Action_Plugin_Development
- Related implementations: Datahub_project_Datahub_FilterTransformer_Transform, Datahub_project_Datahub_Actions_PipelineConfig
- Environment: Environment:Datahub_project_Datahub_Python_3_10_Ingestion_Environment