
Implementation:Heibaiying BigData Notes KafkaConsumer Offset Commit

From Leeroopedia


Knowledge Sources
Domains: Messaging, Distributed_Systems
Last Updated: 2026-02-10 10:00 GMT

Overview

A concrete tool for committing consumer offsets synchronously, asynchronously, or on a per-partition basis, provided by the org.apache.kafka.clients.consumer library.

Description

The KafkaConsumer provides multiple methods for committing offsets:

  • commitSync(): Commits offsets for all assigned partitions. Blocks until the broker confirms the commit or an unrecoverable error occurs. Retries automatically on retriable errors.
  • commitAsync(): Commits offsets for all assigned partitions without blocking. Optionally accepts an OffsetCommitCallback that is invoked when the commit completes or fails. Does not retry on failure (because a retry might commit a stale offset).
  • commitAsync(Map<TopicPartition, OffsetAndMetadata>, OffsetCommitCallback): Commits offsets for specific partitions. This allows fine-grained control, such as committing after processing each partition's records separately.

Manual commits are intended to be used with enable.auto.commit set to false in the consumer configuration. Calling the commit methods while auto-commit is enabled is not an error, but the two mechanisms then race to write offsets, which makes the effective committed position hard to reason about; the modes should not be mixed.
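A minimal consumer configuration for manual offset commits might look like the following sketch; the broker address and group id are placeholder values, not taken from the source repository:

```java
import java.util.Properties;

// Sketch of a consumer configuration for manual offset commits.
// "localhost:9092" and "example-group" are illustrative placeholders.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "example-group");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
// Disable auto-commit so offsets advance only via commitSync()/commitAsync().
props.put("enable.auto.commit", "false");
// These properties would then be passed to: new KafkaConsumer<>(props)
```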

Usage

Use commitSync() when you need guaranteed offset persistence, such as during shutdown or after processing a critical batch. Use commitAsync() during normal poll loop operation for higher throughput. Use the per-partition variant when processing partitions at different rates or when implementing batch-level commit granularity.

Code Reference

Source Location

code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerSyn.java:L15-44
code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASyn.java:L15-53
code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASynAndSyn.java:L15-50
code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASynWithOffsets.java:L19-55

Signature

// Synchronous commit (all partitions)
public void commitSync()

// Asynchronous commit (all partitions, no callback)
public void commitAsync()

// Asynchronous commit (all partitions, with callback)
public void commitAsync(OffsetCommitCallback callback)

// Asynchronous commit (specific partitions, with callback)
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
                        OffsetCommitCallback callback)

Import

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.HashMap;

I/O Contract

Direction Type Description
Input (sync) none Commits offsets for all currently assigned partitions based on the last poll results.
Input (async) OffsetCommitCallback Optional callback invoked with offsets and exception on commit completion or failure.
Input (per-partition) Map<TopicPartition, OffsetAndMetadata> A map specifying the exact offset to commit for each partition. The offset should be the last processed offset plus one.
Output void Offsets are persisted to the __consumer_offsets topic on the broker.
Throws (sync) CommitFailedException Thrown if the commit fails and cannot be retried (e.g., rebalance occurred).
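The "last processed offset plus one" convention from the table above can be illustrated with plain values (the offsets 41 through 43 are made up for the example):

```java
import java.util.List;

// Offsets of the records processed from one partition in the last poll
// (illustrative values).
List<Long> processedOffsets = List.of(41L, 42L, 43L);

// Kafka commits the offset of the NEXT record to read, so the value to
// commit is the last processed offset plus one.
long offsetToCommit = processedOffsets.get(processedOffsets.size() - 1) + 1; // 44
```

Committing the last processed offset itself (43 here) would cause that record to be re-delivered after a restart.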

Usage Examples

Synchronous Commit

// Commit offsets after processing each batch
while (true) {
    ConsumerRecords<String, String> records =
            consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic=%s, partition=%d, offset=%d%n",
                record.topic(), record.partition(), record.offset());
    }
    consumer.commitSync();
}

Asynchronous Commit with Callback

// Non-blocking commit with error logging
while (true) {
    ConsumerRecords<String, String> records =
            consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic=%s, partition=%d, offset=%d%n",
                record.topic(), record.partition(), record.offset());
    }
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            System.err.println("Commit failed for offsets: " + offsets);
            exception.printStackTrace();
        }
    });
}
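Because commitAsync() does not retry (a blind retry could overwrite a newer commit with a stale offset), a commonly used pattern is to tag each commit attempt with a monotonically increasing sequence number and retry in the callback only if no newer attempt has been issued since. A minimal sketch of that guard, independent of any real consumer (the class name and method names are invented for illustration):

```java
import java.util.concurrent.atomic.AtomicLong;

// Decides whether a failed async commit is safe to retry: it is safe
// only if no newer commit attempt has been issued in the meantime.
class AsyncCommitRetryGuard {
    private final AtomicLong sequence = new AtomicLong(0);

    // Call immediately before each commitAsync(); returns this attempt's number.
    long beforeCommit() {
        return sequence.incrementAndGet();
    }

    // Call inside the callback on a retriable failure: true means this was
    // the most recent attempt, so re-issuing it cannot clobber a newer offset.
    boolean safeToRetry(long attemptSequence) {
        return attemptSequence == sequence.get();
    }
}
```

When safeToRetry returns true, the callback can re-issue commitAsync (or fall back to commitSync); when it returns false, a newer commit is already in flight and the failure can simply be logged.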

Combined Async and Sync

// Async during normal operation, sync on shutdown
try {
    while (true) {
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> record : records) {
            // process record
        }
        consumer.commitAsync();
    }
} finally {
    try {
        // Make a final, guaranteed commit before shutting down.
        consumer.commitSync();
    } finally {
        // Close the consumer even if the final commit throws.
        consumer.close();
    }
}

Per-Partition Commit

// Commit offsets for each partition individually
while (true) {
    ConsumerRecords<String, String> records =
            consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords =
                records.records(partition);
        for (ConsumerRecord<String, String> record : partitionRecords) {
            // process record
        }
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        consumer.commitAsync(
                Collections.singletonMap(partition,
                        new OffsetAndMetadata(lastOffset + 1)),
                (offsets, exception) -> {
                    if (exception != null) {
                        System.err.println("Commit failed: " + offsets);
                    }
                });
    }
}

