
Implementation:Datahub project Datahub Emitter Emit

From Leeroopedia


Metadata

Field Value
implementation_name Emitter Emit
description Emitting metadata change proposals to DataHub via REST HTTP or Kafka transport, including flush and delivery confirmation.
type implementation
category API Doc
status active
last_updated 2026-02-10
version 1.0

Overview

This implementation covers the emit() and flush() methods of both DataHubRestEmitter and DatahubKafkaEmitter. These methods are the final step in the metadata emission pipeline, handling serialization, transport, retry logic, and delivery confirmation.

Source Reference

Field Value
REST emit metadata-ingestion/src/datahub/emitter/rest_emitter.py, L594-620
REST emit_mcp metadata-ingestion/src/datahub/emitter/rest_emitter.py, L665-725
REST flush metadata-ingestion/src/datahub/emitter/rest_emitter.py, L1048-1050
Kafka emit metadata-ingestion/src/datahub/emitter/kafka_emitter.py, L134-146
Kafka flush metadata-ingestion/src/datahub/emitter/kafka_emitter.py, L185-187
Repository datahub-project/datahub

REST Emitter: emit()

Signature

def emit(
    self,
    item: Union[
        MetadataChangeEvent,
        MetadataChangeProposal,
        MetadataChangeProposalWrapper,
        UsageAggregation,
    ],
    callback: Optional[Callable[[Exception, str], None]] = None,
    emit_mode: EmitMode = _DEFAULT_EMIT_MODE,
) -> None:

Import

from datahub.emitter.rest_emitter import DataHubRestEmitter, EmitMode

Parameters

Parameter Type Default Description
item Union[MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper, UsageAggregation] (required) The metadata item to emit. MCPs and MCPWs are the primary types.
callback Optional[Callable[[Exception, str], None]] None Optional callback invoked on success (None, "success") or failure (exception, error_message).
emit_mode EmitMode EmitMode.SYNC_PRIMARY Controls consistency vs. performance tradeoff.

Behavior

The emit() method dispatches based on item type:

  • MetadataChangeProposal / MetadataChangeProposalWrapper -- Delegates to emit_mcp()
  • MetadataChangeEvent -- Delegates to emit_mce()
  • UsageAggregation -- Delegates to emit_usage() (deprecated)

On success, invokes the callback with (None, "success"). On failure, invokes the callback with the exception, then re-raises.
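The callback contract described above is plain Python and can be shown standalone. A minimal sketch -- the `on_emit` name and the recording list are illustrative, not part of the library:

```python
# The emitter invokes the callback with (None, "success") on success,
# or with the raised exception and an error message on failure.
results = []

def on_emit(exc, msg):
    # exc is None on success, an Exception instance on failure
    if exc is None:
        results.append(("ok", msg))
    else:
        results.append(("error", str(exc)))

# Simulate both invocation shapes described above (no emitter required here):
on_emit(None, "success")
on_emit(RuntimeError("connection refused"), "connection refused")
```

With a real emitter this would be passed as `emitter.emit(mcp, callback=on_emit)`; note that on failure the REST emitter still re-raises after invoking the callback, so the caller sees the exception as well.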

REST Emitter: emit_mcp()

Signature

def emit_mcp(
    self,
    mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper],
    async_flag: Optional[bool] = None,
    emit_mode: EmitMode = _DEFAULT_EMIT_MODE,
    wait_timeout: Optional[timedelta] = timedelta(seconds=3600),
) -> None:

Behavior

Depending on the configured endpoint:

OpenAPI endpoint:

  1. Converts the MCP to an OpenApiRequest via _to_openapi_request()
  2. Sends the request to the OpenAPI URL with the appropriate HTTP method
  3. If ASYNC_WAIT mode is enabled, extracts trace data from the response and polls for write completion

RestLI endpoint:

  1. For DELETE change types on key aspects, sends to /entities?action=delete
  2. For other change types, serializes the MCP and sends to /aspects?action=ingestProposal
  3. Sets the async flag in the payload based on the emit mode
  4. Unicode characters are preserved in the serialized payload
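The RestLI routing rules above can be sketched as a pure function. This is illustrative only -- `restli_endpoint`, its parameters, and the string change types are hypothetical stand-ins for the library's internal dispatch:

```python
def restli_endpoint(change_type: str, is_key_aspect: bool) -> str:
    # DELETE of a key aspect removes the whole entity via the entities
    # endpoint; every other change type is ingested as an aspect proposal.
    if change_type == "DELETE" and is_key_aspect:
        return "/entities?action=delete"
    return "/aspects?action=ingestProposal"
```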

REST Emitter: emit_mcps() (Batch)

Signature

def emit_mcps(
    self,
    mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
    emit_mode: EmitMode = _DEFAULT_EMIT_MODE,
    wait_timeout: Optional[timedelta] = timedelta(seconds=3600),
) -> int:

Emits multiple MCPs in batches. Automatically chunks payloads to stay within size limits (~15 MB max per request, configurable batch length). Returns the number of HTTP requests sent.
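The size-bounded batching can be illustrated with a standalone sketch. This is not the library's internal code; the byte cap and item cap defaults here are assumptions mirroring the limits described above:

```python
def chunk_by_size(payloads, max_bytes=15 * 1024 * 1024, max_items=500):
    # Group serialized payloads so that each batch stays under both a total
    # byte limit and a maximum item count. One HTTP request per batch.
    batches, current, current_size = [], [], 0
    for p in payloads:
        size = len(p)
        if current and (current_size + size > max_bytes or len(current) >= max_items):
            batches.append(current)
            current, current_size = [], 0
        current.append(p)
        current_size += size
    if current:
        batches.append(current)
    return batches
```

Under this sketch, the return value of emit_mcps() corresponds to `len(batches)`: the number of HTTP requests needed to deliver the whole sequence.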

EmitMode Enum

Mode Description
SYNC_WAIT Synchronously updates both primary (SQL) and search (Elasticsearch) storage before returning.
SYNC_PRIMARY Synchronously updates primary storage; asynchronously updates search storage. (Default)
ASYNC Queues the change for asynchronous processing and returns immediately.
ASYNC_WAIT Queues asynchronously but blocks until persistence confirmation via trace API.

REST Emitter: flush()

def flush(self) -> None:
    # No-op, but present to keep the interface consistent with the Kafka emitter.
    pass

The REST emitter's flush() is a no-op because each emit() call blocks until the HTTP response is received; unlike the Kafka emitter, there is no client-side buffer to drain.

Kafka Emitter: emit()

Signature

def emit(
    self,
    item: Union[
        MetadataChangeEvent,
        MetadataChangeProposal,
        MetadataChangeProposalWrapper,
    ],
    callback: Optional[Callable[[Exception, str], None]] = None,
) -> None:

Behavior

Dispatches based on item type:

  • MetadataChangeProposal / MetadataChangeProposalWrapper -- Delegates to emit_mcp_async()
  • MetadataChangeEvent -- Delegates to emit_mce_async()

If no callback is provided, uses a default error-reporting callback that logs failures.

emit_mcp_async

def emit_mcp_async(
    self,
    mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper],
    callback: Callable[[Exception, str], None],
) -> None:

Behavior

  1. Calls producer.poll(0) to trigger callbacks from previous writes
  2. Produces the MCP to the configured MCP topic (MetadataChangeProposal_v1 by default)
  3. Uses the entity URN as the Kafka message key
  4. The Avro serializer handles serialization via the Schema Registry

Kafka Emitter: flush()

def flush(self) -> None:
    for producer in self.producers.values():
        producer.flush()

Blocks until all pending Kafka messages have been delivered. This is critical to call before application shutdown to avoid losing buffered metadata events.

I/O Contract

Field Value
Input MetadataChangeProposalWrapper, MetadataChangeProposal, or MetadataChangeEvent instance
Output (REST) None (raises OperationalError on failure)
Output (Kafka) None (delivery status reported via callback)
Side Effects (REST) HTTP POST/PATCH to GMS endpoint; metadata persisted in primary and search storage
Side Effects (Kafka) Message published to Kafka topic; consumed asynchronously by backend
Exceptions (REST) OperationalError for HTTP errors, JSON parse failures, or network issues; TraceTimeoutError for async wait timeout; TraceValidationError for failed async writes
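Since both emitters expose the same emit() surface, error handling can be written transport-agnostically. A hedged sketch -- the `emit_safely` helper is illustrative, and in practice you would catch the specific exceptions named above (OperationalError, TraceTimeoutError, TraceValidationError) rather than the broad Exception used here:

```python
def emit_safely(emitter, mcp, logger=print):
    # Returns True on successful emission; logs and returns False on failure.
    try:
        emitter.emit(mcp)
        return True
    except Exception as exc:  # e.g. OperationalError for HTTP/network errors
        logger(f"emit failed: {exc}")
        return False
```

For the Kafka emitter, remember that a True return only means the message was buffered; delivery is confirmed via the callback and flush().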

Usage Examples

Complete REST Emission Pipeline

from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.metadata.schema_classes import DatasetPropertiesClass

# Initialize emitter
emitter = DataHubRestEmitter(gms_server="http://localhost:8080", token="your-token")
emitter.test_connection()

# Construct metadata
dataset_urn = make_dataset_urn(platform="mysql", name="prod_db.users", env="PROD")
properties = DatasetPropertiesClass(name="users", description="User account table")

# Wrap in MCP
mcp = MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=properties)

# Emit
emitter.emit(mcp)

# Flush (no-op for REST, but good practice for transport-agnostic code)
emitter.flush()

Complete Kafka Emission Pipeline

from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.configuration.kafka import KafkaProducerConnectionConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.metadata.schema_classes import DatasetPropertiesClass

# Initialize emitter
config = KafkaEmitterConfig(
    connection=KafkaProducerConnectionConfig(
        bootstrap="localhost:9092",
        schema_registry_url="http://localhost:8081",
    ),
)
emitter = DatahubKafkaEmitter(config)

# Construct metadata
dataset_urn = make_dataset_urn(platform="mysql", name="prod_db.users", env="PROD")
properties = DatasetPropertiesClass(name="users", description="User account table")

# Wrap in MCP
mcp = MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=properties)

# Emit with callback
def on_delivery(err, msg):
    if err:
        print(f"Failed: {err}")
    else:
        print(f"Delivered: {msg}")

emitter.emit(mcp, callback=on_delivery)

# IMPORTANT: flush to ensure delivery before shutdown
emitter.flush()

Using Emit Modes (REST)

from datahub.emitter.rest_emitter import DataHubRestEmitter, EmitMode

emitter = DataHubRestEmitter(gms_server="http://localhost:8080")

# Default: synchronous primary storage, async search
emitter.emit(mcp)

# Fully synchronous: blocks until both storage and search are updated
emitter.emit(mcp, emit_mode=EmitMode.SYNC_WAIT)

# Fully asynchronous: returns immediately
emitter.emit(mcp, emit_mode=EmitMode.ASYNC)

# Async with persistence confirmation
emitter.emit(mcp, emit_mode=EmitMode.ASYNC_WAIT)

Related

Knowledge Sources

Domains

Data_Integration, Metadata_Management
