Implementation:Heibaiying BigData Notes KafkaConsumer Poll
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Distributed_Systems |
| Last Updated | 2026-02-10 10:00 GMT |
Overview
Concrete tool for polling records from Kafka topic partitions and iterating over them, provided by the org.apache.kafka.clients.consumer package of the Kafka Java client library (kafka-clients).
Description
The KafkaConsumer.poll() method fetches a batch of records from the broker for the partitions assigned to this consumer. It accepts a Duration parameter that specifies the maximum time to block if no records are available. The method returns a ConsumerRecords<K, V> object, where K and V are the consumer's key and value types (ConsumerRecords<String, String> in the usage example on this page). ConsumerRecords implements Iterable<ConsumerRecord<K, V>> and provides access to the individual ConsumerRecord instances.
Each ConsumerRecord exposes the topic, partition, offset, timestamp, key, and value of the consumed message. The standard processing pattern iterates over all records using an enhanced for-loop.
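Besides flat iteration, ConsumerRecords also exposes partitions() and records(TopicPartition), which is useful when records should be handled one partition at a time. The following is a minimal sketch of that pattern, assuming String keys and values; the class name PartitionWalk and the method describe are hypothetical names chosen for illustration and do not appear in the referenced source.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, not part of the referenced ConsumerGroup.java.
public class PartitionWalk {

    // Builds one line per record, grouped by the partition the record came from.
    static List<String> describe(ConsumerRecords<String, String> records) {
        List<String> lines = new ArrayList<>();
        // partitions() returns only the partitions that have records in this batch.
        for (TopicPartition tp : records.partitions()) {
            // records(tp) returns that partition's records from this batch.
            for (ConsumerRecord<String, String> record : records.records(tp)) {
                lines.add(String.format("%s-%d@%d %s=%s",
                        record.topic(),
                        record.partition(),
                        record.offset(),
                        record.key(),
                        record.value()));
            }
        }
        return lines;
    }
}
```

Grouping by partition keeps each partition's records together, which can make per-partition bookkeeping (for example, tracking the last processed offset) easier than in a flat loop.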
Usage
Use poll() inside a continuous while loop as the core of any Kafka consumer application. The timeout value should balance responsiveness (shorter values enable faster shutdown) with efficiency (longer values reduce CPU usage when the topic is idle).
Code Reference
Source Location
code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerGroup.java:L33-44
Signature
public ConsumerRecords<K, V> poll(Duration timeout)
Import
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | Duration | The maximum time to block waiting for records. If no records are available within this duration, an empty ConsumerRecords is returned. |
| Output | ConsumerRecords<K, V> | A batch of records fetched from the assigned partitions (ConsumerRecords<String, String> when keys and values are strings). May be empty if no records are available within the timeout. Each record contains topic, partition, offset, timestamp, key, and value. |
| Throws | WakeupException | Thrown if consumer.wakeup() is called before or during the poll, typically from another thread to trigger a graceful shutdown. |
| Throws | InvalidOffsetException | Thrown if the offset for one or more partitions is undefined or out of range and no offset reset policy is configured. |
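The WakeupException row above suggests a common shutdown pattern: another thread calls wakeup(), the blocked poll() throws, and the loop exits and closes the consumer. The sketch below illustrates this under stated assumptions: the class PollLoop and the methods run and stopOnShutdown are hypothetical names, the 100 ms timeout mirrors the usage example on this page, and the consumer is assumed to be already subscribed or assigned.

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

// Hypothetical sketch, not part of the referenced ConsumerGroup.java.
public class PollLoop {

    // Polls until another thread calls consumer.wakeup(), then closes the consumer.
    // Returns the number of records processed.
    static long run(Consumer<String, String> consumer) {
        long seen = 0;
        try {
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    seen++;
                    System.out.printf("offset=%d, value=%s%n",
                            record.offset(), record.value());
                }
            }
        } catch (WakeupException e) {
            // Expected when wakeup() is called; treated as the stop signal.
        } finally {
            // Always release the consumer's resources, even on other exceptions.
            consumer.close();
        }
        return seen;
    }

    // Registers a JVM shutdown hook that wakes the consumer and waits
    // for the polling thread to finish closing it.
    static void stopOnShutdown(Consumer<String, String> consumer, Thread pollThread) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                pollThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
    }
}
```

Accepting the Consumer interface rather than KafkaConsumer directly is a design choice made here so the loop can be exercised with a test double; a KafkaConsumer<String, String> can be passed unchanged.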
Usage Examples
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
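// Poll loop: continuously fetch and process records.
// `consumer` is assumed to be a KafkaConsumer<String, String> that has
// already been configured and subscribed to a topic.
while (true) {
    // Block for up to 100 ms waiting for records; returns an empty batch on timeout.
    ConsumerRecords<String, String> records =
            consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
    }
}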