Implementation:Heibaiying BigData Notes ConsumerRebalanceListener Usage
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Distributed_Systems |
| Last Updated | 2026-02-10 10:00 GMT |
Overview
Concrete pattern for handling partition rebalancing via ConsumerRebalanceListener and graceful consumer shutdown via consumer.wakeup() provided by the org.apache.kafka.clients.consumer library.
Description
This implementation covers two related patterns:
Rebalance handling: A class implementing ConsumerRebalanceListener is passed to consumer.subscribe(). The onPartitionsRevoked method commits offsets synchronously for the partitions being revoked, ensuring no progress is lost. The onPartitionsAssigned method provides a hook for initialization when new partitions are received.
Graceful shutdown: The consumer.wakeup() method is called from a shutdown hook thread. This causes the active or next poll() call to throw a WakeupException. The main consumer thread catches this exception, performs a final synchronous commit, and closes the consumer. This ensures the consumer leaves the group cleanly, triggering an immediate rebalance rather than waiting for a session timeout.
Usage
Use the rebalance listener pattern whenever the consumer manages offsets manually (enable.auto.commit=false) and processing correctness requires that offsets be committed before partition ownership changes. Use the wakeup/shutdown pattern in any long-running consumer application to ensure clean termination on process shutdown, SIGTERM, or application-level stop signals.
Code Reference
Source Location
code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/RebalanceListener.java:L10-59
code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerExit.java:L18-68
Signature
// ConsumerRebalanceListener interface
public interface ConsumerRebalanceListener {
void onPartitionsRevoked(Collection<TopicPartition> partitions);
void onPartitionsAssigned(Collection<TopicPartition> partitions);
}
// Subscribe with rebalance listener
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
// Wakeup for graceful shutdown (thread-safe)
public void wakeup()
Import
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.util.Collection;
import java.util.Map;
import java.util.HashMap;
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input (onPartitionsRevoked) | Collection<TopicPartition> | The set of partitions that are being taken away from this consumer. The listener should commit offsets for these partitions. |
| Input (onPartitionsAssigned) | Collection<TopicPartition> | The set of partitions newly assigned to this consumer. The listener may initialize per-partition state or seek to specific offsets. |
| Output (wakeup) | WakeupException | Causes the current or next poll() invocation to throw a WakeupException, signaling the consumer to exit its poll loop. |
| Side Effect | LeaveGroup | Calling consumer.close() after catching WakeupException sends a LeaveGroup request, triggering an immediate rebalance in the consumer group. |
Usage Examples
Rebalance Listener
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
// Track current offsets during processing
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Commit offsets for revoked partitions before they are reassigned
consumer.commitSync(currentOffsets);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Optionally initialize state for newly assigned partitions
}
};
// Subscribe with the rebalance listener
consumer.subscribe(Collections.singletonList("hello"), listener);
// Poll loop that tracks offsets
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
// Process the record
System.out.printf("topic=%s, partition=%d, offset=%d%n",
record.topic(), record.partition(), record.offset());
// Track the offset (next offset to read = current + 1)
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
}
consumer.commitAsync();
}
Graceful Shutdown with Wakeup
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
// Register a shutdown hook that triggers wakeup
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.wakeup();
}));
consumer.subscribe(Collections.singletonList("hello"));
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
// Process the record
System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync();
}
} catch (WakeupException e) {
// Expected on shutdown, ignore
} finally {
consumer.commitSync();
consumer.close();
}