Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Heibaiying BigData Notes KafkaConsumer Constructor

From Leeroopedia


Knowledge Sources
Domains Messaging, Distributed_Systems
Last Updated 2026-02-10 10:00 GMT

Overview

Concrete tool for configuring, instantiating, and subscribing a Kafka consumer to topics provided by the org.apache.kafka.clients.consumer library.

Description

The KafkaConsumer constructor accepts a Properties object containing configuration for deserialization, group coordination, and offset management. After construction, the subscribe() method registers the consumer with one or more topics, triggering the group coordination protocol that assigns partitions to this consumer.

The consumer is not thread-safe. Each thread that consumes must have its own KafkaConsumer instance. However, multiple consumers sharing the same group.id will automatically coordinate partition assignments across threads or processes.

Usage

Use this constructor at the start of any consumer application or thread. First configure the properties, then create the consumer instance, then subscribe to the desired topics. The consumer is now ready to enter the poll loop.

Code Reference

Source Location

code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerGroup.java:L16-31

Signature

public KafkaConsumer(Properties properties)

public void subscribe(Collection<String> topics)

Import

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Collections;
import java.util.Properties;

I/O Contract

Direction Type Description
Input Properties A Properties object containing at minimum: bootstrap.servers, group.id, key.deserializer, value.deserializer. Optionally includes enable.auto.commit, auto.commit.interval.ms, auto.offset.reset, and other consumer configuration keys.
Input (subscribe) Collection<String> A list of topic names to subscribe to.
Output KafkaConsumer<String, String> A consumer instance joined to the specified consumer group and subscribed to the given topics, ready for polling.
Throws ConfigException Thrown if required configuration properties are missing or have invalid values.

Usage Examples

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Collections;
import java.util.Properties;

// 1. Build the configuration
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");

// 2. Create the consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 3. Subscribe to topic(s)
consumer.subscribe(Collections.singletonList("hello"));

// ... enter poll loop ...

// 4. Close the consumer on shutdown
consumer.close();

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment