Implementation:Datahub project Datahub FilterTransformer Transform

From Leeroopedia


Metadata

| Field | Value |
| --- | --- |
| Implementation ID | I-DHACT-003 |
| Title | FilterTransformer Transform |
| Type | API Doc |
| Status | Active |
| Last Updated | 2026-02-10 |
| Repository | Datahub_project_Datahub |
| Source File | datahub-actions/src/datahub_actions/plugin/transform/filter/filter_transformer.py (Lines 34-86) |
| Knowledge Sources | GitHub - datahub-project/datahub, DataHub Documentation |
| Domains | Event_Processing, Automation, Metadata_Management |

Overview

The FilterTransformer class implements the Transformer abstract base class to provide event filtering based on event type and optional JSON body matching. It is the built-in mechanism for the filter stage in action pipeline configurations, converting filter criteria into a transformer that either passes events through or drops them by returning None.

Code Reference

FilterTransformer Class (filter_transformer.py, Lines 34-86)

class FilterTransformer(Transformer):
    def __init__(self, config: FilterTransformerConfig):
        self.config: FilterTransformerConfig = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Transformer":
        config = FilterTransformerConfig.model_validate(config_dict)
        return cls(config)

    def transform(self, env_event: EventEnvelope) -> Optional[EventEnvelope]:
        logger.debug(f"Preparing to filter event {env_event}")

        # Match Event Type.
        if not self._matches(self.config.event_type, env_event.event_type):
            return None

        # Match Event Body.
        if self.config.event is not None:
            body_as_json_dict = json.loads(env_event.event.as_json())
            for key, val in self.config.event.items():
                if not self._matches(val, body_as_json_dict.get(key)):
                    return None
        return env_event

FilterTransformerConfig (filter_transformer.py, Lines 29-31)

class FilterTransformerConfig(ConfigModel):
    event_type: Union[str, List[str]]
    event: Optional[Dict[str, Any]] = Field(default=None)

Matching Logic (filter_transformer.py, Lines 58-86)

def _matches(self, match_val: Any, match_val_to: Any) -> bool:
    if isinstance(match_val, dict):
        return self._matches_dict(match_val, match_val_to)
    if isinstance(match_val, list):
        return self._matches_list(match_val, match_val_to)
    return match_val == match_val_to

def _matches_list(self, match_filters: List, match_with: Any) -> bool:
    """When matching lists we do ANY not ALL match"""
    if not isinstance(match_with, str):
        return False
    for filter in match_filters:
        if filter == match_with:
            return True
    return False

def _matches_dict(self, match_filters: Dict[str, Any], match_with: Any) -> bool:
    if isinstance(match_with, str):
        try:
            match_with = json.loads(match_with)
        except ValueError:
            pass
    if not isinstance(match_with, dict):
        return False
    for key, val in match_filters.items():
        curr = match_with.get(key)
        if not self._matches(val, curr):
            return False
    return True
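
The matching rules can be tried outside DataHub with a standalone sketch. The functions below restate the three methods above as plain functions (no `self`), and the `body` dict is a hypothetical event body invented for illustration. It is not taken from a real DataHub event.

```python
import json
from typing import Any, Dict, List


def matches(match_val: Any, target: Any) -> bool:
    """Dispatch on the filter value's type: dict -> ALL, list -> ANY, else equality."""
    if isinstance(match_val, dict):
        return matches_dict(match_val, target)
    if isinstance(match_val, list):
        return matches_list(match_val, target)
    return match_val == target


def matches_list(filters: List[Any], target: Any) -> bool:
    # ANY semantics. A target that is not a string never matches a list filter.
    if not isinstance(target, str):
        return False
    return any(f == target for f in filters)


def matches_dict(filters: Dict[str, Any], target: Any) -> bool:
    # A string target is tried as JSON first; unparseable strings fall through
    # and are then rejected by the dict check below.
    if isinstance(target, str):
        try:
            target = json.loads(target)
        except ValueError:
            pass
    if not isinstance(target, dict):
        return False
    # ALL semantics: every filter key must match, recursively.
    return all(matches(val, target.get(key)) for key, val in filters.items())


# Hypothetical event body, for illustration only.
body = {
    "category": "TAG",
    "operation": "ADD",
    "version": 3,
    "parameters": '{"tagUrn": "urn:li:tag:pii"}',  # JSON carried as a string
}

list_any = matches(["ADD", "REMOVE"], body["operation"])                 # True
nested_json = matches({"tagUrn": "urn:li:tag:pii"}, body["parameters"])  # True
list_vs_int = matches([3, 4], body["version"])                           # False
missing_key = matches({"entityType": "dataset"}, body)                   # False
```

Two consequences follow from the code rather than from any documented guarantee: a list filter cannot match a numeric field, because the target must be a string, and a filter key that is absent from the event compares against `None`, so it fails unless the filter value is itself `None`.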

Import

from datahub_actions.plugin.transform.filter.filter_transformer import (
    FilterTransformer,
    FilterTransformerConfig,
)

I/O Contract

| Direction | Description |
| --- | --- |
| Input | EventEnvelope containing event_type (str) and event (Event with as_json()) |
| Output | Optional[EventEnvelope]: the original envelope if it matches all filter criteria, or None if the event should be filtered out |
| Configuration | FilterTransformerConfig with event_type: Union[str, List[str]] and event: Optional[Dict[str, Any]] |
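
The `Optional` return value means a caller has to treat `None` as "stop processing this event". The sketch below shows one way a caller might honor that contract. It is an assumption for illustration, not the DataHub Actions pipeline code: `apply_transformers`, `only_entity_change`, and `tag_seen` are hypothetical names, and a plain dict stands in for `EventEnvelope`.

```python
from typing import Callable, List, Optional

# Stand-in for EventEnvelope: a plain dict is enough to show the contract.
Envelope = dict
TransformFn = Callable[[Envelope], Optional[Envelope]]


def apply_transformers(
    env: Envelope, transformers: List[TransformFn]
) -> Optional[Envelope]:
    """Run transformers in order and stop as soon as one returns None."""
    current = env
    for transform in transformers:
        result = transform(current)
        if result is None:
            return None  # event dropped; later transformers never see it
        current = result
    return current


def only_entity_change(env: Envelope) -> Optional[Envelope]:
    # Filter-style transformer: pass the envelope through or drop it.
    return env if env.get("event_type") == "EntityChangeEvent_v1" else None


def tag_seen(env: Envelope) -> Optional[Envelope]:
    # Hypothetical downstream transformer that annotates the envelope.
    return {**env, "seen": True}


steps = [only_entity_change, tag_seen]
kept = apply_transformers({"event_type": "EntityChangeEvent_v1"}, steps)
dropped = apply_transformers({"event_type": "MetadataChangeLogEvent_v1"}, steps)
```

Here `kept` carries the extra `seen` key because both steps ran, while `dropped` is `None` because the first step rejected the event and the second was never called.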

Matching Semantics

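| Match Type | Semantics | Behavior |
| --- | --- | --- |
| Scalar | Equality | match_val == match_val_to |
| List | ANY (disjunction) | Returns True if any list element equals the target string. Returns False if the target is not a string. |
| Dict | ALL (conjunction, recursive) | Returns True if all specified keys match recursively. A string target is first parsed as JSON; a target that is still not a dict returns False. |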

Usage Examples

Filter by Event Type Only

filter:
  event_type: "EntityChangeEvent_v1"

Filter by Multiple Event Types

filter:
  event_type:
    - "EntityChangeEvent_v1"
    - "MetadataChangeLogEvent_v1"

Filter by Event Type and Body Content

filter:
  event_type: "EntityChangeEvent_v1"
  event:
    category: "TAG"
    operation: "ADD"
    entityType: "dataset"
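
To see how the filter above behaves on concrete events, the following sketch replays the two-stage check (event type first, then each configured body key) against stand-in objects. `StubEvent`, `StubEnvelope`, `apply_filter`, and the sample payloads are hypothetical and exist only for this illustration; the real classes live in `datahub_actions`.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class StubEvent:
    """Hypothetical stand-in for an Event; only as_json() is needed here."""
    payload: Dict[str, Any]

    def as_json(self) -> str:
        return json.dumps(self.payload)


@dataclass
class StubEnvelope:
    """Hypothetical stand-in for EventEnvelope."""
    event_type: str
    event: StubEvent


def matches(expected: Any, actual: Any) -> bool:
    # Compact restatement of the matching rules: dict = ALL, list = ANY.
    if isinstance(expected, dict):
        if isinstance(actual, str):
            try:
                actual = json.loads(actual)
            except ValueError:
                pass
        return isinstance(actual, dict) and all(
            matches(v, actual.get(k)) for k, v in expected.items()
        )
    if isinstance(expected, list):
        return isinstance(actual, str) and actual in expected
    return expected == actual


def apply_filter(cfg: Dict[str, Any], env: StubEnvelope) -> Optional[StubEnvelope]:
    # Stage 1: event type. Stage 2: each configured body key.
    if not matches(cfg["event_type"], env.event_type):
        return None
    body_filter = cfg.get("event")
    if body_filter is not None:
        body = json.loads(env.event.as_json())
        for key, val in body_filter.items():
            if not matches(val, body.get(key)):
                return None
    return env


# Dict form of the YAML filter above.
filter_cfg = {
    "event_type": "EntityChangeEvent_v1",
    "event": {"category": "TAG", "operation": "ADD", "entityType": "dataset"},
}

# Sample payloads are invented; entityUrn is an extra field the filter ignores.
base = {"category": "TAG", "entityType": "dataset", "entityUrn": "urn:li:dataset:example"}
tag_added = StubEnvelope("EntityChangeEvent_v1", StubEvent({**base, "operation": "ADD"}))
tag_removed = StubEnvelope("EntityChangeEvent_v1", StubEvent({**base, "operation": "REMOVE"}))

passed = apply_filter(filter_cfg, tag_added)     # the same envelope object
dropped = apply_filter(filter_cfg, tag_removed)  # None
```

Fields that the filter does not name, such as `entityUrn` in this sample, play no part in the decision: only the keys listed under `event` are compared.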

Programmatic Usage

from typing import Optional

from datahub_actions.event.event_envelope import EventEnvelope
from datahub_actions.plugin.transform.filter.filter_transformer import (
    FilterTransformer,
    FilterTransformerConfig,
)

config = FilterTransformerConfig(
    event_type="EntityChangeEvent_v1",
    event={"category": "TAG", "operation": "ADD"},
)
transformer = FilterTransformer(config)


def report(env_event: EventEnvelope) -> Optional[EventEnvelope]:
    """Filter one envelope received from an event source and report the outcome."""
    # transform() returns env_event unchanged if it matches, None otherwise
    result = transformer.transform(env_event)
    if result is not None:
        print("Event passed filter")
    else:
        print("Event filtered out")
    return result
