Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Heibaiying BigData Notes ConsumerRebalanceListener Usage

From Leeroopedia


Knowledge Sources
Domains Messaging, Distributed_Systems
Last Updated 2026-02-10 10:00 GMT

Overview

Concrete pattern for handling partition rebalancing via ConsumerRebalanceListener and graceful consumer shutdown via consumer.wakeup() provided by the org.apache.kafka.clients.consumer library.

Description

This implementation covers two related patterns:

Rebalance handling: A class implementing ConsumerRebalanceListener is passed to consumer.subscribe(). The onPartitionsRevoked method commits offsets synchronously for the partitions being revoked, ensuring no progress is lost. The onPartitionsAssigned method provides a hook for initialization when new partitions are received.

Graceful shutdown: The consumer.wakeup() method is called from a shutdown hook thread. This causes the active or next poll() call to throw a WakeupException. The main consumer thread catches this exception, performs a final synchronous commit, and closes the consumer. This ensures the consumer leaves the group cleanly, triggering an immediate rebalance rather than waiting for a session timeout.

Usage

Use the rebalance listener pattern whenever the consumer manages offsets manually (enable.auto.commit=false) and processing correctness requires that offsets be committed before partition ownership changes. Use the wakeup/shutdown pattern in any long-running consumer application to ensure clean termination on process shutdown, SIGTERM, or application-level stop signals.

Code Reference

Source Location

code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/RebalanceListener.java:L10-59
code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerExit.java:L18-68

Signature

// ConsumerRebalanceListener interface
public interface ConsumerRebalanceListener {
    void onPartitionsRevoked(Collection<TopicPartition> partitions);
    void onPartitionsAssigned(Collection<TopicPartition> partitions);
}

// Subscribe with rebalance listener
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)

// Wakeup for graceful shutdown (thread-safe)
public void wakeup()

Import

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.util.Collection;
import java.util.Map;
import java.util.HashMap;

I/O Contract

Direction Type Description
Input (onPartitionsRevoked) Collection<TopicPartition> The set of partitions that are being taken away from this consumer. The listener should commit offsets for these partitions.
Input (onPartitionsAssigned) Collection<TopicPartition> The set of partitions newly assigned to this consumer. The listener may initialize per-partition state or seek to specific offsets.
Output (wakeup) WakeupException Causes the current or next poll() invocation to throw a WakeupException, signaling the consumer to exit its poll loop.
Side Effect LeaveGroup Calling consumer.close() after catching WakeupException sends a LeaveGroup request, triggering an immediate rebalance in the consumer group.

Usage Examples

Rebalance Listener

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Track current offsets during processing
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit offsets for revoked partitions before they are reassigned
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Optionally initialize state for newly assigned partitions
    }
};

// Subscribe with the rebalance listener
consumer.subscribe(Collections.singletonList("hello"), listener);

// Poll loop that tracks offsets
while (true) {
    ConsumerRecords<String, String> records =
            consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
        // Process the record
        System.out.printf("topic=%s, partition=%d, offset=%d%n",
                record.topic(), record.partition(), record.offset());
        // Track the offset (next offset to read = current + 1)
        currentOffsets.put(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }
    consumer.commitAsync();
}

Graceful Shutdown with Wakeup

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

// Register a shutdown hook that triggers wakeup
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
}));

consumer.subscribe(Collections.singletonList("hello"));

try {
    while (true) {
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> record : records) {
            // Process the record
            System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());
        }
        consumer.commitAsync();
    }
} catch (WakeupException e) {
    // Expected on shutdown, ignore
} finally {
    consumer.commitSync();
    consumer.close();
}

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment