Implementation:Heibaiying BigData Notes KafkaConsumer Constructor
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Distributed_Systems |
| Last Updated | 2026-02-10 10:00 GMT |
Overview
Concrete tool for configuring, instantiating, and subscribing a Kafka consumer to topics provided by the org.apache.kafka.clients.consumer library.
Description
The KafkaConsumer constructor accepts a Properties object containing configuration for deserialization, group coordination, and offset management. After construction, the subscribe() method registers the consumer with one or more topics, triggering the group coordination protocol that assigns partitions to this consumer.
The consumer is not thread-safe. Each thread that consumes must have its own KafkaConsumer instance. However, multiple consumers sharing the same group.id will automatically coordinate partition assignments across threads or processes.
Usage
Use this constructor at the start of any consumer application or thread. First configure the properties, then create the consumer instance, then subscribe to the desired topics. The consumer is now ready to enter the poll loop.
Code Reference
Source Location
code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerGroup.java:L16-31
Signature
public KafkaConsumer(Properties properties)
public void subscribe(Collection<String> topics)
Import
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Collections;
import java.util.Properties;
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | Properties | A Properties object containing at minimum: bootstrap.servers, group.id, key.deserializer, value.deserializer. Optionally includes enable.auto.commit, auto.commit.interval.ms, auto.offset.reset, and other consumer configuration keys. |
| Input (subscribe) | Collection<String> | A list of topic names to subscribe to. |
| Output | KafkaConsumer<String, String> | A consumer instance joined to the specified consumer group and subscribed to the given topics, ready for polling. |
| Throws | ConfigException | Thrown if required configuration properties are missing or have invalid values. |
Usage Examples
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Collections;
import java.util.Properties;
// 1. Build the configuration
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// 2. Create the consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 3. Subscribe to topic(s)
consumer.subscribe(Collections.singletonList("hello"));
// ... enter poll loop ...
// 4. Close the consumer on shutdown
consumer.close();