Implementation:Heibaiying BigData Notes KafkaProducer Constructor
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Distributed_Systems |
| Last Updated | 2026-02-10 10:00 GMT |
Overview
Concrete tool for configuring and instantiating a Kafka producer, provided by the org.apache.kafka.clients.producer package of the kafka-clients library.
Description
The KafkaProducer constructor accepts a Properties object (or a Map<String, Object>) containing the producer configuration. The minimum required properties are bootstrap.servers, key.serializer, and value.serializer. Constructing the producer does not wait for the cluster: a background I/O thread is started, and broker connections and topic and partition metadata are established asynchronously, so an unreachable broker usually surfaces on the first send() rather than in the constructor.
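The Map<String, Object> form mentioned above can be sketched as follows. This is a minimal illustration, not code from the repository: ProducerConfigMaps and minimal are hypothetical names, and the literal keys are the string values behind the ProducerConfig constants, so the block compiles without kafka-clients on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of kafka-clients): builds the minimum
// configuration as a Map instead of a Properties object.
final class ProducerConfigMaps {
    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    private ProducerConfigMaps() {}

    /** Returns a mutable map holding only the three required keys. */
    static Map<String, Object> minimal(String bootstrapServers) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("key.serializer", STRING_SERIALIZER);
        config.put("value.serializer", STRING_SERIALIZER);
        return config;
    }
}
```

The resulting map can be passed directly to the constructor, for example new KafkaProducer<String, String>(ProducerConfigMaps.minimal("hadoop001:9092")).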
The constructor validates the provided properties and throws a ConfigException if a required property is missing or a value is invalid. An optional partitioner.class property enables custom partitioning logic by naming a class that implements the org.apache.kafka.clients.producer.Partitioner interface.
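When a more readable error is wanted before the configuration reaches the constructor, a presence check along these lines may help. It is only a sketch under stated assumptions: ProducerConfigPreflight and missingRequiredKeys are hypothetical names, the check covers only the three keys listed above, and it does not replace the constructor's own validation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight helper (not part of kafka-clients).
final class ProducerConfigPreflight {
    // The three keys this page lists as the required minimum.
    static final List<String> REQUIRED_KEYS = Collections.unmodifiableList(
            Arrays.asList("bootstrap.servers", "key.serializer", "value.serializer"));

    private ProducerConfigPreflight() {}

    /** Returns the required keys that are absent or blank, in declaration order. */
    static List<String> missingRequiredKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            // get() is used rather than getProperty() so that non-String
            // values (for example a Class object) still count as present.
            Object value = props.get(key);
            if (value == null || value.toString().trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

A caller could log the returned list and abort startup if it is non-empty, then hand the same Properties object to the KafkaProducer constructor.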
Usage
Use this constructor at application startup to create a single, shared KafkaProducer instance. The producer is thread-safe and should be reused across the application lifecycle. Always call producer.close() during shutdown to flush pending records and release resources.
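One way to enforce the single shared instance and the close-on-shutdown rule described above is a small lazy holder. The sketch below is an assumption, not the repository's approach: SharedCloseable is a hypothetical class written against AutoCloseable so that it compiles without kafka-clients; KafkaProducer can be plugged in because it implements Closeable.

```java
import java.util.function.Supplier;

// Hypothetical holder: creates one instance lazily, hands out the same
// instance to every caller, and closes it at most once.
final class SharedCloseable<T extends AutoCloseable> implements AutoCloseable {
    private final Supplier<T> factory;
    private T instance;      // guarded by "this"
    private boolean closed;  // guarded by "this"

    SharedCloseable(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Creates the instance on first call and returns the same one afterwards. */
    synchronized T get() {
        if (closed) {
            throw new IllegalStateException("already closed");
        }
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    /** Closes the instance at most once; later calls are no-ops. */
    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        if (instance == null) {
            return;
        }
        try {
            instance.close();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            // Checked exceptions are wrapped so callers need no throws clause.
            throw new IllegalStateException("failed to close shared instance", e);
        }
    }
}
```

With kafka-clients on the classpath, the holder could be created as new SharedCloseable<KafkaProducer<String, String>>(() -> new KafkaProducer<>(props)) and its close() method registered through Runtime.getRuntime().addShutdownHook(new Thread(shared::close)), so that pending records are flushed when the JVM exits.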
Code Reference
Source Location
code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/SimpleProducer.java:L14-35
Signature
public KafkaProducer(Properties properties)
Import
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | Properties | A Properties object containing at minimum: bootstrap.servers, key.serializer, value.serializer. Optionally includes partitioner.class, acks, retries, batch.size, linger.ms, and other producer configuration keys. |
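| Output | KafkaProducer<K, V> (KafkaProducer<String, String> in the example below) | A thread-safe producer instance that is ready to accept send() calls. Broker connections and metadata are obtained in the background rather than inside the constructor. |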
| Throws | ConfigException | Thrown if required configuration properties are missing or have invalid values. |
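The optional keys named in the Input row can be layered on top of the required ones. The following is a sketch only: TunedProducerProps and withTuning are hypothetical names, and the values are illustrative placeholders rather than recommendations.

```java
import java.util.Properties;

// Hypothetical helper (not part of kafka-clients): copies a base
// configuration and adds a few optional producer settings.
final class TunedProducerProps {
    private TunedProducerProps() {}

    /** Returns a new Properties object; the base object is left unchanged. */
    static Properties withTuning(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Wait for all in-sync replicas to acknowledge each record.
        props.setProperty("acks", "all");
        // Number of resend attempts after a transient send failure.
        props.setProperty("retries", "3");
        // Upper bound, in bytes, on a per-partition batch of records.
        props.setProperty("batch.size", "32768");
        // Time, in milliseconds, to wait for more records before sending a batch.
        props.setProperty("linger.ms", "10");
        return props;
    }
}
```

Numeric settings are given as strings here because Properties.setProperty only accepts strings; Kafka's configuration parsing converts them to the expected types.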
Usage Examples
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
// 1. Build the configuration
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
// 2. (Optional) Set a custom partitioner
// props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
//     "com.heibaiying.producers.CustomPartitioner");
// 3. Create the producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// ... use producer.send() to publish records ...
// 4. Close the producer on shutdown
producer.close();