Implementation:Datahub project Datahub MetadataChangeProposalWrapper Init
Metadata
| Field | Value |
|---|---|
| implementation_name | MetadataChangeProposalWrapper Init |
| description | Constructing and validating Metadata Change Proposal wrappers for packaging metadata changes for emission. |
| type | implementation |
| category | API Doc |
| status | active |
| last_updated | 2026-02-10 |
| version | 1.0 |
Overview
MetadataChangeProposalWrapper is a Python dataclass that provides a type-safe, high-level interface for constructing Metadata Change Proposals (MCPs). It wraps an entity URN and an aspect instance into a validated proposal that can be serialized and emitted to DataHub via REST or Kafka.
Source Reference
| Field | Value |
|---|---|
| File | metadata-ingestion/src/datahub/emitter/mcp.py |
| Lines | L60-268 |
| Repository | datahub-project/datahub |
Import
from datahub.emitter.mcp import MetadataChangeProposalWrapper
Constructor
@dataclasses.dataclass
class MetadataChangeProposalWrapper:
    entityType: str = _ENTITY_TYPE_UNSET
    changeType: Union[str, ChangeTypeClass] = ChangeTypeClass.UPSERT
    entityUrn: Union[None, str] = None
    entityKeyAspect: Union[None, _Aspect] = None
    auditHeader: Union[None, KafkaAuditHeaderClass] = None
    aspectName: Union[None, str] = None
    aspect: Union[None, _Aspect] = None
    systemMetadata: Union[None, SystemMetadataClass] = None
    headers: Union[None, Dict[str, str]] = None
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| entityType | str | "ENTITY_TYPE_UNSET" | Entity type (e.g., "dataset", "corpuser"). Automatically inferred from entityUrn if not provided. |
| changeType | Union[str, ChangeTypeClass] | ChangeTypeClass.UPSERT | Type of metadata change. UPSERT creates or updates. |
| entityUrn | Union[None, str] | None | URN of the target entity. Required if entityKeyAspect is not set. |
| entityKeyAspect | Union[None, _Aspect] | None | Key aspect for entity identification. Required if entityUrn is not set. |
| auditHeader | Union[None, KafkaAuditHeaderClass] | None | Optional Kafka audit header for traceability. |
| aspectName | Union[None, str] | None | Name of the aspect. Automatically derived from aspect.get_aspect_name() if not provided. |
| aspect | Union[None, _Aspect] | None | The typed aspect instance carrying the metadata payload. |
| systemMetadata | Union[None, SystemMetadataClass] | None | Optional system metadata (pipeline run IDs, etc.). |
| headers | Union[None, Dict[str, str]] | None | Optional custom headers for the proposal. |
Post-Init Validation
The __post_init__ method performs the following validations:
- If entityUrn is provided but entityType is unset, infers entityType from the URN via guess_entity_type()
- If both entityUrn and entityType are provided, validates that they match (case-insensitive)
- Raises ValueError if entityType is unset and entityUrn is not provided
- Raises ValueError if neither entityUrn nor entityKeyAspect is provided
- If aspectName is not set but aspect is provided, derives aspectName from the aspect class
- Raises ValueError if aspectName does not match the aspect's declared name
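The order of these checks can be sketched with a simplified stand-in dataclass. This is not the DataHub source: ProposalSketch, OwnershipSketch, and guess_entity_type_sketch are hypothetical names, and the URN parsing is a deliberately naive assumption that only illustrates the inference step.

```python
import dataclasses
from typing import Optional

_ENTITY_TYPE_UNSET = "ENTITY_TYPE_UNSET"


def guess_entity_type_sketch(urn: str) -> str:
    # Hypothetical stand-in for guess_entity_type(): assumes "urn:li:<type>:<id>".
    parts = urn.split(":")
    if len(parts) < 4 or parts[:2] != ["urn", "li"]:
        raise ValueError(f"unexpected URN format: {urn}")
    return parts[2]


class OwnershipSketch:
    # Hypothetical aspect stand-in; only get_aspect_name() is modeled.
    def get_aspect_name(self) -> str:
        return "ownership"


@dataclasses.dataclass
class ProposalSketch:
    entityType: str = _ENTITY_TYPE_UNSET
    entityUrn: Optional[str] = None
    entityKeyAspect: Optional[object] = None
    aspectName: Optional[str] = None
    aspect: Optional[OwnershipSketch] = None

    def __post_init__(self) -> None:
        # Entity type: infer from the URN, cross-check it, or fail.
        if self.entityUrn and self.entityType == _ENTITY_TYPE_UNSET:
            self.entityType = guess_entity_type_sketch(self.entityUrn)
        elif self.entityUrn:
            guessed = guess_entity_type_sketch(self.entityUrn)
            if self.entityType.lower() != guessed.lower():
                raise ValueError(
                    f"entityType {self.entityType} does not match URN type {guessed}"
                )
        elif self.entityType == _ENTITY_TYPE_UNSET:
            raise ValueError("entityType must be set if entityUrn is not set")

        # Entity identity: at least one of URN or key aspect is required.
        if not self.entityUrn and not self.entityKeyAspect:
            raise ValueError("entityUrn or entityKeyAspect must be set")

        # Aspect name: derive it from the aspect, or check consistency.
        if self.aspect is not None:
            declared = self.aspect.get_aspect_name()
            if not self.aspectName:
                self.aspectName = declared
            elif self.aspectName != declared:
                raise ValueError(
                    f"aspectName {self.aspectName} does not match {declared}"
                )


urn = "urn:li:dataset:(urn:li:dataPlatform:mysql,prod_db.users,PROD)"
proposal = ProposalSketch(entityUrn=urn, aspect=OwnershipSketch())
print(proposal.entityType, proposal.aspectName)  # prints: dataset ownership
```

In this sketch every failure surfaces at construction time, so a successfully constructed object already has a consistent entityType and aspectName.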
Key Methods
make_mcp
def make_mcp(self) -> MetadataChangeProposalClass:
Serializes the wrapper into a MetadataChangeProposalClass instance. Converts the typed aspect and entity key aspect into GenericAspectClass instances with JSON-serialized content.
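The JSON packaging step can be illustrated with a stdlib-only sketch. It assumes a generic aspect is essentially a byte payload plus a content type; the dictionary keys value and contentType and the helper to_generic_aspect_sketch are assumptions made for illustration, not the library's own code.

```python
import json
from typing import Any, Dict

JSON_CONTENT_TYPE = "application/json"


def to_generic_aspect_sketch(aspect_obj: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical helper: encode a plain-dict aspect as UTF-8 JSON bytes
    # and record the content type alongside the payload.
    return {
        "value": json.dumps(aspect_obj).encode("utf-8"),
        "contentType": JSON_CONTENT_TYPE,
    }


generic = to_generic_aspect_sketch(
    {"name": "users", "description": "User account table"}
)
# The payload round-trips back to the original dictionary.
decoded = json.loads(generic["value"].decode("utf-8"))
```

Carrying the payload as bytes with an explicit content type lets the same envelope hold any aspect without the transport needing to know its schema.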
validate
def validate(self) -> bool:
Validates the MCP wrapper. Checks that:
- Either entityUrn or entityKeyAspect is set (but not both)
- The entityKeyAspect validates if present
- The aspect validates if present
- The underlying MCP structure is valid
Returns True if valid, False otherwise.
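The first three checks can be sketched as a standalone function. This is a minimal illustration under stated assumptions: validate_sketch, AlwaysValid, and NeverValid are hypothetical names, and the final check on the underlying MCP structure is omitted because it depends on the serialized proposal.

```python
from typing import Optional, Protocol


class Validatable(Protocol):
    def validate(self) -> bool: ...


class AlwaysValid:
    # Hypothetical aspect stand-in that always passes validation.
    def validate(self) -> bool:
        return True


class NeverValid:
    # Hypothetical aspect stand-in that always fails validation.
    def validate(self) -> bool:
        return False


def validate_sketch(
    entity_urn: Optional[str],
    entity_key_aspect: Optional[Validatable],
    aspect: Optional[Validatable],
) -> bool:
    # Exactly one of entity_urn / entity_key_aspect must be set.
    if (entity_urn is None) == (entity_key_aspect is None):
        return False
    # A key aspect, when present, must validate.
    if entity_key_aspect is not None and not entity_key_aspect.validate():
        return False
    # The payload aspect, when present, must validate.
    if aspect is not None and not aspect.validate():
        return False
    return True


ok = validate_sketch("urn:li:dataset:example", None, AlwaysValid())
both_set = validate_sketch("urn:li:dataset:example", AlwaysValid(), AlwaysValid())
bad_aspect = validate_sketch("urn:li:dataset:example", None, NeverValid())
```

Unlike the constructor checks, this style of validation reports problems through the return value rather than by raising.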
as_workunit
def as_workunit(
    self, *, treat_errors_as_warnings: bool = False, is_primary_source: bool = True
) -> "MetadataWorkUnit":
Wraps the MCP into a MetadataWorkUnit for use in ingestion pipelines. Generates a unique workunit ID.
construct_many
@classmethod
def construct_many(
    cls, entityUrn: str, aspects: Sequence[Optional[_Aspect]]
) -> List["MetadataChangeProposalWrapper"]:
Class method that creates multiple MCPs for the same entity. Filters out None aspects.
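The filtering behavior can be sketched without the library. The function below is a hypothetical stand-in that returns (URN, aspect) pairs instead of wrapper instances; it only shows how None entries drop out while the URN is shared.

```python
from typing import List, Optional, Sequence, Tuple


def construct_many_sketch(
    entity_urn: str, aspects: Sequence[Optional[object]]
) -> List[Tuple[str, object]]:
    # One (urn, aspect) pair per aspect, skipping entries that are None.
    return [(entity_urn, aspect) for aspect in aspects if aspect is not None]


pairs = construct_many_sketch(
    "urn:li:dataset:(urn:li:dataPlatform:mysql,prod_db.users,PROD)",
    ["properties", None, "tags"],
)
```

Accepting Optional entries is convenient when some aspects are built conditionally: callers can pass the candidates directly instead of filtering them first.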
from_obj
@classmethod
def from_obj(
    cls, obj: dict, tuples: bool = False
) -> Union["MetadataChangeProposalWrapper", MetadataChangeProposalClass]:
Deserializes a dictionary into a MetadataChangeProposalWrapper. Falls back to a standard MetadataChangeProposalClass if the aspect type is unknown.
I/O Contract
| Field | Value |
|---|---|
| Input | Entity URN (string), aspect instance (typed _Aspect subclass), optional change type and metadata |
| Output | Validated MetadataChangeProposalWrapper instance ready for emission |
| Validation | Automatic entityType inference and cross-validation; aspectName derivation and consistency checking |
| Exceptions | ValueError if entityType does not match URN, if neither entityUrn nor entityKeyAspect is set, or if aspectName does not match the aspect class |
Usage Examples
Basic MCP Construction
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mce_builder import make_dataset_urn, make_ownership_aspect_from_urn_list, make_user_urn
from datahub.metadata.schema_classes import ChangeTypeClass
dataset_urn = make_dataset_urn(platform="mysql", name="prod_db.users", env="PROD")
ownership = make_ownership_aspect_from_urn_list(
    owner_urns=[make_user_urn("jdoe")],
    source_type=None,
)
mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=ownership,
)
# entityType is automatically inferred as "dataset"
# aspectName is automatically set to "ownership"
# changeType defaults to UPSERT
Batch Construction
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
    DatasetPropertiesClass,
    GlobalTagsClass,
    TagAssociationClass,
)
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:mysql,prod_db.users,PROD)"
properties = DatasetPropertiesClass(
    name="users",
    description="User account table",
)
tags = GlobalTagsClass(tags=[TagAssociationClass(tag="urn:li:tag:pii")])
mcps = MetadataChangeProposalWrapper.construct_many(
    entityUrn=dataset_urn,
    aspects=[properties, tags],
)
# Returns a list of 2 MCPs, one for each aspect
Validation
# Reuses dataset_urn and ownership from the Basic MCP Construction example
mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=ownership,
)
if mcp.validate():
    print("MCP is valid and ready for emission")
Related
- Implements: Datahub_project_Datahub_Metadata_Change_Proposal
- Depends on: Datahub_project_Datahub_Mce_Builder_URN_Helpers
- Used by: Datahub_project_Datahub_Emitter_Emit
- Environment: Environment:Datahub_project_Datahub_Python_3_10_Ingestion_Environment