Implementation:Fede1024 Rust rdkafka Producer Send Offsets To Transaction
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Transactions |
| Last Updated | 2026-02-07 19:00 GMT |
Overview
Concrete transactional offset commit method provided by rust-rdkafka's Producer trait.
Description
Producer::send_offsets_to_transaction sends a set of consumer offsets to the transaction coordinator as part of the current transaction. When the transaction commits, these offsets are atomically committed to the consumer group. The method requires a TopicPartitionList with the offsets to commit (typically consumed offset + 1) and a ConsumerGroupMetadata obtained from Consumer::group_metadata().
Under the hood, it calls rd_kafka_send_offsets_to_transaction via FFI.
Usage
Call within an active transaction, after producing output messages and before commit_transaction. Build the offsets list from consumed message positions.
Code Reference
Source Location
- Repository: rust-rdkafka
- File: src/producer/base_producer.rs
- Lines: L572-591
Signature
fn send_offsets_to_transaction<T: Into<Timeout>>(
&self,
offsets: &TopicPartitionList,
cgm: &ConsumerGroupMetadata,
timeout: T,
) -> KafkaResult<()>
Import
use rdkafka::producer::Producer;
use rdkafka::consumer::Consumer;
use rdkafka::topic_partition_list::TopicPartitionList;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| offsets | &TopicPartitionList | Yes | Consumer offsets to commit (typically consumed offset + 1) |
| cgm | &ConsumerGroupMetadata | Yes | From Consumer::group_metadata() |
| timeout | T: Into<Timeout> | Yes | Maximum wait time |
Outputs
| Name | Type | Description |
|---|---|---|
| returns | KafkaResult<()> | Success or KafkaError::Transaction |
Usage Examples
Send Offsets in Transaction
use rdkafka::producer::Producer;
use rdkafka::consumer::Consumer;
use rdkafka::topic_partition_list::TopicPartitionList;
use rdkafka::Offset;
use std::time::Duration;
let cgm = consumer.group_metadata().expect("No group metadata");
let mut offsets = TopicPartitionList::new();
offsets
.add_partition_offset("input-topic", 0, Offset::Offset(msg.offset() + 1))
.expect("Failed to add offset");
producer
.send_offsets_to_transaction(&offsets, &cgm, Duration::from_secs(10))
.expect("Send offsets failed");