Implementation:Fede1024 Rust rdkafka Producer Send Offsets To Transaction

From Leeroopedia


Knowledge Sources
Domains Messaging, Transactions
Last Updated 2026-02-07 19:00 GMT

Overview

Concrete method on rust-rdkafka's Producer trait for committing consumer offsets as part of a producer transaction.

Description

Producer::send_offsets_to_transaction sends a set of consumer offsets to the transaction coordinator as part of the current transaction. When the transaction commits, these offsets are committed to the consumer group atomically with the rest of the transaction; if the transaction aborts, they are not committed. The method takes a TopicPartitionList holding the offsets to commit (typically the offset of the last consumed message + 1, that is, the next message to read) and a ConsumerGroupMetadata obtained from Consumer::group_metadata().

Under the hood, it calls rd_kafka_send_offsets_to_transaction via FFI.

Usage

Call this method within an active transaction (one started with begin_transaction), after producing the output messages and before commit_transaction. Build the offsets list from the positions of the messages consumed as input to that transaction.
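Building the offsets list from consumed positions comes down to tracking, per partition, the highest consumed offset and adding 1. The following is a minimal sketch of that bookkeeping using only the standard library; `next_offsets_to_commit` is a hypothetical helper for illustration and is not part of rust-rdkafka.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper (not part of rust-rdkafka): given the
/// (topic, partition, offset) of every message consumed in the current
/// transaction, return the position to commit for each partition,
/// i.e. the highest consumed offset + 1.
pub fn next_offsets_to_commit(
    consumed: &[(&str, i32, i64)],
) -> BTreeMap<(String, i32), i64> {
    let mut next = BTreeMap::new();
    for &(topic, partition, offset) in consumed {
        // Start from this message's "next" position, then keep the maximum
        // seen so far for the same (topic, partition) pair.
        let entry = next
            .entry((topic.to_string(), partition))
            .or_insert(offset + 1);
        if offset + 1 > *entry {
            *entry = offset + 1;
        }
    }
    next
}
```

Each resulting entry can then be added to a TopicPartitionList with add_partition_offset(topic, partition, Offset::Offset(next)) before the list is passed to send_offsets_to_transaction.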

Code Reference

Source Location

  • Repository: rust-rdkafka
  • File: src/producer/base_producer.rs
  • Lines: L572-591

Signature

fn send_offsets_to_transaction<T: Into<Timeout>>(
    &self,
    offsets: &TopicPartitionList,
    cgm: &ConsumerGroupMetadata,
    timeout: T,
) -> KafkaResult<()>

Import

use rdkafka::producer::Producer;
use rdkafka::consumer::Consumer;
use rdkafka::topic_partition_list::TopicPartitionList;

I/O Contract

Inputs

Name    | Type               | Required | Description
offsets | &TopicPartitionList | Yes      | Consumer offsets to commit (typically consumed offset + 1)
cgm     | &ConsumerGroupMetadata | Yes   | From Consumer::group_metadata()
timeout | T: Into<Timeout>   | Yes      | Maximum wait time

Outputs

Name    | Type            | Description
returns | KafkaResult<()> | Ok(()) on success, or an error of kind KafkaError::Transaction on failure

Usage Examples

Send Offsets in Transaction

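use rdkafka::producer::Producer;
use rdkafka::consumer::Consumer;
use rdkafka::message::Message; // provides msg.offset()
use rdkafka::topic_partition_list::TopicPartitionList;
use rdkafka::Offset;
use std::time::Duration;

// Assumes `consumer`, a transactional `producer` with an open transaction,
// and `msg` (the last message consumed from partition 0) are already in scope.

// Metadata identifying the consumer group whose offsets are being committed.
let cgm = consumer.group_metadata().expect("No group metadata");

// Commit the position of the next message to read: consumed offset + 1.
let mut offsets = TopicPartitionList::new();
offsets
    .add_partition_offset("input-topic", 0, Offset::Offset(msg.offset() + 1))
    .expect("Failed to add offset");

// Attach the offsets to the current transaction; they take effect on commit.
producer
    .send_offsets_to_transaction(&offsets, &cgm, Duration::from_secs(10))
    .expect("Send offsets failed");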

Related Pages

Implements Principle

Requires Environment
