Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Datahub project Datahub KafkaEmitterConfig

From Leeroopedia


Knowledge Sources
Domains Java_SDK, Metadata_Emission, Kafka
Last Updated 2026-02-10 00:00 GMT

Overview

An immutable Lombok-based configuration class that holds connection settings for the KafkaEmitter, including Kafka bootstrap servers, Schema Registry URL, and custom producer properties.

Description

KafkaEmitterConfig is a Lombok @Value/@Builder class that encapsulates all settings required to construct a KafkaEmitter. It provides sensible defaults for local development (bootstrap at localhost:9092, Schema Registry at http://localhost:8081) while allowing full customization of Kafka producer and Schema Registry configurations through map-based property overrides.

The builder includes:

  • bootstrap -- Kafka bootstrap servers (default: localhost:9092).
  • schemaRegistryUrl -- Confluent Schema Registry URL (default: http://localhost:8081).
  • schemaRegistryConfig -- Additional Schema Registry properties (default: empty map).
  • producerConfig -- Additional Kafka producer properties (default: empty map).
  • eventFormatter -- The EventFormatter used to convert wrapper MCPs to raw MCPs (default: Pegasus JSON format).

The builder also reads the client version from a client.properties resource on the classpath, falling back to "unknown" if the file cannot be loaded. A with() method on the builder allows functional-style chaining of configuration.

Usage

Use this configuration class to set up connection parameters before constructing a KafkaEmitter. Override defaults when connecting to remote Kafka clusters or when custom producer settings (e.g., authentication, compression) are needed.

Code Reference

Source Location

metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitterConfig.java

Signature

@Value
@Builder
public class KafkaEmitterConfig {
    public static final String CLIENT_VERSION_PROPERTY = "clientVersion";

    @Builder.Default String bootstrap = "localhost:9092";
    @Builder.Default String schemaRegistryUrl = "http://localhost:8081";
    @Builder.Default Map<String, String> schemaRegistryConfig = Collections.emptyMap();
    @Builder.Default Map<String, String> producerConfig = Collections.emptyMap();
    @Builder.Default EventFormatter eventFormatter =
        new EventFormatter(EventFormatter.Format.PEGASUS_JSON);

    public static class KafkaEmitterConfigBuilder {
        public KafkaEmitterConfigBuilder with(
            Consumer<KafkaEmitterConfigBuilder> builderFunction);
    }
}

Import

import datahub.client.kafka.KafkaEmitterConfig;

I/O Contract

Inputs

Field Type Default Description
bootstrap String "localhost:9092" Kafka bootstrap server addresses
schemaRegistryUrl String "http://localhost:8081" Confluent Schema Registry URL
schemaRegistryConfig Map<String, String> empty map Additional Schema Registry configuration properties
producerConfig Map<String, String> empty map Additional Kafka producer configuration properties
eventFormatter EventFormatter Pegasus JSON Formatter for converting MCP wrappers to raw MCPs

Outputs

An immutable KafkaEmitterConfig instance used to construct KafkaEmitter.

Usage Examples

// Minimal configuration with defaults
KafkaEmitterConfig config = KafkaEmitterConfig.builder().build();

// Full configuration with custom settings
KafkaEmitterConfig config = KafkaEmitterConfig.builder()
    .bootstrap("kafka-broker-1:9092,kafka-broker-2:9092")
    .schemaRegistryUrl("http://schema-registry:8081")
    .schemaRegistryConfig(Map.of(
        "basic.auth.credentials.source", "USER_INFO",
        "basic.auth.user.info", "user:password"
    ))
    .producerConfig(Map.of(
        "compression.type", "snappy",
        "acks", "all"
    ))
    .build();

// Functional-style builder chaining
KafkaEmitterConfig config = KafkaEmitterConfig.builder()
    .with(b -> b.bootstrap("remote-kafka:9092"))
    .build();

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment