Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Datahub project Datahub MetadataChangeProposalWrapper Init

From Leeroopedia


Metadata

Field Value
implementation_name MetadataChangeProposalWrapper Init
description Constructing and validating Metadata Change Proposal wrappers for packaging metadata changes for emission.
type implementation
category API Doc
status active
last_updated 2026-02-10
version 1.0

Overview

MetadataChangeProposalWrapper is a Python dataclass that provides a type-safe, high-level interface for constructing Metadata Change Proposals (MCPs). It wraps an entity URN and an aspect instance into a validated proposal that can be serialized and emitted to DataHub via REST or Kafka.

Source Reference

Field Value
File metadata-ingestion/src/datahub/emitter/mcp.py
Lines L60-268
Repository datahub-project/datahub

Import

from datahub.emitter.mcp import MetadataChangeProposalWrapper

Constructor

@dataclasses.dataclass
class MetadataChangeProposalWrapper:
    entityType: str = _ENTITY_TYPE_UNSET
    changeType: Union[str, ChangeTypeClass] = ChangeTypeClass.UPSERT
    entityUrn: Union[None, str] = None
    entityKeyAspect: Union[None, _Aspect] = None
    auditHeader: Union[None, KafkaAuditHeaderClass] = None
    aspectName: Union[None, str] = None
    aspect: Union[None, _Aspect] = None
    systemMetadata: Union[None, SystemMetadataClass] = None
    headers: Union[None, Dict[str, str]] = None

Parameters

Parameter Type Default Description
entityType str "ENTITY_TYPE_UNSET" Entity type (e.g., "dataset", "corpuser"). Automatically inferred from entityUrn if not provided.
changeType Union[str, ChangeTypeClass] ChangeTypeClass.UPSERT Type of metadata change. UPSERT creates or updates.
entityUrn Union[None, str] None URN of the target entity. Required if entityKeyAspect is not set.
entityKeyAspect Union[None, _Aspect] None Key aspect for entity identification. Required if entityUrn is not set.
auditHeader Union[None, KafkaAuditHeaderClass] None Optional Kafka audit header for traceability.
aspectName Union[None, str] None Name of the aspect. Automatically derived from aspect.get_aspect_name() if not provided.
aspect Union[None, _Aspect] None The typed aspect instance carrying the metadata payload.
systemMetadata Union[None, SystemMetadataClass] None Optional system metadata (pipeline run IDs, etc.).
headers Union[None, Dict[str, str]] None Optional custom headers for the proposal.

Post-Init Validation

The __post_init__ method performs the following validations:

  1. If entityUrn is provided but entityType is unset, infers entityType from the URN via guess_entity_type()
  2. If both entityUrn and entityType are provided, validates that they match (case-insensitive)
  3. Raises ValueError if entityType is unset and entityUrn is not provided
  4. Raises ValueError if neither entityUrn nor entityKeyAspect is provided
  5. If aspectName is not set but aspect is provided, derives aspectName from the aspect class
  6. Raises ValueError if aspectName does not match the aspect's declared name

Key Methods

make_mcp

def make_mcp(self) -> MetadataChangeProposalClass:

Serializes the wrapper into a MetadataChangeProposalClass instance. Converts the typed aspect and entity key aspect into GenericAspectClass instances with JSON-serialized content.

validate

def validate(self) -> bool:

Validates the MCP wrapper. Checks that:

  • Either entityUrn or entityKeyAspect is set (but not both)
  • The entityKeyAspect validates if present
  • The aspect validates if present
  • The underlying MCP structure is valid

Returns True if valid, False otherwise.

as_workunit

def as_workunit(
    self, *, treat_errors_as_warnings: bool = False, is_primary_source: bool = True
) -> "MetadataWorkUnit":

Wraps the MCP into a MetadataWorkUnit for use in ingestion pipelines. Generates a unique workunit ID.

construct_many

@classmethod
def construct_many(
    cls, entityUrn: str, aspects: Sequence[Optional[_Aspect]]
) -> List["MetadataChangeProposalWrapper"]:

Class method that creates multiple MCPs for the same entity. Filters out None aspects.

from_obj

@classmethod
def from_obj(
    cls, obj: dict, tuples: bool = False
) -> Union["MetadataChangeProposalWrapper", MetadataChangeProposalClass]:

Deserializes a dictionary into an MCPW. Falls back to a standard MetadataChangeProposalClass if the aspect type is unknown.

I/O Contract

Field Value
Input Entity URN (string), aspect instance (typed _Aspect subclass), optional change type and metadata
Output Validated MetadataChangeProposalWrapper instance ready for emission
Validation Automatic entityType inference and cross-validation; aspectName derivation and consistency checking
Exceptions ValueError if entityType does not match URN, if neither entityUrn nor entityKeyAspect is set, or if aspectName does not match the aspect class

Usage Examples

Basic MCP Construction

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mce_builder import make_dataset_urn, make_ownership_aspect_from_urn_list, make_user_urn
from datahub.metadata.schema_classes import ChangeTypeClass

dataset_urn = make_dataset_urn(platform="mysql", name="prod_db.users", env="PROD")

ownership = make_ownership_aspect_from_urn_list(
    owner_urns=[make_user_urn("jdoe")],
    source_type=None,
)

mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=ownership,
)

# entityType is automatically inferred as "dataset"
# aspectName is automatically set to "ownership"
# changeType defaults to UPSERT

Batch Construction

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
    DatasetPropertiesClass,
    GlobalTagsClass,
    TagAssociationClass,
)

dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:mysql,prod_db.users,PROD)"

properties = DatasetPropertiesClass(
    name="users",
    description="User account table",
)
tags = GlobalTagsClass(tags=[TagAssociationClass(tag="urn:li:tag:pii")])

mcps = MetadataChangeProposalWrapper.construct_many(
    entityUrn=dataset_urn,
    aspects=[properties, tags],
)
# Returns a list of 2 MCPs, one for each aspect

Validation

mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=ownership,
)

if mcp.validate():
    print("MCP is valid and ready for emission")

Related

Knowledge Sources

Domains

Data_Integration, Metadata_Management

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment