## Metadata

| Field | Value |
| --- | --- |
| implementation_name | DataHubRestEmitter Init |
| description | Initializing the REST and Kafka emitter clients for authenticated metadata emission to DataHub. |
| type | implementation |
| category | API Doc |
| status | active |
| last_updated | 2026-02-10 |
| version | 1.0 |
## Overview

This implementation covers the initialization of `DataHubRestEmitter` and `DatahubKafkaEmitter`, the two concrete emitter classes provided by the DataHub Python SDK. Each class establishes a connection to the DataHub backend (over HTTP or Kafka, respectively) and configures transport-specific behavior such as retries, timeouts, serialization, and topic routing.
## Source Reference

| Field | Value |
| --- | --- |
| REST Emitter File | `metadata-ingestion/src/datahub/emitter/rest_emitter.py` |
| REST Emitter Lines | L366-460 (class definition and `__init__`) |
| Kafka Emitter File | `metadata-ingestion/src/datahub/emitter/kafka_emitter.py` |
| Kafka Emitter Lines | L66-132 (class definition and `__init__`) |
| Emitter Protocol File | `metadata-ingestion/src/datahub/emitter/generic_emitter.py` |
| Repository | `datahub-project/datahub` |
## DataHubRestEmitter

### Constructor Signature

```python
class DataHubRestEmitter(Closeable, Emitter):
    def __init__(
        self,
        gms_server: str,
        token: Optional[str] = None,
        timeout_sec: Optional[float] = None,
        connect_timeout_sec: Optional[float] = None,
        read_timeout_sec: Optional[float] = None,
        retry_status_codes: Optional[List[int]] = None,
        retry_methods: Optional[List[str]] = None,
        retry_max_times: Optional[int] = None,
        extra_headers: Optional[Dict[str, str]] = None,
        ca_certificate_path: Optional[str] = None,
        client_certificate_path: Optional[str] = None,
        disable_ssl_verification: bool = False,
        openapi_ingestion: Optional[bool] = None,
        client_mode: Optional[ClientMode] = None,
        datahub_component: Optional[str] = None,
        server_config_refresh_interval: Optional[int] = None,
    ):
```

### Import

```python
from datahub.emitter.rest_emitter import DataHubRestEmitter
```
### Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `gms_server` | `str` | (required) | URL of the DataHub GMS endpoint (e.g., `http://localhost:8080`). Accepts `"__from_env__"` to load from environment. |
| `token` | `Optional[str]` | `None` | Bearer token for authentication. If `None`, attempts to load system auth from environment variables. |
| `timeout_sec` | `Optional[float]` | `None` (effective default 30.0) | Default timeout for both connect and read operations. |
| `connect_timeout_sec` | `Optional[float]` | `None` | Override timeout for connection establishment. Falls back to `timeout_sec`. |
| `read_timeout_sec` | `Optional[float]` | `None` | Override timeout for reading responses. Falls back to `timeout_sec`. |
| `retry_status_codes` | `Optional[List[int]]` | `[429, 500, 502, 503, 504]` | HTTP status codes that trigger automatic retry. |
| `retry_methods` | `Optional[List[str]]` | `["HEAD", "GET", "POST", "PUT", "DELETE", "OPTIONS", "TRACE"]` | HTTP methods eligible for retry. |
| `retry_max_times` | `Optional[int]` | Configurable via env | Maximum number of retry attempts. |
| `extra_headers` | `Optional[Dict[str, str]]` | `None` | Additional HTTP headers merged into every request. |
| `ca_certificate_path` | `Optional[str]` | `None` | Path to a CA certificate bundle for SSL verification. |
| `client_certificate_path` | `Optional[str]` | `None` | Path to a client certificate for mutual TLS. |
| `disable_ssl_verification` | `bool` | `False` | Disable SSL certificate verification entirely. |
| `openapi_ingestion` | `Optional[bool]` | `None` | Force OpenAPI endpoint usage. Auto-detected from server config if `None`. |
| `client_mode` | `Optional[ClientMode]` | `ClientMode.SDK` | Client mode sent as a header for server-side behavior differentiation. |
| `datahub_component` | `Optional[str]` | `None` | Identifier for the calling component, included in the User-Agent header. |
| `server_config_refresh_interval` | `Optional[int]` | `None` | Interval in seconds between server configuration refreshes. |
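The timeout fallback behavior described above (per-operation overrides falling back to `timeout_sec`, which itself defaults to 30.0 seconds) can be sketched with a small stand-alone helper. `resolve_timeouts` is hypothetical, not part of the SDK; it only illustrates the precedence rules from the parameter table.

```python
from typing import Optional, Tuple

# Effective default when timeout_sec is None, per the parameter table above.
DEFAULT_TIMEOUT_SEC = 30.0

def resolve_timeouts(
    timeout_sec: Optional[float] = None,
    connect_timeout_sec: Optional[float] = None,
    read_timeout_sec: Optional[float] = None,
) -> Tuple[float, float]:
    """Hypothetical helper: per-operation overrides fall back to timeout_sec."""
    base = timeout_sec if timeout_sec is not None else DEFAULT_TIMEOUT_SEC
    connect = connect_timeout_sec if connect_timeout_sec is not None else base
    read = read_timeout_sec if read_timeout_sec is not None else base
    return connect, read

print(resolve_timeouts())                                         # (30.0, 30.0)
print(resolve_timeouts(timeout_sec=15.0, read_timeout_sec=60.0))  # (15.0, 60.0)
```

Passing only `connect_timeout_sec` or `read_timeout_sec` tightens one side of the connection without touching the other, which is useful when the GMS endpoint is slow to respond but quick to accept connections.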
## DatahubKafkaEmitter

### Constructor Signature

```python
class DatahubKafkaEmitter(Closeable, Emitter):
    def __init__(self, config: KafkaEmitterConfig):
```

### Import

```python
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
```
### KafkaEmitterConfig

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `connection` | `KafkaProducerConnectionConfig` | (default factory) | Kafka connection settings including bootstrap servers, `schema_registry_url`, and `producer_config`. |
| `topic_routes` | `Dict[str, str]` | `{"MetadataChangeEvent": "MetadataChangeEvent_v4", "MetadataChangeProposal": "MetadataChangeProposal_v1"}` | Mapping of message types to Kafka topic names. |
The Kafka emitter creates separate SerializingProducer instances for the MCE and MCP message types, each using Avro serialization via the schema registry. If an OAuth callback is configured (e.g., for AWS MSK IAM), it is invoked during initialization.
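The topic routing described above can be illustrated independently of the SDK. The default routes below are the ones listed in the `KafkaEmitterConfig` table; `topic_for` is a hypothetical lookup helper for illustration only.

```python
from typing import Dict

# Default routes per the KafkaEmitterConfig table; custom topic_routes
# values override these topic names.
DEFAULT_TOPIC_ROUTES: Dict[str, str] = {
    "MetadataChangeEvent": "MetadataChangeEvent_v4",
    "MetadataChangeProposal": "MetadataChangeProposal_v1",
}

def topic_for(message_type: str,
              routes: Dict[str, str] = DEFAULT_TOPIC_ROUTES) -> str:
    """Hypothetical lookup: each message type is produced to its mapped topic."""
    if message_type not in routes:
        raise KeyError(f"no topic route configured for {message_type!r}")
    return routes[message_type]

print(topic_for("MetadataChangeProposal"))  # MetadataChangeProposal_v1
```

Overriding `topic_routes` in the config lets deployments with renamed or prefixed Kafka topics keep using the emitter unchanged.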
## I/O Contract

| Field | Value |
| --- | --- |
| Input (REST) | GMS server URL, optional authentication token, optional transport configuration |
| Input (Kafka) | `KafkaEmitterConfig` with Kafka bootstrap servers, schema registry URL, and topic routes |
| Output | Initialized emitter instance ready for `emit()` calls |
| Side Effects (REST) | Creates an HTTP session with retry adapters, sets authentication headers, configures SSL |
| Side Effects (Kafka) | Creates `SerializingProducer` instances, connects to the schema registry, triggers OAuth if configured |
| Exceptions | `ConfigurationError` if `gms_server` is empty or missing |
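The exception behavior in the contract can be mirrored with a stand-alone sketch. The `ConfigurationError` class below is a stand-in for the SDK's exception type, and `validate_gms_server` is a hypothetical helper, not the actual implementation.

```python
class ConfigurationError(Exception):
    """Stand-in for the SDK's ConfigurationError."""

def validate_gms_server(gms_server: str) -> str:
    """Hypothetical check mirroring the contract above: an empty or
    missing gms_server is rejected at construction time."""
    if not gms_server:
        raise ConfigurationError("gms_server is required")
    return gms_server

try:
    validate_gms_server("")
except ConfigurationError as e:
    print(e)  # gms_server is required
```

Because the check runs at construction time, a misconfigured pipeline fails fast rather than on the first `emit()` call.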
## Usage Examples

### REST Emitter

```python
from datahub.emitter.rest_emitter import DataHubRestEmitter

# Basic initialization
emitter = DataHubRestEmitter(
    gms_server="http://localhost:8080",
    token="your-access-token",
)

# With custom retry and timeout configuration
emitter = DataHubRestEmitter(
    gms_server="http://localhost:8080",
    token="your-access-token",
    connect_timeout_sec=10.0,
    read_timeout_sec=60.0,
    retry_max_times=5,
    retry_status_codes=[429, 500, 502, 503, 504],
)

# Test connection and verify server config
emitter.test_connection()
```
### Kafka Emitter

```python
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.configuration.kafka import KafkaProducerConnectionConfig

config = KafkaEmitterConfig(
    connection=KafkaProducerConnectionConfig(
        bootstrap="localhost:9092",
        schema_registry_url="http://localhost:8081",
    ),
)
emitter = DatahubKafkaEmitter(config)
```
### Loading from Environment

```python
from datahub.emitter.rest_emitter import DataHubRestEmitter

# Load GMS URL and token from environment variables
emitter = DataHubRestEmitter(gms_server="__from_env__")
```
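The `"__from_env__"` resolution can be sketched as below. The environment variable names `DATAHUB_GMS_URL` and `DATAHUB_GMS_TOKEN` are assumptions for illustration, and `resolve_from_env` is a hypothetical helper, not the SDK's actual resolution logic.

```python
import os
from typing import Optional, Tuple

def resolve_from_env() -> Tuple[str, Optional[str]]:
    """Hypothetical sketch of "__from_env__" resolution; the variable
    names DATAHUB_GMS_URL and DATAHUB_GMS_TOKEN are assumptions."""
    url = os.environ.get("DATAHUB_GMS_URL")
    token = os.environ.get("DATAHUB_GMS_TOKEN")  # optional; may be unset
    if not url:
        raise ValueError("DATAHUB_GMS_URL must be set for '__from_env__'")
    return url, token

os.environ["DATAHUB_GMS_URL"] = "http://localhost:8080"
os.environ.pop("DATAHUB_GMS_TOKEN", None)
print(resolve_from_env())  # ('http://localhost:8080', None)
```

Keeping the URL and token in the environment avoids hard-coding credentials in pipeline code and plays well with container deployments.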
## Related

**Knowledge Sources**

**Domains:** Data_Integration, Metadata_Management