Implementation:Heibaiying BigData Notes KafkaProducer Send
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Distributed_Systems |
| Last Updated | 2026-02-10 10:00 GMT |
Overview
A concrete reference for sending messages to Kafka topics with the synchronous and asynchronous patterns supported by KafkaProducer in the org.apache.kafka.clients.producer package.
Description
The KafkaProducer.send() method accepts a ProducerRecord and, optionally, a Callback. The call itself is asynchronous: it places the record in the producer's send buffer and returns a Future<RecordMetadata> that completes when the broker acknowledges the record or the send fails. For synchronous sending, the caller invokes .get() on the future to block until completion. For asynchronous sending, the caller supplies a Callback whose onCompletion method is invoked with the RecordMetadata on success or with a non-null Exception on failure.
A ProducerRecord specifies the target topic, an optional key (used for partitioning), and the message value. With the default partitioner, records that share a key are routed to the same partition as long as the topic's partition count does not change, which preserves ordering for that key.
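The key-to-partition mapping described above can be illustrated with a minimal sketch. It is not Kafka's implementation: the default partitioner hashes the serialized key with murmur2, while this example assumes Arrays.hashCode as a simple stand-in. The class name KeyPartitionSketch, the helper partitionFor, and the partition count of 3 are hypothetical and exist only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitionSketch {
    // Hypothetical helper: maps a record key to a partition index.
    // Assumption: Arrays.hashCode stands in for the murmur2 hash that
    // Kafka's default partitioner applies to the serialized key.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Mask the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count for the illustration
        for (String key : new String[] {"key-0", "key-1", "key-0"}) {
            System.out.printf("key=%s -> partition=%d%n",
                    key, partitionFor(key, numPartitions));
        }
    }
}
```

Because the result depends only on the key bytes and the partition count, both "key-0" records land on the same partition. The same reasoning shows why adding partitions to a topic can move a key to a different partition.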
Usage
Use the synchronous pattern (.get()) when each record must be acknowledged by the broker before the next one is sent. Use the asynchronous pattern (Callback) for high-throughput scenarios that still need delivery confirmation but cannot afford to block the sending thread on every record.
Code Reference
Source Location
code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerSyn.java:L15-42
code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerASyn.java:L15-42
Signature
// Synchronous send (block on .get())
public Future<RecordMetadata> send(ProducerRecord<K, V> record)
// Asynchronous send (with callback)
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
Import
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import java.util.concurrent.Future;
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | ProducerRecord<String, String> | A record containing the target topic name, an optional key, and the message value. |
| Input (async) | Callback | An optional callback whose onCompletion(RecordMetadata metadata, Exception exception) method is invoked when the send completes. |
| Output | Future<RecordMetadata> | A future that resolves to RecordMetadata containing the topic, partition, offset, and timestamp of the written record. |
| Throws | ExecutionException | Thrown by Future.get() when the send fails; the underlying error (e.g., NotLeaderForPartitionException) is available through getCause(). |
| Throws | InterruptedException | Thrown if the calling thread is interrupted while blocking on Future.get(). |
| Throws | SerializationException | Thrown directly by send() if the key or value cannot be serialized with the configured serializers. |
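The two exceptions raised by Future.get() can be handled as in the following sketch. To stay runnable without a broker, it assumes a CompletableFuture<String> as a stand-in for the Future<RecordMetadata> returned by send(); the class name FutureErrorHandlingSketch, the helper awaitResult, and the simulated error are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureErrorHandlingSketch {
    // Hypothetical helper: blocks on a future and reports the outcome as text.
    static String awaitResult(Future<String> future) {
        try {
            return "ok: " + future.get();
        } catch (ExecutionException e) {
            // The ExecutionException is only a wrapper; the real failure is its cause.
            Throwable cause = e.getCause();
            return "failed: " + cause.getClass().getSimpleName()
                    + ": " + cause.getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can see it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Stand-ins for the futures returned by send(): one acknowledged, one failed.
        Future<String> acked = CompletableFuture.completedFuture("hello-0@42");
        CompletableFuture<String> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(
                new IllegalStateException("simulated broker error"));

        System.out.println(awaitResult(acked));
        System.out.println(awaitResult(rejected));
    }
}
```

With a real producer, the cause would be a Kafka exception, and inspecting it (rather than the wrapper) is what allows the caller to decide whether to retry or give up.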
Usage Examples
Synchronous Send
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Assumes `producer` is an already configured KafkaProducer<String, String>.
// Send each record and block until the broker acknowledges it.
// .get() throws InterruptedException and ExecutionException, which the caller must handle.
for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("hello", "key-" + i, "value-" + i);
    RecordMetadata metadata = producer.send(record).get();
    System.out.printf("topic=%s, partition=%d, offset=%d%n",
        metadata.topic(), metadata.partition(), metadata.offset());
}
Asynchronous Send
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Assumes `producer` is an already configured KafkaProducer<String, String>.
// Send each record with a callback so the sending thread never blocks on an acknowledgment.
for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("hello", "key-" + i, "value-" + i);
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                // The send failed; report the error.
                exception.printStackTrace();
            } else {
                System.out.printf("topic=%s, partition=%d, offset=%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
            }
        }
    });
}
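Because callbacks fire later on another thread, a program that sends asynchronously has to wait for outstanding sends before it exits. With a real producer, flush() or close() serves that purpose. The sketch below illustrates the idea with JDK classes only: fakeSend is a hypothetical stand-in for producer.send(record, callback), a single-thread executor plays the role of the producer's I/O thread, and the rule that every fifth record fails is an assumption made purely to exercise the error branch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

class AsyncCallbackTrackingSketch {
    // Hypothetical stand-in for producer.send(record, callback):
    // completes on a separate thread and fails every fifth record.
    static void fakeSend(ExecutorService ioThread, int i,
                         BiConsumer<String, Exception> callback) {
        ioThread.submit(() -> {
            if (i % 5 == 4) {
                callback.accept(null, new RuntimeException("simulated failure " + i));
            } else {
                callback.accept("hello-0@" + i, null);
            }
        });
    }

    // Sends `count` records and returns {succeeded, failed} once all callbacks ran.
    static int[] sendAll(int count) throws InterruptedException {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        CountDownLatch pending = new CountDownLatch(count);
        AtomicInteger succeeded = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        for (int i = 0; i < count; i++) {
            fakeSend(ioThread, i, (metadata, exception) -> {
                // Keep callback work short and thread-safe: it runs off the sending thread.
                if (exception != null) {
                    failed.incrementAndGet();
                } else {
                    succeeded.incrementAndGet();
                }
                pending.countDown();
            });
        }
        // Wait for every callback before reporting, with an upper bound on the wait.
        boolean done = pending.await(5, TimeUnit.SECONDS);
        ioThread.shutdown();
        if (!done) {
            throw new IllegalStateException("timed out waiting for callbacks");
        }
        return new int[] {succeeded.get(), failed.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = sendAll(10);
        System.out.printf("succeeded=%d, failed=%d%n", result[0], result[1]);
    }
}
```

Counting outcomes in atomic variables keeps the callback cheap, which matters because Kafka runs callbacks on the producer's I/O thread and a slow callback delays other sends.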