Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Fede1024 Rust rdkafka Consumer Store Offset

From Leeroopedia


Knowledge Sources
Domains Messaging, Reliability
Last Updated 2026-02-07 19:00 GMT

Overview

Concrete offset storage method for at-least-once delivery provided by rust-rdkafka's Consumer trait.

Description

Consumer::store_offset_from_message stores the offset of a consumed message in the consumer's local offset store. This offset will be committed to the broker during the next auto-commit cycle or explicit commit call. It calls rd_kafka_offset_store via FFI using the message's topic, partition, and offset.

This method is used in conjunction with enable.auto.offset.store=false to implement manual offset management. The auto-commit mechanism (enable.auto.commit=true by default) periodically commits whatever offsets have been stored.

Usage

Call store_offset_from_message after successfully processing a message when using manual offset storage mode. Requires enable.auto.offset.store=false in the consumer configuration.

Code Reference

Source Location

  • Repository: rust-rdkafka
  • File: src/consumer/base_consumer.rs
  • Lines: L580-589

Signature

// Consumer trait method (implemented by BaseConsumer, delegated by StreamConsumer)
fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>;

Import

use rdkafka::consumer::Consumer;

I/O Contract

Inputs

Name Type Required Description
message &BorrowedMessage<'_> Yes The consumed message whose offset to store

Outputs

Name Type Description
returns KafkaResult<()> Success or StoreOffset error

Usage Examples

At-Least-Once Processing

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::Message;
use std::time::Duration;

// Configure consumer with manual offset storage
let consumer: StreamConsumer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .set("group.id", "at-least-once-group")
    .set("enable.auto.offset.store", "false")
    .create()
    .expect("Consumer creation failed");

consumer.subscribe(&["input-topic"]).expect("Subscription failed");

loop {
    match consumer.recv().await {
        Ok(msg) => {
            // Process and produce output
            let payload = msg.payload_view::<str>().unwrap_or(Ok("")).unwrap_or("");
            let result = producer
                .send(
                    FutureRecord::to("output-topic").payload(payload),
                    Duration::from_secs(0),
                )
                .await;

            if result.is_ok() {
                // Only store offset after successful production
                consumer.store_offset_from_message(&msg)
                    .expect("Offset store failed");
            }
        }
        Err(e) => eprintln!("Kafka error: {}", e),
    }
}

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment