Implementation:Fede1024 Rust rdkafka Consumer Store Offset
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Reliability |
| Last Updated | 2026-02-07 19:00 GMT |
Overview
Concrete offset storage method for at-least-once delivery provided by rust-rdkafka's Consumer trait.
Description
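Consumer::store_offset_from_message stores the offset of a consumed message in the consumer's local offset store. The stored offset is sent to the broker during the next auto-commit cycle or explicit commit call. The method calls rd_kafka_offset_store via FFI with the message's topic, partition, and offset; librdkafka records offset + 1 as the position to commit, so a restarted consumer resumes at the message after the one that was stored.
This method is used together with enable.auto.offset.store=false to implement manual offset management. With that setting, librdkafka no longer stores an offset automatically when a message is handed to the application, so the auto-commit mechanism (enable.auto.commit=true by default) periodically commits only the offsets the application has stored itself.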
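The separation between storing an offset and committing it can be illustrated with a small, stdlib-only toy model. This is a sketch, not rdkafka's or librdkafka's implementation: `LocalOffsetStore` and its methods are hypothetical names, and `commit` stands in for an auto-commit cycle or an explicit commit call.

```rust
use std::collections::HashMap;

/// Toy model of a consumer's local offset store.
/// Hypothetical type for illustration; not part of rdkafka.
#[derive(Default)]
struct LocalOffsetStore {
    // (topic, partition) -> position staged for the next commit
    stored: HashMap<(String, i32), i64>,
    // (topic, partition) -> position the "broker" has been told about
    committed: HashMap<(String, i32), i64>,
}

impl LocalOffsetStore {
    /// Models the store step: stage `offset + 1` as the position to commit.
    fn store_offset(&mut self, topic: &str, partition: i32, offset: i64) {
        self.stored.insert((topic.to_string(), partition), offset + 1);
    }

    /// Models a commit cycle: publish whatever is currently staged.
    fn commit(&mut self) {
        for (tp, position) in &self.stored {
            self.committed.insert(tp.clone(), *position);
        }
    }

    /// Returns the committed position for a partition, if any.
    fn committed(&self, topic: &str, partition: i32) -> Option<i64> {
        self.committed.get(&(topic.to_string(), partition)).copied()
    }
}

fn main() {
    let mut store = LocalOffsetStore::default();

    // A message at offset 41 was received but not yet processed:
    // nothing is staged, so a commit cycle publishes nothing.
    store.commit();
    assert_eq!(store.committed("input-topic", 0), None);

    // Processing finished: stage the offset. Storing alone does not commit.
    store.store_offset("input-topic", 0, 41);
    assert_eq!(store.committed("input-topic", 0), None);

    // The next commit cycle publishes offset + 1.
    store.commit();
    assert_eq!(store.committed("input-topic", 0), Some(42));
    println!("committed position: {:?}", store.committed("input-topic", 0));
}
```

In this model, a crash between receiving a message and calling `store_offset` leaves the committed position untouched, which is why the message is delivered again after a restart.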
Usage
Call store_offset_from_message after successfully processing a message when using manual offset storage mode. Requires enable.auto.offset.store=false in the consumer configuration.
Code Reference
Source Location
- Repository: rust-rdkafka
- File: src/consumer/base_consumer.rs
- Lines: L580-589
Signature
// Consumer trait method (implemented by BaseConsumer, delegated by StreamConsumer)
fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>;
Import
use rdkafka::consumer::Consumer;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| message | &BorrowedMessage<'_> | Yes | The consumed message whose offset to store |
Outputs
| Name | Type | Description |
|---|---|---|
| returns | KafkaResult<()> | Ok(()) on success, or Err(KafkaError::StoreOffset) if librdkafka reports an error |
Usage Examples
At-Least-Once Processing
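use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::Message;
use std::time::Duration;

// Assumes a Tokio runtime, which StreamConsumer uses by default.
#[tokio::main]
async fn main() {
    // Configure consumer with manual offset storage
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "at-least-once-group")
        .set("enable.auto.offset.store", "false")
        .create()
        .expect("Consumer creation failed");

    // Producer used to forward the processed messages
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .expect("Producer creation failed");

    consumer.subscribe(&["input-topic"]).expect("Subscription failed");

    loop {
        match consumer.recv().await {
            Ok(msg) => {
                // Process and produce output; a missing or non-UTF-8 payload
                // is treated as an empty string
                let payload = msg.payload_view::<str>().unwrap_or(Ok("")).unwrap_or("");
                let result = producer
                    .send(
                        // No key is set, so the key type is given explicitly as ()
                        FutureRecord::<(), _>::to("output-topic").payload(payload),
                        Duration::from_secs(0),
                    )
                    .await;
                if result.is_ok() {
                    // Only store offset after successful production
                    consumer.store_offset_from_message(&msg)
                        .expect("Offset store failed");
                }
                // On failure the offset is not stored, but storing a later
                // offset on the same partition would still move past this
                // message; retry or stop here if it must not be skipped.
            }
            Err(e) => eprintln!("Kafka error: {}", e),
        }
    }
}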
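One subtlety of a loop like this is that the committed position is a single value per partition, so storing the offset of a later message also covers any earlier message whose processing failed. The stdlib-only sketch below contrasts two policies on one partition. It is a toy model under stated assumptions: `process`, `skip_on_failure`, and `stop_on_failure` are hypothetical helpers, and the returned value is the position that would be committed (last stored offset + 1).

```rust
/// Hypothetical stand-in for "process and produce":
/// fails for every offset listed in `failing`.
fn process(offset: i64, failing: &[i64]) -> Result<(), String> {
    if failing.contains(&offset) {
        Err(format!("processing failed at offset {}", offset))
    } else {
        Ok(())
    }
}

/// Keeps consuming after a failure and stores every successful offset.
/// Returns the position that would be committed, if any.
fn skip_on_failure(offsets: &[i64], failing: &[i64]) -> Option<i64> {
    let mut position = None;
    for &offset in offsets {
        if process(offset, failing).is_ok() {
            position = Some(offset + 1);
        }
    }
    position
}

/// Stops at the first failure, so the stored position never moves
/// past a message that was not processed.
fn stop_on_failure(offsets: &[i64], failing: &[i64]) -> Option<i64> {
    let mut position = None;
    for &offset in offsets {
        if process(offset, failing).is_err() {
            break;
        }
        position = Some(offset + 1);
    }
    position
}

fn main() {
    let offsets = [10, 11, 12];
    let failing = [11];

    // Offset 12 succeeded, so position 13 is stored and the failed
    // offset 11 would not be redelivered after a restart.
    assert_eq!(skip_on_failure(&offsets, &failing), Some(13));

    // Stopping at the failure leaves the position at 11, so a restart
    // re-reads offset 11.
    assert_eq!(stop_on_failure(&offsets, &failing), Some(11));
}
```

Whether to retry, stop, or route the failed message elsewhere is an application decision; the sketch only shows why ignoring the failure weakens the at-least-once guarantee.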
Related Pages
Implements Principle
Requires Environment
- Environment:Fede1024_Rust_rdkafka_Rust_Librdkafka_Build_Environment
- Environment:Fede1024_Rust_rdkafka_Kafka_Broker_Runtime
- Heuristic:Fede1024_Rust_rdkafka_Manual_Offset_Store_Pattern
- Heuristic:Fede1024_Rust_rdkafka_Cooperative_Rebalance_Protocol
- Heuristic:Fede1024_Rust_rdkafka_Commit_Mode_Sync_Vs_Async