Implementation:Datahub project Datahub KafkaEmitterConfig
| Knowledge Sources | |
|---|---|
| Domains | Java_SDK, Metadata_Emission, Kafka |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
An immutable Lombok-based configuration class that holds connection settings for the KafkaEmitter, including Kafka bootstrap servers, Schema Registry URL, and custom producer properties.
Description
KafkaEmitterConfig is a Lombok @Value/@Builder class that encapsulates all settings required to construct a KafkaEmitter. It provides sensible defaults for local development (bootstrap at localhost:9092, Schema Registry at http://localhost:8081) while allowing full customization of Kafka producer and Schema Registry configurations through map-based property overrides.
The builder includes:
bootstrap-- Kafka bootstrap servers (default:localhost:9092).schemaRegistryUrl-- Confluent Schema Registry URL (default:http://localhost:8081).schemaRegistryConfig-- Additional Schema Registry properties (default: empty map).producerConfig-- Additional Kafka producer properties (default: empty map).eventFormatter-- TheEventFormatterused to convert wrapper MCPs to raw MCPs (default: Pegasus JSON format).
The builder also reads the client version from a client.properties resource on the classpath, falling back to "unknown" if the file cannot be loaded. A with() method on the builder allows functional-style chaining of configuration.
Usage
Use this configuration class to set up connection parameters before constructing a KafkaEmitter. Override defaults when connecting to remote Kafka clusters or when custom producer settings (e.g., authentication, compression) are needed.
Code Reference
Source Location
metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitterConfig.java
Signature
@Value
@Builder
public class KafkaEmitterConfig {
public static final String CLIENT_VERSION_PROPERTY = "clientVersion";
@Builder.Default String bootstrap = "localhost:9092";
@Builder.Default String schemaRegistryUrl = "http://localhost:8081";
@Builder.Default Map<String, String> schemaRegistryConfig = Collections.emptyMap();
@Builder.Default Map<String, String> producerConfig = Collections.emptyMap();
@Builder.Default EventFormatter eventFormatter =
new EventFormatter(EventFormatter.Format.PEGASUS_JSON);
public static class KafkaEmitterConfigBuilder {
public KafkaEmitterConfigBuilder with(
Consumer<KafkaEmitterConfigBuilder> builderFunction);
}
}
Import
import datahub.client.kafka.KafkaEmitterConfig;
I/O Contract
Inputs
| Field | Type | Default | Description |
|---|---|---|---|
bootstrap |
String |
"localhost:9092" |
Kafka bootstrap server addresses |
schemaRegistryUrl |
String |
"http://localhost:8081" |
Confluent Schema Registry URL |
schemaRegistryConfig |
Map<String, String> |
empty map | Additional Schema Registry configuration properties |
producerConfig |
Map<String, String> |
empty map | Additional Kafka producer configuration properties |
eventFormatter |
EventFormatter |
Pegasus JSON | Formatter for converting MCP wrappers to raw MCPs |
Outputs
An immutable KafkaEmitterConfig instance used to construct KafkaEmitter.
Usage Examples
// Minimal configuration with defaults
KafkaEmitterConfig config = KafkaEmitterConfig.builder().build();
// Full configuration with custom settings
KafkaEmitterConfig config = KafkaEmitterConfig.builder()
.bootstrap("kafka-broker-1:9092,kafka-broker-2:9092")
.schemaRegistryUrl("http://schema-registry:8081")
.schemaRegistryConfig(Map.of(
"basic.auth.credentials.source", "USER_INFO",
"basic.auth.user.info", "user:password"
))
.producerConfig(Map.of(
"compression.type", "snappy",
"acks", "all"
))
.build();
// Functional-style builder chaining
KafkaEmitterConfig config = KafkaEmitterConfig.builder()
.with(b -> b.bootstrap("remote-kafka:9092"))
.build();
Related Pages
- Datahub_project_Datahub_KafkaEmitter -- The emitter that consumes this configuration
- Datahub_project_Datahub_RestEmitterConfig -- Analogous configuration for the REST emitter
- Datahub_project_Datahub_S3EmitterConfig -- Analogous configuration for the S3 emitter