Implementation:Datahub project Datahub Action Act Interface


Metadata

Field Value
Implementation ID I-DHACT-004
Title Action Act Interface
Type API Doc / Pattern Doc
Status Active
Last Updated 2026-02-10
Repository Datahub_project_Datahub
Source Files datahub-actions/src/datahub_actions/action/action.py (Lines 22-41), datahub-actions/src/datahub_actions/plugin/action/hello_world/hello_world.py (Lines 34-52)
Knowledge Sources GitHub - datahub-project/datahub, DataHub Documentation
Domains Event_Processing, Automation, Metadata_Management

Overview

The Action abstract base class defines the interface that all DataHub action plugins must implement. It declares two abstract methods: create(), a factory class method that builds an instance from a configuration dictionary and a pipeline context, and act(), which processes a single event. The class also extends Closeable, so implementations can release resources in close(). This page documents both the abstract interface and the HelloWorldAction reference implementation.
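
To illustrate what "must implement" means in practice, the following is a minimal sketch of how an ABCMeta-based interface rejects an incomplete plugin. It does not import datahub_actions: the Closeable and Action classes here are simplified stand-ins, and IncompleteAction and try_create_incomplete are hypothetical names introduced only for this example.

```python
from abc import ABCMeta, abstractmethod


class Closeable:
    """Stand-in for the real Closeable base (assumption: it exposes close())."""

    def close(self) -> None:
        pass


class Action(Closeable, metaclass=ABCMeta):
    """Simplified stand-in mirroring the two abstract methods described above."""

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, ctx: object) -> "Action":
        ...

    @abstractmethod
    def act(self, event: object) -> None:
        ...


class IncompleteAction(Action):
    """Hypothetical plugin that implements create() but forgets act()."""

    @classmethod
    def create(cls, config_dict: dict, ctx: object) -> "Action":
        return cls()


def try_create_incomplete() -> str:
    """Attempt to build the incomplete plugin and report what happened."""
    try:
        IncompleteAction.create({}, ctx=None)
    except TypeError as exc:
        # Python refuses to instantiate a class with unimplemented abstract methods.
        return f"TypeError: {exc}"
    return "created"
```

In this sketch the failure surfaces when create() tries to instantiate the class, not when the class is defined, so a missing act() would only be noticed once the plugin is actually constructed.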

Code Reference

Action Abstract Base Class (action.py, Lines 22-41)

class Action(Closeable, metaclass=ABCMeta):
    """
    The base class for all DataHub Actions.

    A DataHub action is a component capable of performing a specific action
    (notification, auditing, synchronization, & more) when important events
    occur on DataHub.

    Each Action may provide its own semantics, configurations, compatibility
    and guarantees.
    """

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
        """Factory method to create an instance of an Action"""
        pass

    @abstractmethod
    def act(self, event: EventEnvelope) -> None:
        """Take Action on DataHub events, provided an instance of a DataHub event."""
        pass

HelloWorldAction Reference Implementation (hello_world.py, Lines 27-52)

class HelloWorldConfig(BaseModel):
    # Whether to print the message in upper case.
    to_upper: Optional[bool] = None


class HelloWorldAction(Action):
    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
        action_config = HelloWorldConfig.model_validate(config_dict or {})
        return cls(action_config, ctx)

    def __init__(self, config: HelloWorldConfig, ctx: PipelineContext):
        self.config = config

    def act(self, event: EventEnvelope) -> None:
        print("Hello world! Received event:")
        message = json.dumps(json.loads(event.as_json()), indent=4)
        if self.config.to_upper:
            print(message.upper())
        else:
            print(message)

    def close(self) -> None:
        pass

Imports

from datahub_actions.action.action import Action
from datahub_actions.event.event_envelope import EventEnvelope
from datahub_actions.pipeline.pipeline_context import PipelineContext

I/O Contract

Action.create()

Direction Description
Input config_dict: dict -- raw configuration dictionary from YAML; ctx: PipelineContext -- pipeline context with DataHub graph access
Output An initialized Action instance ready to process events
Raises Any exception from configuration validation will prevent pipeline startup
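
The contract above can be sketched as follows. This is an illustrative stand-in, not the DataHub implementation: it uses a stdlib dataclass instead of a pydantic model so that it runs without extra dependencies, and WebhookConfig, WebhookAction, and from_dict are hypothetical names.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class WebhookConfig:
    """Hypothetical config; a real plugin would typically use a pydantic model."""

    webhook_url: str
    timeout: Optional[int] = 30

    @classmethod
    def from_dict(cls, config_dict: Optional[dict]) -> "WebhookConfig":
        raw = dict(config_dict or {})
        if "webhook_url" not in raw:
            raise ValueError("webhook_url is required")
        unknown = set(raw) - {"webhook_url", "timeout"}
        if unknown:
            raise ValueError(f"unknown config keys: {sorted(unknown)}")
        return cls(**raw)


class WebhookAction:
    """Hypothetical action showing the create() factory shape."""

    def __init__(self, config: WebhookConfig, ctx: Any):
        self.config = config
        self.ctx = ctx

    @classmethod
    def create(cls, config_dict: Optional[dict], ctx: Any) -> "WebhookAction":
        # Validation happens here, so a bad config fails before any event is read.
        return cls(WebhookConfig.from_dict(config_dict), ctx)
```

Validating inside create() means a misconfigured plugin fails once, at construction time, instead of failing on every event.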

Action.act()

Direction Description
Input event: EventEnvelope containing:
  • event_type: str -- the type identifier (e.g., "EntityChangeEvent_v1")
  • event: Event -- the deserialized event payload with as_json() method
  • meta: Dict[str, Any] -- arbitrary metadata (e.g., Kafka offset info)
Output None (or optionally bool for selective ack support)
Side Effects Implementation-defined (send notification, write metadata, call external API, etc.)
Error Handling Exceptions raised from act() are caught by the pipeline and subject to the configured retry count and failure mode
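
To make the error-handling row concrete, here is a rough sketch of a retry loop around act(). It is an assumption about the general shape of such a loop, not DataHub's actual pipeline code: FakeEnvelope, FlakyAction, and act_with_retries are hypothetical, and the failure mode (what happens after retries are exhausted) is deliberately not modeled.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict


@dataclass
class FakeEnvelope:
    """Stand-in carrying the three fields listed in the table above."""

    event_type: str
    event: Any
    meta: Dict[str, Any] = field(default_factory=dict)


class FlakyAction:
    """Hypothetical action whose act() fails a fixed number of times first."""

    def __init__(self, failures_before_success: int):
        self.remaining_failures = failures_before_success
        self.processed = []

    def act(self, event: FakeEnvelope) -> None:
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            raise RuntimeError("transient failure")
        self.processed.append(event.event_type)


def act_with_retries(act: Callable[[FakeEnvelope], None],
                     envelope: FakeEnvelope,
                     retry_count: int) -> bool:
    """Call act() once plus up to retry_count retries; report success."""
    for _ in range(retry_count + 1):
        try:
            act(envelope)
            return True
        except Exception:
            # A real pipeline would likely log the exception here.
            continue
    return False
```

Because act() may be invoked more than once for the same event under a scheme like this, side effects in a custom action are safest when they are idempotent.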

Action.close() (from Closeable)

Direction Description
Input None
Output None
Purpose Release resources (network connections, file handles, etc.) when the pipeline shuts down
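
As a sketch of the cleanup contract, the example below uses contextlib.closing from the standard library to guarantee that close() runs even if processing raises. ConnectionHoldingAction and run_events are hypothetical; how the real pipeline invokes close() at shutdown is not shown here.

```python
from contextlib import closing
from typing import Any, Iterable, List


class ConnectionHoldingAction:
    """Hypothetical action that holds a resource for its whole lifetime."""

    def __init__(self) -> None:
        self.connection_open = True  # stands in for a socket, file handle, etc.
        self.handled: List[Any] = []

    def act(self, event: Any) -> None:
        if not self.connection_open:
            raise RuntimeError("act() called after close()")
        self.handled.append(event)

    def close(self) -> None:
        # Safe to call more than once.
        self.connection_open = False


def run_events(events: Iterable[Any]) -> ConnectionHoldingAction:
    """Process events, then always release the action's resources."""
    action = ConnectionHoldingAction()
    with closing(action):  # calls action.close() on exit, even on error
        for event in events:
            action.act(event)
    return action
```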

Built-in Action Implementations

Plugin Name Class Module
executor ExecutorAction datahub_actions.plugin.action.execution.executor_action
slack SlackNotificationAction datahub_actions.plugin.action.slack.slack
teams TeamsNotificationAction datahub_actions.plugin.action.teams.teams
metadata_change_sync MetadataChangeSyncAction datahub_actions.plugin.action.metadata_change_sync.metadata_change_sync
tag_propagation TagPropagationAction datahub_actions.plugin.action.tag.tag_propagation_action
term_propagation TermPropagationAction datahub_actions.plugin.action.term.term_propagation_action
snowflake_tag_propagation SnowflakeTagPropagatorAction datahub_actions.plugin.action.snowflake.tag_propagator
doc_propagation DocPropagationAction datahub_actions.plugin.action.propagation.docs.propagation_action

Usage Examples

Implementing a Custom Action

import logging
from pydantic import BaseModel
from typing import Optional

from datahub_actions.action.action import Action
from datahub_actions.event.event_envelope import EventEnvelope
from datahub_actions.pipeline.pipeline_context import PipelineContext

logger = logging.getLogger(__name__)


class MyActionConfig(BaseModel):
    webhook_url: str
    timeout: Optional[int] = 30


class MyCustomAction(Action):
    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
        config = MyActionConfig.model_validate(config_dict or {})
        return cls(config, ctx)

    def __init__(self, config: MyActionConfig, ctx: PipelineContext):
        self.config = config
        self.ctx = ctx

    def act(self, event: EventEnvelope) -> None:
        # Use lazy %-style arguments so the message is only formatted if emitted.
        logger.info("Processing event: %s", event.event_type)
        # Custom logic here: send webhook, update external system, etc.

    def close(self) -> None:
        # Clean up resources
        pass

Registering a Custom Action via Entry Points

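# In setup.py, pass this mapping as the entry_points argument to setuptools.setup().
# In pyproject.toml, the equivalent is a
# [project.entry-points."datahub_actions.action.plugins"] table containing
# my_custom = "my_package.my_module:MyCustomAction".
entry_points = {
    "datahub_actions.action.plugins": [
        # Format: "<plugin name> = <module path>:<class name>"
        "my_custom = my_package.my_module:MyCustomAction",
    ],
}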
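
One way to check that a registration like the one above took effect is to list the entry points in that group with the standard library. This is a sketch: list_action_plugins is a hypothetical helper, and it only reports what is installed in the current environment, so it returns an empty mapping if no matching package is installed.

```python
import sys
from importlib.metadata import entry_points
from typing import Dict

# Group name taken from the registration snippet above.
ACTION_PLUGIN_GROUP = "datahub_actions.action.plugins"


def list_action_plugins(group: str = ACTION_PLUGIN_GROUP) -> Dict[str, str]:
    """Map each registered plugin name to its 'module:Class' target."""
    if sys.version_info >= (3, 10):
        # Python 3.10+ supports selecting a group directly.
        found = entry_points(group=group)
    else:
        # Older versions return a mapping of group name to entry points.
        found = entry_points().get(group, [])
    return {ep.name: ep.value for ep in found}


if __name__ == "__main__":
    for name, target in sorted(list_action_plugins().items()):
        print(f"{name} -> {target}")
```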

Using the Custom Action in a Pipeline

name: my_custom_pipeline
source:
  type: kafka
  config:
    connection:
      bootstrap: "localhost:9092"
      schema_registry_url: "http://localhost:8081"
filter:
  event_type: "EntityChangeEvent_v1"
action:
  type: my_custom
  config:
    webhook_url: "https://example.com/webhook"
    timeout: 60
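
The relationship between the action block in this YAML and Action.create() can be sketched as below. This is an assumed, simplified illustration, not DataHub's actual plugin resolution: the YAML is written out as a Python dict to avoid a parser dependency, and MyCustomAction, REGISTRY, and build_action are hypothetical stand-ins.

```python
from typing import Any, Dict, Optional, Type

# The pipeline definition above, expressed as the dict a YAML parser would produce.
PIPELINE_CONFIG: Dict[str, Any] = {
    "name": "my_custom_pipeline",
    "filter": {"event_type": "EntityChangeEvent_v1"},
    "action": {
        "type": "my_custom",
        "config": {"webhook_url": "https://example.com/webhook", "timeout": 60},
    },
}


class MyCustomAction:
    """Stand-in for the custom action; keeps the raw config for inspection."""

    def __init__(self, config: Dict[str, Any], ctx: Any):
        self.config = config
        self.ctx = ctx

    @classmethod
    def create(cls, config_dict: Optional[dict], ctx: Any) -> "MyCustomAction":
        return cls(dict(config_dict or {}), ctx)


# Hypothetical registry; in practice the name-to-class lookup comes from entry points.
REGISTRY: Dict[str, Type[MyCustomAction]] = {"my_custom": MyCustomAction}


def build_action(pipeline_config: Dict[str, Any],
                 registry: Dict[str, Type[MyCustomAction]],
                 ctx: Any = None) -> MyCustomAction:
    """Resolve action.type to a class and hand action.config to create()."""
    action_block = pipeline_config["action"]
    action_cls = registry[action_block["type"]]
    return action_cls.create(action_block.get("config"), ctx)
```

The point of the sketch is that the keys under action.config are exactly what create() receives as config_dict, which is why they must match the fields of the action's config model.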
