Implementation:Heibaiying BigData Notes KafkaConsumer Offset Commit
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Distributed_Systems |
| Last Updated | 2026-02-10 10:00 GMT |
Overview
Concrete tool for committing consumer offsets synchronously, asynchronously, and on a per-partition basis, provided by the KafkaConsumer class in the org.apache.kafka.clients.consumer package of the Kafka Java client.
Description
The KafkaConsumer provides multiple methods for committing offsets:
- commitSync(): Commits the offsets returned by the last poll() for all assigned partitions. Blocks until the broker confirms the commit or an unrecoverable error occurs, in which case the error is thrown to the caller. Retriable errors are retried automatically.
- commitAsync(): Commits the offsets returned by the last poll() for all assigned partitions without blocking. Optionally accepts an OffsetCommitCallback that is invoked when the commit completes or fails. Does not retry on failure, because a later commit may already have succeeded and a retry would overwrite it with an older offset.
- commitAsync(Map<TopicPartition, OffsetAndMetadata>, OffsetCommitCallback): Commits offsets for specific partitions. This allows fine-grained control, such as committing after processing each partition's records separately.
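The reason commitAsync() does not retry can be illustrated with a small guard that an application might add if it wants its own retries. This is a sketch only: CommitSequence is a hypothetical helper, not part of the Kafka API, and it uses nothing but the JDK. The idea is to number each commit attempt and to retry a failed attempt only when no newer attempt has been issued since.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper (not a Kafka class): tracks which commit attempt is the newest. */
final class CommitSequence {
    private final AtomicLong latest = new AtomicLong();

    /** Call right before each commitAsync(); returns the id of this attempt. */
    long next() {
        return latest.incrementAndGet();
    }

    /** True only if no newer commit attempt was issued after the given one. */
    boolean isLatest(long attempt) {
        return latest.get() == attempt;
    }

    public static void main(String[] args) {
        CommitSequence seq = new CommitSequence();
        long first = seq.next();   // attempt 1 is sent
        long second = seq.next();  // attempt 2 is sent before attempt 1 completes
        // If attempt 1 fails now, retrying it could overwrite the newer offsets of attempt 2.
        System.out.println("retry first?  " + seq.isLatest(first));
        System.out.println("retry second? " + seq.isLatest(second));
    }
}
```

In a real consumer, the attempt id would be captured before calling commitAsync(callback), and the callback would retry only when the exception is non-null and isLatest(attempt) still holds.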
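Manual commits are normally used with enable.auto.commit set to false in the consumer configuration. The commit methods still work when auto-commit is enabled, but the periodic automatic commits then run alongside them, so the application no longer fully controls when offsets are committed.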
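A minimal sketch of a consumer configuration with auto-commit turned off is shown below. The class name ManualCommitConfig, the broker address, and the group name are placeholders chosen for illustration; the property keys are the standard consumer configuration names, written as plain strings so the snippet needs only the JDK.

```java
import java.util.Properties;

/** Hypothetical helper: builds consumer properties for manual offset commits. */
final class ManualCommitConfig {
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Turn off the background auto-commit so only explicit commit calls move the offsets.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address and group name; replace with real values.
        Properties props = build("localhost:9092", "example-group");
        System.out.println("enable.auto.commit=" + props.getProperty("enable.auto.commit"));
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor.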
Usage
Use commitSync() when you need guaranteed offset persistence, such as during shutdown or after processing a critical batch. Use commitAsync() during normal poll loop operation for higher throughput. Use the per-partition variant when processing partitions at different rates or when implementing batch-level commit granularity.
Code Reference
Source Location
code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerSyn.java:L15-44
code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASyn.java:L15-53
code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASynAndSyn.java:L15-50
code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASynWithOffsets.java:L19-55
Signature
// Synchronous commit (all partitions)
public void commitSync()
// Asynchronous commit (all partitions, no callback)
public void commitAsync()
// Asynchronous commit (all partitions, with callback)
public void commitAsync(OffsetCommitCallback callback)
// Asynchronous commit (specific partitions, with callback)
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
                        OffsetCommitCallback callback)
Import
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
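import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;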
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input (sync) | none | Commits offsets for all currently assigned partitions based on the last poll results. |
| Input (async) | OffsetCommitCallback | Optional callback invoked with offsets and exception on commit completion or failure. |
| Input (per-partition) | Map<TopicPartition, OffsetAndMetadata> | A map specifying the exact offset to commit for each partition. The offset should be the last processed offset plus one. |
| Output | void | Offsets are persisted to the __consumer_offsets topic on the broker. |
| Throws (sync) | CommitFailedException | Thrown if the commit fails and cannot be retried (e.g., rebalance occurred). |
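The "last processed offset plus one" rule for the per-partition input can be sketched with a small tracker. OffsetTracker is a hypothetical helper, generic over the partition key so that it runs with the JDK alone; in a real consumer the key type would be TopicPartition and each value would be wrapped in an OffsetAndMetadata before being passed to commitAsync.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not a Kafka class): remembers the next offset to commit per partition. */
final class OffsetTracker<P> {
    private final Map<P, Long> nextOffsets = new HashMap<>();

    /** Record that the record at the given offset in the given partition was fully processed. */
    void markProcessed(P partition, long offset) {
        // The committed value is the position of the NEXT record to read, hence + 1.
        // Long::max keeps the highest position if records are marked out of order.
        nextOffsets.merge(partition, offset + 1, Long::max);
    }

    /** Read-only snapshot of partition -> offset to commit. */
    Map<P, Long> toCommit() {
        return Collections.unmodifiableMap(new HashMap<>(nextOffsets));
    }

    public static void main(String[] args) {
        OffsetTracker<String> tracker = new OffsetTracker<>();
        tracker.markProcessed("orders-0", 41L);
        tracker.markProcessed("orders-0", 42L);
        tracker.markProcessed("orders-1", 7L);
        System.out.println(tracker.toCommit());
    }
}
```

Committing the processed offset itself, rather than the offset plus one, would cause the last processed record to be delivered again after a restart or rebalance.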
Usage Examples
Synchronous Commit
// Commit offsets after processing each batch
while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic=%s, partition=%d, offset=%d%n",
            record.topic(), record.partition(), record.offset());
    }
    // Blocks until the broker confirms the commit
    consumer.commitSync();
}
Asynchronous Commit with Callback
// Non-blocking commit with error logging
while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic=%s, partition=%d, offset=%d%n",
            record.topic(), record.partition(), record.offset());
    }
    // The callback runs once the commit completes; exception is null on success
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            System.err.println("Commit failed for offsets: " + offsets);
            exception.printStackTrace();
        }
    });
}
Combined Async and Sync
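// Async during normal operation, sync on shutdown
try {
    while (true) {
        ConsumerRecords<String, String> records =
            consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> record : records) {
            // process record
        }
        // Non-blocking; an occasional failure is covered by the next commit
        consumer.commitAsync();
    }
} finally {
    try {
        // Blocking commit so the final offsets are persisted before shutdown
        consumer.commitSync();
    } finally {
        // Close the consumer even if the final commit throws
        consumer.close();
    }
}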
Per-Partition Commit
// Commit offsets for each partition individually
while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords =
            records.records(partition);
        for (ConsumerRecord<String, String> record : partitionRecords) {
            // process record
        }
        // Offset of the last record processed in this partition
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        // Commit the position of the next record to read (last processed + 1)
        consumer.commitAsync(
            Collections.singletonMap(partition,
                new OffsetAndMetadata(lastOffset + 1)),
            (offsets, exception) -> {
                if (exception != null) {
                    System.err.println("Commit failed: " + offsets);
                }
            });
    }
}
Related Pages
Implements Principle
Requires Environment
- Environment:Heibaiying_BigData_Notes_Java_8_Maven_Environment
- Environment:Heibaiying_BigData_Notes_Kafka_2_2_Environment