
Implementation:Datahub project Datahub DataHubRestEmitter Init

From Leeroopedia


Metadata

Field Value
implementation_name DataHubRestEmitter Init
description Initializing the REST and Kafka emitter clients for authenticated metadata emission to DataHub.
type implementation
category API Doc
status active
last_updated 2026-02-10
version 1.0

Overview

This implementation covers the initialization of DataHubRestEmitter and DatahubKafkaEmitter, the two concrete emitter classes provided by the DataHub Python SDK. Each class establishes an authenticated connection to the DataHub backend and configures transport-specific behavior such as retries, timeouts, serialization, and topic routing.
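Both classes conform to a shared `Emitter` protocol defined in `generic_emitter.py`, which is what lets calling code swap transports without changes. A minimal sketch of that shape is below; the exact callback signature is an assumption, and `FakeEmitter` is a hypothetical stand-in for illustration only:

```python
from typing import Any, Callable, List, Optional, Protocol


class Emitter(Protocol):
    """Sketch of the shared emitter protocol: both the REST and Kafka
    emitters expose an emit() method with an optional completion callback."""

    def emit(
        self,
        item: Any,
        callback: Optional[Callable[[Optional[Exception], str], None]] = None,
    ) -> None: ...


class FakeEmitter:
    """Hypothetical in-memory stand-in satisfying the protocol."""

    def __init__(self) -> None:
        self.sent: List[Any] = []

    def emit(self, item, callback=None) -> None:
        # Record the item instead of sending it over a transport.
        self.sent.append(item)
        if callback is not None:
            callback(None, "ok")  # assumed (exception, message) callback shape
```

Because both concrete emitters satisfy the same protocol, downstream code (e.g., an ingestion pipeline) can be written against `Emitter` and handed either transport.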

Source Reference

Field Value
REST Emitter File metadata-ingestion/src/datahub/emitter/rest_emitter.py
REST Emitter Lines L366-460 (class definition and __init__)
Kafka Emitter File metadata-ingestion/src/datahub/emitter/kafka_emitter.py
Kafka Emitter Lines L66-132 (class definition and __init__)
Emitter Protocol File metadata-ingestion/src/datahub/emitter/generic_emitter.py
Repository datahub-project/datahub

DataHubRestEmitter

Constructor Signature

class DataHubRestEmitter(Closeable, Emitter):
    def __init__(
        self,
        gms_server: str,
        token: Optional[str] = None,
        timeout_sec: Optional[float] = None,
        connect_timeout_sec: Optional[float] = None,
        read_timeout_sec: Optional[float] = None,
        retry_status_codes: Optional[List[int]] = None,
        retry_methods: Optional[List[str]] = None,
        retry_max_times: Optional[int] = None,
        extra_headers: Optional[Dict[str, str]] = None,
        ca_certificate_path: Optional[str] = None,
        client_certificate_path: Optional[str] = None,
        disable_ssl_verification: bool = False,
        openapi_ingestion: Optional[bool] = None,
        client_mode: Optional[ClientMode] = None,
        datahub_component: Optional[str] = None,
        server_config_refresh_interval: Optional[int] = None,
    ):

Import

from datahub.emitter.rest_emitter import DataHubRestEmitter

Parameters

Parameter Type Default Description
gms_server str (required) URL of the DataHub GMS endpoint (e.g., http://localhost:8080). Accepts "__from_env__" to load from environment.
token Optional[str] None Bearer token for authentication. If None, attempts to load system auth from environment variables.
timeout_sec Optional[float] None (effective default 30.0) Timeout applied to both connect and read operations when no per-phase override is given.
connect_timeout_sec Optional[float] None Override timeout for connection establishment. Falls back to timeout_sec.
read_timeout_sec Optional[float] None Override timeout for reading responses. Falls back to timeout_sec.
retry_status_codes Optional[List[int]] [429, 500, 502, 503, 504] HTTP status codes that trigger automatic retry.
retry_methods Optional[List[str]] ["HEAD", "GET", "POST", "PUT", "DELETE", "OPTIONS", "TRACE"] HTTP methods eligible for retry.
retry_max_times Optional[int] Configurable via env Maximum number of retry attempts.
extra_headers Optional[Dict[str, str]] None Additional HTTP headers merged into every request.
ca_certificate_path Optional[str] None Path to CA certificate bundle for SSL verification.
client_certificate_path Optional[str] None Path to client certificate for mutual TLS.
disable_ssl_verification bool False Disable SSL certificate verification entirely.
openapi_ingestion Optional[bool] None Force OpenAPI endpoint usage. Auto-detected from server config if None.
client_mode Optional[ClientMode] None (treated as ClientMode.SDK) Client mode sent as a header for server-side behavior differentiation.
datahub_component Optional[str] None Identifier for the calling component, included in the User-Agent header.
server_config_refresh_interval Optional[int] None Interval in seconds between server configuration refreshes.
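The three timeout parameters follow a fallback chain: each per-phase override wins if set, otherwise both phases share `timeout_sec`, otherwise the 30-second default applies. A small pure-Python sketch of that rule (the 30.0 default is taken from the table above; the helper name is hypothetical):

```python
from typing import Optional, Tuple

_DEFAULT_TIMEOUT_SEC = 30.0  # effective default per the parameter table


def resolve_timeouts(
    timeout_sec: Optional[float] = None,
    connect_timeout_sec: Optional[float] = None,
    read_timeout_sec: Optional[float] = None,
) -> Tuple[float, float]:
    """Return (connect, read) timeouts using the documented fallback rule."""
    base = timeout_sec if timeout_sec is not None else _DEFAULT_TIMEOUT_SEC
    connect = connect_timeout_sec if connect_timeout_sec is not None else base
    read = read_timeout_sec if read_timeout_sec is not None else base
    return (connect, read)
```

For example, setting only `read_timeout_sec=60.0` leaves the connect phase on the shared default while lengthening reads for slow bulk responses.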

DatahubKafkaEmitter

Constructor Signature

class DatahubKafkaEmitter(Closeable, Emitter):
    def __init__(self, config: KafkaEmitterConfig):

Import

from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig

KafkaEmitterConfig

Field Type Default Description
connection KafkaProducerConnectionConfig (default factory) Kafka connection settings including bootstrap servers, schema_registry_url, and producer_config.
topic_routes Dict[str, str] {"MetadataChangeEvent": "MetadataChangeEvent_v4", "MetadataChangeProposal": "MetadataChangeProposal_v1"} Mapping of message types to Kafka topic names.

The Kafka emitter creates separate SerializingProducer instances for MCE and MCP message types, each using Avro serialization via the schema registry. If an OAuth callback is configured (e.g., for AWS MSK IAM), the emitter triggers it during initialization so that credentials are validated before the first emit.
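The `topic_routes` mapping is what steers each message type to its topic. A sketch of that lookup, using the defaults from the table above (the function name is hypothetical, not part of the SDK):

```python
from typing import Dict

# Default routes as documented in the KafkaEmitterConfig table above.
DEFAULT_TOPIC_ROUTES: Dict[str, str] = {
    "MetadataChangeEvent": "MetadataChangeEvent_v4",
    "MetadataChangeProposal": "MetadataChangeProposal_v1",
}


def route_topic(message_type: str, topic_routes: Dict[str, str] = DEFAULT_TOPIC_ROUTES) -> str:
    """Resolve the Kafka topic for a given metadata message type."""
    try:
        return topic_routes[message_type]
    except KeyError:
        raise ValueError(f"No topic route configured for {message_type!r}")
```

Overriding `topic_routes` in `KafkaEmitterConfig` lets a deployment redirect MCEs or MCPs to custom topics without touching producer code.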

I/O Contract

Field Value
Input (REST) GMS server URL, optional authentication token, optional transport configuration
Input (Kafka) KafkaEmitterConfig with Kafka bootstrap servers, schema registry URL, and topic routes
Output Initialized emitter instance ready for emit() calls
Side Effects (REST) Creates an HTTP session with retry adapters, sets authentication headers, configures SSL
Side Effects (Kafka) Creates SerializingProducer instances, connects to schema registry, triggers OAuth if configured
Exceptions ConfigurationError if gms_server is empty or missing
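The `ConfigurationError` guard fires at construction time, before any network call. A pure-Python sketch of that validation step (the exception class here is a stand-in for `datahub`'s own, and the trailing-slash trim is an assumption about URL normalization, not confirmed SDK behavior):

```python
class ConfigurationError(Exception):
    """Stand-in for the DataHub SDK's ConfigurationError."""


def validate_gms_server(gms_server: str) -> str:
    """Reject an empty or missing gms_server before any connection attempt."""
    if not gms_server:
        raise ConfigurationError("gms_server is required")
    # Assumption: normalize away a trailing slash for consistent URL joins.
    return gms_server.rstrip("/")
```

In practice this means a misconfigured pipeline fails fast with a clear error rather than at the first `emit()` call.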

Usage Examples

REST Emitter

from datahub.emitter.rest_emitter import DataHubRestEmitter

# Basic initialization
emitter = DataHubRestEmitter(
    gms_server="http://localhost:8080",
    token="your-access-token",
)

# With custom retry and timeout configuration
emitter = DataHubRestEmitter(
    gms_server="http://localhost:8080",
    token="your-access-token",
    connect_timeout_sec=10.0,
    read_timeout_sec=60.0,
    retry_max_times=5,
    retry_status_codes=[429, 500, 502, 503, 504],
)

# Test connection and verify server config
emitter.test_connection()
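Under the hood, the retry parameters above map onto a urllib3 `Retry` policy mounted on a `requests` session. The following is an approximation of that wiring, not a copy of `rest_emitter.py`; the backoff factor and helper name are assumptions:

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def build_retrying_session(
    retry_max_times: int = 4,
    retry_status_codes=(429, 500, 502, 503, 504),
    retry_methods=("HEAD", "GET", "POST", "PUT", "DELETE", "OPTIONS", "TRACE"),
) -> requests.Session:
    """Build a requests Session whose retry behavior mirrors the
    DataHubRestEmitter parameters documented above (a sketch)."""
    retry = Retry(
        total=retry_max_times,
        status_forcelist=list(retry_status_codes),
        allowed_methods=list(retry_methods),
        backoff_factor=2,  # assumed exponential backoff; SDK value may differ
    )
    adapter = HTTPAdapter(max_retries=retry)
    session = requests.Session()
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session
```

This is why a transient 503 from GMS is retried transparently while a 400 (not in `retry_status_codes`) surfaces immediately.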

Kafka Emitter

from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.configuration.kafka import KafkaProducerConnectionConfig

config = KafkaEmitterConfig(
    connection=KafkaProducerConnectionConfig(
        bootstrap="localhost:9092",
        schema_registry_url="http://localhost:8081",
    ),
)
emitter = DatahubKafkaEmitter(config)

Loading from Environment

from datahub.emitter.rest_emitter import DataHubRestEmitter

# Load GMS URL and token from environment variables
emitter = DataHubRestEmitter(gms_server="__from_env__")
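The `"__from_env__"` sentinel tells the emitter to resolve the server URL and token from the environment. A sketch of that resolution (the variable names `DATAHUB_GMS_URL` and `DATAHUB_GMS_TOKEN` are the conventional DataHub names, but verify them against your SDK version; the fallback URL is an assumption):

```python
import os
from typing import Optional, Tuple


def load_gms_from_env() -> Tuple[str, Optional[str]]:
    """Resolve the GMS URL and token from environment variables,
    falling back to a local default URL when unset."""
    url = os.environ.get("DATAHUB_GMS_URL", "http://localhost:8080")
    token = os.environ.get("DATAHUB_GMS_TOKEN")  # None means unauthenticated
    return url, token
```

This pattern keeps credentials out of source code and lets the same script run against local, staging, and production GMS instances unchanged.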

Related

Knowledge Sources

Domains

Data_Integration, Metadata_Management
