Implementation:Heibaiying BigData Notes KafkaConsumer Poll

From Leeroopedia


Knowledge Sources
Domains Messaging, Distributed_Systems
Last Updated 2026-02-10 10:00 GMT

Overview

A concrete tool, provided by the org.apache.kafka.clients.consumer package, for polling records from Kafka topic partitions and iterating over them.

Description

The KafkaConsumer.poll() method fetches a batch of records from the brokers for the partitions currently assigned to this consumer. It accepts a Duration parameter that caps how long the call blocks when no records are available. The method returns a ConsumerRecords<K, V> object (ConsumerRecords<String, String> when both keys and values are deserialized as strings) that implements Iterable and provides access to the individual ConsumerRecord instances.

Each ConsumerRecord exposes the topic, partition, offset, timestamp, key, and value of the consumed message. The standard processing pattern iterates over all records using an enhanced for-loop.
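The batch-and-record shape described above can be tried without a broker. The following is a minimal sketch, not the repository's code: it uses MockConsumer, the in-memory test double shipped with kafka-clients, and assumes the MockConsumer(OffsetResetStrategy) constructor as found in kafka-clients 2.x/3.x. The class name RecordFieldsSketch, the topic demo-topic, and the keys and values are hypothetical.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RecordFieldsSketch {

    // Formats every record of a polled batch as "topic-partition@offset key=value".
    static List<String> describe(ConsumerRecords<String, String> records) {
        List<String> lines = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            lines.add(String.format("%s-%d@%d %s=%s",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value()));
        }
        return lines;
    }

    // Polls an in-memory MockConsumer once. "demo-topic" is a hypothetical topic name.
    static List<String> pollOnce() {
        MockConsumer<String, String> consumer =
                new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("demo-topic", 0);
        consumer.assign(Collections.singletonList(tp));
        // The mock needs a beginning offset to resolve the initial fetch position.
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
        consumer.addRecord(new ConsumerRecord<>("demo-topic", 0, 0L, "k1", "v1"));
        consumer.addRecord(new ConsumerRecord<>("demo-topic", 0, 1L, "k2", "v2"));

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        consumer.close();
        return describe(records);
    }

    public static void main(String[] args) {
        pollOnce().forEach(System.out::println);
    }
}
```

Against a real cluster the same describe() method would be fed the result of KafkaConsumer.poll(); only the consumer construction changes.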

Usage

Use poll() inside a continuous while loop as the core of any Kafka consumer application. The timeout value should balance responsiveness (shorter values enable faster shutdown) with efficiency (longer values reduce CPU usage when the topic is idle).
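The link between the timeout and shutdown latency is easiest to see with a stop flag that is only checked between polls. The sketch below is one possible arrangement, assuming a hypothetical class StoppablePollLoop and a flag named running; it accepts the Consumer interface so that either a KafkaConsumer or a test double can be passed in.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

public class StoppablePollLoop {

    // Polls until `running` becomes false and returns the number of records seen.
    // The flag is checked only between polls, so on an idle topic the loop can take
    // up to one full `timeout` to notice that it should stop.
    static long run(Consumer<String, String> consumer,
                    AtomicBoolean running,
                    Duration timeout) {
        long processed = 0;
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(timeout);
                for (ConsumerRecord<String, String> record : records) {
                    processed++; // real handling of `record` would go here
                }
            }
        } finally {
            // Always release network resources and leave the group cleanly.
            consumer.close();
        }
        return processed;
    }
}
```

With this structure, a 100 ms timeout means a stop request is noticed within roughly 100 ms plus record processing time, while a multi-second timeout makes fewer empty round trips on an idle topic at the cost of slower shutdown.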

Code Reference

Source Location

code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerGroup.java:L33-44

Signature

public ConsumerRecords<K, V> poll(Duration timeout)

Import

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.time.temporal.ChronoUnit;

I/O Contract

Direction Type Description
Input Duration The maximum time to block waiting for records. If no records are available within this duration, an empty ConsumerRecords is returned.
Output ConsumerRecords<String, String> A batch of records fetched from the assigned partitions. May be empty if no records are available within the timeout. Each record contains topic, partition, offset, timestamp, key, and value.
Throws WakeupException Thrown if consumer.wakeup() is called, typically from another thread, before or while poll() is blocking. Commonly used to trigger a graceful shutdown.
Throws InvalidOffsetException Thrown if the offset for a partition is undefined or out of range and no offset reset policy is configured.
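The WakeupException row above corresponds to a common shutdown pattern, sketched here under assumptions: the class WakeupShutdownSketch and the method runUntilWakeup are hypothetical names, and record handling is reduced to counting.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;

public class WakeupShutdownSketch {

    // Runs the poll loop until consumer.wakeup() is called.
    // Returns the number of records received before the wakeup.
    static long runUntilWakeup(Consumer<String, String> consumer) {
        long processed = 0;
        try {
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
                processed += records.count();
            }
        } catch (WakeupException e) {
            // Expected during shutdown: wakeup() aborts the current or next poll().
        } finally {
            // Close on every exit path, including unexpected exceptions.
            consumer.close();
        }
        return processed;
    }
}
```

In an application, wakeup() would typically be invoked from a JVM shutdown hook or another control thread; it is the one KafkaConsumer method documented as safe to call from a thread other than the polling thread.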

Usage Examples

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.time.temporal.ChronoUnit;

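// Poll loop: continuously fetch and process records.
// Assumes `consumer` is an already configured KafkaConsumer<String, String>
// that has subscribed to (or been assigned) at least one topic.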
while (true) {
    ConsumerRecords<String, String> records =
            consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
    }
}
