Implementation:Datahub project Datahub FilterTransformer Transform
Metadata
| Field | Value |
|---|---|
| Implementation ID | I-DHACT-003 |
| Title | FilterTransformer Transform |
| Type | API Doc |
| Status | Active |
| Last Updated | 2026-02-10 |
| Repository | Datahub_project_Datahub |
| Source File | datahub-actions/src/datahub_actions/plugin/transform/filter/filter_transformer.py (Lines 34-86) |
| Knowledge Sources | GitHub - datahub-project/datahub, DataHub Documentation |
| Domains | Event_Processing, Automation, Metadata_Management |
Overview
The FilterTransformer class subclasses the Transformer abstract base class and filters events by event type and, optionally, by the contents of the event's JSON body. It is the built-in implementation of the filter stage in Actions pipeline configurations: the filter criteria are validated into a FilterTransformerConfig, and transform() either returns the event unchanged (the event passes) or returns None (the event is dropped).
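To illustrate the drop-by-None contract, here is a minimal sketch of one way a pipeline could chain transformers. It assumes each transformer is a plain callable that takes an event and returns it or None; apply_transformers, only_tag_events, and the dict-shaped events are hypothetical and not part of datahub-actions.

```python
from typing import Any, Callable, Iterable, Optional

# Hypothetical transformer shape: return the event (possibly modified), or None to drop it.
TransformFn = Callable[[Any], Optional[Any]]


def apply_transformers(event: Any, transformers: Iterable[TransformFn]) -> Optional[Any]:
    """Run transformers in order, stopping at the first one that returns None."""
    current = event
    for transform in transformers:
        current = transform(current)
        if current is None:
            # Dropped: later transformers never see this event.
            return None
    return current


def only_tag_events(event: dict) -> Optional[dict]:
    # Behaves like a filter stage: pass the event through unchanged or drop it.
    return event if event.get("category") == "TAG" else None


kept = apply_transformers({"category": "TAG"}, [only_tag_events])
dropped = apply_transformers({"category": "OWNER"}, [only_tag_events])
print(kept, dropped)  # {'category': 'TAG'} None
```

Returning None instead of raising keeps "not interested in this event" separate from genuine errors. A filter stage therefore needs no special handling from the code that runs it.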
Code Reference
FilterTransformer Class (filter_transformer.py, Lines 34-86)
```python
class FilterTransformer(Transformer):
    def __init__(self, config: FilterTransformerConfig):
        self.config: FilterTransformerConfig = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Transformer":
        config = FilterTransformerConfig.model_validate(config_dict)
        return cls(config)

    def transform(self, env_event: EventEnvelope) -> Optional[EventEnvelope]:
        logger.debug(f"Preparing to filter event {env_event}")

        # Match Event Type.
        if not self._matches(self.config.event_type, env_event.event_type):
            return None

        # Match Event Body.
        if self.config.event is not None:
            body_as_json_dict = json.loads(env_event.event.as_json())
            for key, val in self.config.event.items():
                if not self._matches(val, body_as_json_dict.get(key)):
                    return None
        return env_event
```
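You can try the two-stage check in transform() without DataHub installed. The following minimal sketch mirrors transform() and the _matches helpers on this page. StubEvent, StubEnvelope, and FilterSketch are hypothetical stand-ins for Event, EventEnvelope, and FilterTransformer, and StubEvent.as_json() simply serializes a dict.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union


@dataclass
class StubEvent:
    """Hypothetical stand-in for a DataHub Event: only as_json() is used."""
    body: Dict[str, Any]

    def as_json(self) -> str:
        return json.dumps(self.body)


@dataclass
class StubEnvelope:
    """Hypothetical stand-in for EventEnvelope (event_type plus event)."""
    event_type: str
    event: StubEvent


class FilterSketch:
    def __init__(self, event_type: Union[str, List[str]], event: Optional[Dict[str, Any]] = None):
        self.event_type = event_type
        self.event = event

    def transform(self, env_event: StubEnvelope) -> Optional[StubEnvelope]:
        # Stage 1: event type (string -> equality, list -> ANY).
        if not self._matches(self.event_type, env_event.event_type):
            return None
        # Stage 2: every configured top-level body key must match.
        if self.event is not None:
            body = json.loads(env_event.event.as_json())
            for key, val in self.event.items():
                if not self._matches(val, body.get(key)):
                    return None
        return env_event

    def _matches(self, match_val: Any, match_val_to: Any) -> bool:
        if isinstance(match_val, dict):
            return self._matches_dict(match_val, match_val_to)
        if isinstance(match_val, list):
            # ANY semantics, and only against string targets.
            return isinstance(match_val_to, str) and match_val_to in match_val
        return match_val == match_val_to

    def _matches_dict(self, filters: Dict[str, Any], target: Any) -> bool:
        # Parse JSON-encoded strings, then require ALL filter keys to match.
        if isinstance(target, str):
            try:
                target = json.loads(target)
            except ValueError:
                pass
        if not isinstance(target, dict):
            return False
        return all(self._matches(v, target.get(k)) for k, v in filters.items())


tag_add_filter = FilterSketch("EntityChangeEvent_v1", {"category": "TAG", "operation": "ADD"})
tag_add = StubEnvelope("EntityChangeEvent_v1", StubEvent({"category": "TAG", "operation": "ADD", "entityType": "dataset"}))
tag_remove = StubEnvelope("EntityChangeEvent_v1", StubEvent({"category": "TAG", "operation": "REMOVE"}))
print(tag_add_filter.transform(tag_add) is tag_add)  # True: the same envelope is returned
print(tag_add_filter.transform(tag_remove))  # None: operation does not match
```

A passing event is returned as the very same object, which is why the identity check holds. Body keys that the filter does not mention (entityType above) are ignored.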
FilterTransformerConfig (filter_transformer.py, Lines 29-31)
```python
class FilterTransformerConfig(ConfigModel):
    event_type: Union[str, List[str]]
    event: Optional[Dict[str, Any]] = Field(default=None)
```
Matching Logic (FilterTransformer methods, filter_transformer.py, Lines 58-86)
```python
def _matches(self, match_val: Any, match_val_to: Any) -> bool:
    if isinstance(match_val, dict):
        return self._matches_dict(match_val, match_val_to)
    if isinstance(match_val, list):
        return self._matches_list(match_val, match_val_to)
    return match_val == match_val_to

def _matches_list(self, match_filters: List, match_with: Any) -> bool:
    """When matching lists we do ANY not ALL match"""
    if not isinstance(match_with, str):
        return False
    for filter in match_filters:
        if filter == match_with:
            return True
    return False

def _matches_dict(self, match_filters: Dict[str, Any], match_with: Any) -> bool:
    if isinstance(match_with, str):
        try:
            match_with = json.loads(match_with)
        except ValueError:
            pass
    if not isinstance(match_with, dict):
        return False
    for key, val in match_filters.items():
        curr = match_with.get(key)
        if not self._matches(val, curr):
            return False
    return True
```
Import
```python
from datahub_actions.plugin.transform.filter.filter_transformer import FilterTransformer
from datahub_actions.plugin.transform.filter.filter_transformer import FilterTransformerConfig
```
I/O Contract
| Direction | Description |
|---|---|
| Input | EventEnvelope containing event_type (str) and event (an Event exposing as_json()) |
| Output | Optional[EventEnvelope]: the original envelope, unchanged, if it satisfies every filter criterion; None if the event should be dropped |
| Configuration | FilterTransformerConfig with event_type: Union[str, List[str]] and event: Optional[Dict[str, Any]] |
Matching Semantics
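| Filter Value Type | Semantics | Behavior |
|---|---|---|
| Scalar | Equality | match_val == match_val_to. A missing body key yields None, so it fails unless the filter value is also None. |
| List | ANY (disjunction) | Returns True if the target is a string equal to any list element. Non-string targets, including lists, never match. |
| Dict | ALL (conjunction, recursive) | Returns True if every key in the filter matches the corresponding key of the target, recursively. A string target is first parsed as JSON; if it does not parse to an object, the match fails. Extra keys in the target are ignored. |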
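The edge cases in the table are easiest to see by calling the matching rules directly. This sketch copies the three helpers as free functions. The names matches, matches_list, and matches_dict are hypothetical, not library API, and the tagUrn key and values are illustrative data only.

```python
import json
from typing import Any, Dict, List


def matches(match_val: Any, target: Any) -> bool:
    """Mirror of FilterTransformer._matches: dispatch on the filter value's type."""
    if isinstance(match_val, dict):
        return matches_dict(match_val, target)
    if isinstance(match_val, list):
        return matches_list(match_val, target)
    return match_val == target


def matches_list(options: List[Any], target: Any) -> bool:
    # ANY semantics; non-string targets (including lists) never match.
    if not isinstance(target, str):
        return False
    return any(option == target for option in options)


def matches_dict(filters: Dict[str, Any], target: Any) -> bool:
    # ALL semantics; a JSON-encoded string target is parsed first.
    if isinstance(target, str):
        try:
            target = json.loads(target)
        except ValueError:
            pass
    if not isinstance(target, dict):
        return False
    return all(matches(v, target.get(k)) for k, v in filters.items())


# Scalar: a missing key (None) does not equal "TAG".
print(matches("TAG", None))  # False
# List: ANY of the options, but only against a string target.
print(matches(["ADD", "REMOVE"], "REMOVE"))  # True
print(matches(["ADD", "REMOVE"], ["ADD"]))  # False
# Dict: a JSON string target is parsed before matching.
print(matches({"tagUrn": "urn:li:tag:pii"}, '{"tagUrn": "urn:li:tag:pii"}'))  # True
print(matches({"tagUrn": "urn:li:tag:pii"}, "not json"))  # False
# Dict: extra keys in the target are ignored.
print(matches({"a": 1}, {"a": 1, "b": 2}))  # True
```

Because dict matching recurses through matches(), list values nested inside a dict filter still use ANY semantics against the nested field.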
Usage Examples
Filter by Event Type Only
```yaml
filter:
  event_type: "EntityChangeEvent_v1"
```
Filter by Multiple Event Types
```yaml
filter:
  event_type:
    - "EntityChangeEvent_v1"
    - "MetadataChangeLogEvent_v1"
```
Filter by Event Type and Body Content
```yaml
filter:
  event_type: "EntityChangeEvent_v1"
  event:
    category: "TAG"
    operation: "ADD"
    entityType: "dataset"
```
Programmatic Usage
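```python
from typing import Optional

from datahub_actions.event.event_envelope import EventEnvelope
from datahub_actions.plugin.transform.filter.filter_transformer import (
    FilterTransformer,
    FilterTransformerConfig,
)

# Keep only EntityChangeEvent_v1 events whose body has category TAG and operation ADD.
config = FilterTransformerConfig(
    event_type="EntityChangeEvent_v1",
    event={"category": "TAG", "operation": "ADD"},
)
transformer = FilterTransformer(config)


def handle(env_event: EventEnvelope) -> None:
    # env_event is an EventEnvelope supplied by the caller (e.g. from an event source).
    # transform() returns env_event unchanged if it matches, None otherwise.
    result: Optional[EventEnvelope] = transformer.transform(env_event)
    if result is not None:
        print("Event passed filter")
    else:
        print("Event filtered out")
```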
Related
- Implements: Datahub_project_Datahub_Event_Filtering
- Related implementations: Datahub_project_Datahub_Actions_PipelineConfig, Datahub_project_Datahub_Action_Act_Interface
- Environment: Environment:Datahub_project_Datahub_Python_3_10_Ingestion_Environment