Implementation:Datahub project Datahub Emitter Emit
Metadata
| Field | Value |
|---|---|
| implementation_name | Emitter Emit |
| description | Emitting metadata change proposals to DataHub via the REST (HTTP) or Kafka transport, including flushing and delivery confirmation. |
| type | implementation |
| category | API Doc |
| status | active |
| last_updated | 2026-02-10 |
| version | 1.0 |
Overview
This implementation covers the emit() and flush() methods of both DataHubRestEmitter and DatahubKafkaEmitter. These methods are the final step in the metadata emission pipeline, handling serialization, transport, retry logic, and delivery confirmation.
Source Reference
| Field | Value |
|---|---|
| REST emit | metadata-ingestion/src/datahub/emitter/rest_emitter.py, L594-620 |
| REST emit_mcp | metadata-ingestion/src/datahub/emitter/rest_emitter.py, L665-725 |
| REST flush | metadata-ingestion/src/datahub/emitter/rest_emitter.py, L1048-1050 |
| Kafka emit | metadata-ingestion/src/datahub/emitter/kafka_emitter.py, L134-146 |
| Kafka flush | metadata-ingestion/src/datahub/emitter/kafka_emitter.py, L185-187 |
| Repository | datahub-project/datahub |
REST Emitter: emit()
Signature
def emit(
    self,
    item: Union[
        MetadataChangeEvent,
        MetadataChangeProposal,
        MetadataChangeProposalWrapper,
        UsageAggregation,
    ],
    callback: Optional[Callable[[Exception, str], None]] = None,
    emit_mode: EmitMode = _DEFAULT_EMIT_MODE,
) -> None:
Import
from datahub.emitter.rest_emitter import DataHubRestEmitter, EmitMode
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| item | Union[MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper, UsageAggregation] | (required) | The metadata item to emit. MCPs and MCPWs are the primary types. |
| callback | Optional[Callable[[Exception, str], None]] | None | Optional callback invoked on success with (None, "success") or on failure with (exception, error message). |
| emit_mode | EmitMode | EmitMode.SYNC_PRIMARY | Controls the consistency vs. performance tradeoff. |
Behavior
The emit() method dispatches based on item type:
- MetadataChangeProposal / MetadataChangeProposalWrapper -- delegates to emit_mcp()
- MetadataChangeEvent -- delegates to emit_mce()
- UsageAggregation -- delegates to emit_usage() (deprecated)
On success, invokes the callback with (None, "success"). On failure, invokes the callback with the exception, then re-raises.
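A minimal sketch of this callback contract; the server URL, dataset URN, and log_result helper are illustrative, not part of the API:
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.metadata.schema_classes import StatusClass

emitter = DataHubRestEmitter(gms_server="http://localhost:8080")

def log_result(exc, msg):
    # Invoked with (None, "success") on success, (exception, message) on failure.
    print("emit failed:" if exc else "emit succeeded:", msg)

mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,prod_db.users,PROD)",
    aspect=StatusClass(removed=False),
)
# The callback fires first; on failure the exception is then re-raised.
emitter.emit(mcp, callback=log_result)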
REST Emitter: emit_mcp()
Signature
def emit_mcp(
    self,
    mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper],
    async_flag: Optional[bool] = None,
    emit_mode: EmitMode = _DEFAULT_EMIT_MODE,
    wait_timeout: Optional[timedelta] = timedelta(seconds=3600),
) -> None:
Behavior
Depending on the configured endpoint:
OpenAPI endpoint:
- Converts the MCP to an OpenApiRequest via _to_openapi_request()
- Sends the request to the OpenAPI URL with the appropriate HTTP method
- In ASYNC_WAIT mode, extracts trace data from the response and polls for write completion (see the sketch after this list)
RestLI endpoint:
- For DELETE change types on key aspects, sends to /entities?action=delete
- For other change types, serializes the MCP and sends to /aspects?action=ingestProposal
- Sets the async flag in the payload based on the emit mode
- Unicode characters are preserved in the serialized payload
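A minimal sketch of an ASYNC_WAIT emission with a shortened wait_timeout; the server URL and URN are illustrative:
from datetime import timedelta
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DataHubRestEmitter, EmitMode
from datahub.metadata.schema_classes import StatusClass

emitter = DataHubRestEmitter(gms_server="http://localhost:8080")
mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,prod_db.users,PROD)",
    aspect=StatusClass(removed=False),
)
# Queue the write asynchronously, then block (up to 60 s instead of the
# default 3600 s) until the trace API confirms persistence.
emitter.emit_mcp(mcp, emit_mode=EmitMode.ASYNC_WAIT, wait_timeout=timedelta(seconds=60))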
REST Emitter: emit_mcps() (Batch)
Signature
def emit_mcps(
    self,
    mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
    emit_mode: EmitMode = _DEFAULT_EMIT_MODE,
    wait_timeout: Optional[timedelta] = timedelta(seconds=3600),
) -> int:
Emits multiple MCPs in batches. Automatically chunks payloads to stay within size limits (~15 MB max per request, configurable batch length). Returns the number of HTTP requests sent.
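A sketch of batch emission under these semantics, assuming an illustrative set of URNs and a local GMS server:
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.metadata.schema_classes import StatusClass

emitter = DataHubRestEmitter(gms_server="http://localhost:8080")
mcps = [
    MetadataChangeProposalWrapper(
        entityUrn=f"urn:li:dataset:(urn:li:dataPlatform:mysql,prod_db.table_{i},PROD)",
        aspect=StatusClass(removed=False),
    )
    for i in range(500)
]
# Chunking into size-limited requests happens internally.
num_requests = emitter.emit_mcps(mcps)
print(f"sent {num_requests} HTTP request(s)")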
EmitMode Enum
| Mode | Description |
|---|---|
| SYNC_WAIT | Synchronously updates both primary (SQL) and search (Elasticsearch) storage before returning. |
| SYNC_PRIMARY | Synchronously updates primary storage; asynchronously updates search storage. (Default) |
| ASYNC | Queues the change for asynchronous processing and returns immediately. |
| ASYNC_WAIT | Queues asynchronously but blocks until persistence confirmation via trace API. |
REST Emitter: flush()
def flush(self) -> None:
    # No-op, but present to keep the interface consistent with the Kafka emitter.
    pass
The REST emitter's flush() is a no-op because REST emission is synchronous by nature -- each emit() call waits for the HTTP response before returning.
Kafka Emitter: emit()
Signature
def emit(
    self,
    item: Union[
        MetadataChangeEvent,
        MetadataChangeProposal,
        MetadataChangeProposalWrapper,
    ],
    callback: Optional[Callable[[Exception, str], None]] = None,
) -> None:
Behavior
Dispatches based on item type:
- MetadataChangeProposal / MetadataChangeProposalWrapper -- delegates to emit_mcp_async()
- MetadataChangeEvent -- delegates to emit_mce_async()
If no callback is provided, uses a default error-reporting callback that logs failures.
emit_mcp_async
def emit_mcp_async(
    self,
    mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper],
    callback: Callable[[Exception, str], None],
) -> None:
- Calls producer.poll(0) to trigger callbacks from previous writes
- Produces the MCP to the configured MCP topic (MetadataChangeProposal_v1 by default)
- Uses the entity URN as the Kafka message key
- The Avro serializer handles serialization via the Schema Registry (a direct-usage sketch follows)
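A minimal direct-usage sketch, assuming KafkaEmitterConfig() resolves to a local broker and Schema Registry; the URN is illustrative:
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass

emitter = DatahubKafkaEmitter(KafkaEmitterConfig())
mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,prod_db.users,PROD)",
    aspect=StatusClass(removed=False),
)
# Unlike emit(), the callback argument here is required.
emitter.emit_mcp_async(mcp, lambda exc, msg: print(exc or "delivered"))
emitter.flush()  # wait for the producer to confirm delivery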
Kafka Emitter: flush()
def flush(self) -> None:
    for producer in self.producers.values():
        producer.flush()
Blocks until all pending Kafka messages have been delivered. This is critical to call before application shutdown to avoid losing buffered metadata events.
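One common pattern is to pair emission with flush() in a try/finally so the queue is drained even on error; this sketch assumes a default local config and an illustrative URN:
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass

emitter = DatahubKafkaEmitter(KafkaEmitterConfig())
try:
    emitter.emit(
        MetadataChangeProposalWrapper(
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,prod_db.users,PROD)",
            aspect=StatusClass(removed=False),
        )
    )
finally:
    # Drain the producer queue even if emission raised, so already-buffered
    # events are not lost on shutdown.
    emitter.flush()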
I/O Contract
| Field | Value |
|---|---|
| Input | MetadataChangeProposalWrapper, MetadataChangeProposal, or MetadataChangeEvent instance |
| Output (REST) | None (raises OperationalError on failure) |
| Output (Kafka) | None (delivery status reported via callback) |
| Side Effects (REST) | HTTP POST/PATCH to GMS endpoint; metadata persisted in primary and search storage |
| Side Effects (Kafka) | Message published to Kafka topic; consumed asynchronously by backend |
| Exceptions (REST) | OperationalError for HTTP errors, JSON parse failures, or network issues; TraceTimeoutError for async wait timeout; TraceValidationError for failed async writes (see the handling sketch below) |
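A sketch of handling these REST-path failures; it assumes OperationalError is importable from datahub.configuration.common and uses an illustrative URN:
from datahub.configuration.common import OperationalError
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.metadata.schema_classes import StatusClass

emitter = DataHubRestEmitter(gms_server="http://localhost:8080")
mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,prod_db.users,PROD)",
    aspect=StatusClass(removed=False),
)
try:
    emitter.emit(mcp)
except OperationalError as e:
    # Raised for HTTP errors, JSON parse failures, and network issues.
    print(f"emit failed: {e}")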
Usage Examples
Complete REST Emission Pipeline
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.metadata.schema_classes import DatasetPropertiesClass
# Initialize emitter
emitter = DataHubRestEmitter(gms_server="http://localhost:8080", token="your-token")
emitter.test_connection()
# Construct metadata
dataset_urn = make_dataset_urn(platform="mysql", name="prod_db.users", env="PROD")
properties = DatasetPropertiesClass(name="users", description="User account table")
# Wrap in MCP
mcp = MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=properties)
# Emit
emitter.emit(mcp)
# Flush (no-op for REST, but good practice for transport-agnostic code)
emitter.flush()
Complete Kafka Emission Pipeline
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.configuration.kafka import KafkaProducerConnectionConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.metadata.schema_classes import DatasetPropertiesClass
# Initialize emitter
config = KafkaEmitterConfig(
    connection=KafkaProducerConnectionConfig(
        bootstrap="localhost:9092",
        schema_registry_url="http://localhost:8081",
    ),
)
emitter = DatahubKafkaEmitter(config)
# Construct metadata
dataset_urn = make_dataset_urn(platform="mysql", name="prod_db.users", env="PROD")
properties = DatasetPropertiesClass(name="users", description="User account table")
# Wrap in MCP
mcp = MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=properties)
# Emit with callback
def on_delivery(err, msg):
    if err:
        print(f"Failed: {err}")
    else:
        print(f"Delivered: {msg}")
emitter.emit(mcp, callback=on_delivery)
# IMPORTANT: flush to ensure delivery before shutdown
emitter.flush()
Using Emit Modes (REST)
from datahub.emitter.rest_emitter import DataHubRestEmitter, EmitMode
emitter = DataHubRestEmitter(gms_server="http://localhost:8080")
# (mcp constructed as in the REST pipeline above)
# Default: synchronous primary storage, async search
emitter.emit(mcp)
# Fully synchronous: blocks until both storage and search are updated
emitter.emit(mcp, emit_mode=EmitMode.SYNC_WAIT)
# Fully asynchronous: returns immediately
emitter.emit(mcp, emit_mode=EmitMode.ASYNC)
# Async with persistence confirmation
emitter.emit(mcp, emit_mode=EmitMode.ASYNC_WAIT)
Related
- Implements: Datahub_project_Datahub_Metadata_Emission
- Depends on: Datahub_project_Datahub_DataHubRestEmitter_Init, Datahub_project_Datahub_MetadataChangeProposalWrapper_Init
- Environment: Environment:Datahub_project_Datahub_Python_3_10_Ingestion_Environment
- Heuristic: Heuristic:Datahub_project_Datahub_Emitter_Selection_Strategy
- Heuristic: Heuristic:Datahub_project_Datahub_Batch_Size_And_Timeout_Tuning