Implementation:Heibaiying BigData Notes KafkaProducer Send

From Leeroopedia


Knowledge Sources
Domains Messaging, Distributed_Systems
Last Updated 2026-02-10 10:00 GMT

Overview

Concrete tool for sending messages to Kafka topics, synchronously or asynchronously, using the KafkaProducer class from the org.apache.kafka.clients.producer package of the Kafka Java client.

Description

The KafkaProducer.send() method accepts a ProducerRecord and, optionally, a Callback. The call itself is always asynchronous: it adds the record to a buffer of pending sends and immediately returns a Future<RecordMetadata>. The future completes once the send has been acknowledged according to the producer's acks setting, or once it has failed. For synchronous sending, the caller invokes .get() on the future to block until completion. For asynchronous sending, the caller supplies a Callback whose onCompletion method is invoked with either the RecordMetadata (on success) or an Exception (on failure).

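A ProducerRecord specifies the target topic, an optional key (used for partitioning), and the message value. If a key is provided, the default partitioner hashes it to choose the partition, so records with the same key are routed to the same partition as long as the topic's partition count does not change. This preserves ordering for that key.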
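The key-to-partition routing can be illustrated with a simplified model. The sketch below is not Kafka's partitioner: the default partitioner hashes the serialized key bytes with murmur2, whereas this stand-in uses Arrays.hashCode to stay dependency-free. The class name KeyPartitionSketch and the method partitionFor are hypothetical. It demonstrates only the property that matters here: a deterministic hash taken modulo the partition count sends equal keys to the same partition, and that mapping can change if the partition count changes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified model of key-based partitioning (NOT Kafka's real partitioner,
// which applies murmur2 to the serialized key bytes).
class KeyPartitionSketch {

    // Maps a key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes); // stand-in hash, an assumption
        // Mask the sign bit so the result of % is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // "key-0" appears twice and maps to the same partition both times.
        for (String key : new String[] {"key-0", "key-1", "key-0"}) {
            System.out.printf("%s -> partition %d%n", key, partitionFor(key, 3));
        }
    }
}
```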

Usage

Use the synchronous pattern (.get()) when each record must be acknowledged by the broker before the next one is sent. Blocking on every record costs a full round trip per record and limits throughput. Use the asynchronous pattern (Callback) for high-throughput scenarios where you still need delivery confirmation but cannot afford to block the sending thread on each record.
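What an acknowledgment guarantees depends on producer configuration, in particular the acks setting. A minimal configuration sketch follows, assuming a broker at localhost:9092 (a placeholder address) and string keys and values. The class ProducerConfigSketch and the helper producerProps are hypothetical names; the property keys are standard producer configuration names. The block builds only a java.util.Properties object, so it compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

class ProducerConfigSketch {

    // Builds producer settings; the broker address passed in is an assumption.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // "all": the leader waits for the full set of in-sync replicas
        // before acknowledging the record.
        props.put("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With the Kafka client available, these properties would be passed to the KafkaProducer<String, String> constructor that takes a Properties argument.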

Code Reference

Source Location

code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerSyn.java:L15-42
code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerASyn.java:L15-42

Signature

// Send without a callback (call .get() on the returned Future to send synchronously)
public Future<RecordMetadata> send(ProducerRecord<K, V> record)

// Asynchronous send (with callback)
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

Import

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import java.util.concurrent.Future;

I/O Contract

Direction | Type | Description
Input | ProducerRecord<String, String> | A record containing the target topic name, an optional key, and the message value.
Input (async) | Callback | An optional callback whose onCompletion(RecordMetadata metadata, Exception exception) method is invoked when the send completes.
Output | Future<RecordMetadata> | A future that resolves to RecordMetadata containing the topic, partition, offset, and timestamp of the written record.
Throws | ExecutionException | Thrown by Future.get(); wraps the underlying send failure (e.g., NotLeaderForPartitionException) as its cause.
Throws | InterruptedException | Thrown if the calling thread is interrupted while blocking on Future.get().
Throws | SerializationException | Thrown by send() itself if the key or value cannot be serialized.
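The exception wrapping performed by Future.get() is standard java.util.concurrent behavior and can be shown without a broker. The sketch below uses a CompletableFuture that is failed by hand as a stand-in for the future returned by send(). The class FutureFailureSketch, the method describeOutcome, and the IllegalStateException used as the simulated failure are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureFailureSketch {

    // Blocks on the future and reports either the value or the unwrapped cause.
    static String describeOutcome(Future<String> future) {
        try {
            return "ok: " + future.get();
        } catch (ExecutionException e) {
            // The original failure is carried as the cause.
            return "failed: " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> success =
                CompletableFuture.completedFuture("offset=42");
        CompletableFuture<String> failure = new CompletableFuture<>();
        failure.completeExceptionally(
                new IllegalStateException("simulated send failure"));

        System.out.println(describeOutcome(success)); // ok: offset=42
        System.out.println(describeOutcome(failure)); // failed: IllegalStateException
    }
}
```

Restoring the interrupt flag in the InterruptedException branch lets code further up the call stack see that the thread was interrupted.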

Usage Examples

Synchronous Send

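import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Assumes `producer` is an already configured KafkaProducer<String, String>.
// Send each record and block until the broker acknowledges it.
try {
    for (int i = 0; i < 10; i++) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("hello", "key-" + i, "value-" + i);
        RecordMetadata metadata = producer.send(record).get();
        System.out.printf("topic=%s, partition=%d, offset=%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore the interrupt flag
} catch (ExecutionException e) {
    e.getCause().printStackTrace(); // the underlying send failure
}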

Asynchronous Send

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Assumes `producer` is an already configured KafkaProducer<String, String>.
// Send each record with a callback for non-blocking acknowledgment.
for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record =
            new ProducerRecord<>("hello", "key-" + i, "value-" + i);
    producer.send(record, new Callback() {
        // Runs on the producer's I/O thread, so keep the work here short.
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.printf("topic=%s, partition=%d, offset=%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        }
    });
}
// Block until all buffered records have been sent, so the callbacks
// run before the program moves on or exits.
producer.flush();
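Because callbacks generally run on the producer's I/O thread rather than on the sending thread, any state they update must be thread-safe, and the sender needs some way to learn that all of them have finished. The sketch below models that pattern with JDK classes only: simulatedSend is a hypothetical stand-in for send() that fails for odd indexes, and CompletableFuture.whenComplete plays the role of the Callback. None of these names come from the Kafka API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncCompletionSketch {

    // Simulated async send: fails for odd indexes, succeeds otherwise.
    static CompletableFuture<Integer> simulatedSend(int i) {
        return CompletableFuture.supplyAsync(() -> {
            if (i % 2 == 1) {
                throw new IllegalStateException("simulated failure for " + i);
            }
            return i;
        });
    }

    // Fires `count` simulated sends and returns how many of them failed.
    static int sendAllAndCountFailures(int count) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(count);
        AtomicInteger failures = new AtomicInteger(); // updated from other threads
        for (int i = 0; i < count; i++) {
            simulatedSend(i).whenComplete((value, error) -> {
                if (error != null) {
                    failures.incrementAndGet();
                }
                done.countDown();
            });
        }
        // Wait for every completion handler before reading the counter.
        if (!done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("timed out waiting for completions");
        }
        return failures.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("failures: " + sendAllAndCountFailures(10));
    }
}
```

With a real producer, calling flush() or close() serves the same purpose as the latch: both block until outstanding sends have completed.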

Related Pages

Implements Principle

Requires Environment
