Implementation:Fede1024 Rust rdkafka Producer Transaction Methods

From Leeroopedia


Knowledge Sources
Domains Messaging, Transactions
Last Updated 2026-02-07 19:00 GMT

Overview

Concrete transaction lifecycle methods on the Producer trait provided by rust-rdkafka.

Description

The Producer trait defines four transaction lifecycle methods, all implemented by BaseProducer via FFI calls to librdkafka:

  • init_transactions: Registers the producer with the transaction coordinator and fences off earlier producer instances that use the same transactional.id. Must be called once, before any transaction is started.
  • begin_transaction: Opens a new transaction scope.
  • commit_transaction: Atomically commits all messages produced and all offsets sent in the current transaction. Outstanding messages are flushed first so that every message is delivered before the commit is attempted.
  • abort_transaction: Rolls back the current transaction, discarding all uncommitted messages.

A fifth method, send_offsets_to_transaction, attaches consumer offsets to the open transaction so that they are committed only if the transaction itself commits.

Usage

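Call init_transactions once after creating a producer that is configured with a transactional.id. Then wrap each batch of operations in begin_transaction and commit_transaction, calling abort_transaction instead if the batch fails. In a consume-transform-produce pipeline, call send_offsets_to_transaction before committing so that the consumed offsets are committed atomically with the produced messages.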
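The call order described above can be pictured as a small state machine. The sketch below is a toy model that uses only the standard library; TxnState, TxnModel, and the step helper are hypothetical names introduced for illustration and are not part of rust-rdkafka. The real ordering checks are performed inside librdkafka, and how it reports out-of-order calls is not modeled here.

```rust
/// Toy model of the transactional call order. All names here are
/// hypothetical; this is not how rust-rdkafka or librdkafka is implemented.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TxnState {
    Uninitialized, // init_transactions has not been called yet
    Ready,         // initialized, no transaction open
    InTransaction, // between begin_transaction and commit/abort
}

#[derive(Debug)]
struct TxnModel {
    state: TxnState,
}

impl TxnModel {
    fn new() -> Self {
        TxnModel { state: TxnState::Uninitialized }
    }

    fn state(&self) -> TxnState {
        self.state
    }

    /// Move from `from` to `to`, or report the call as out of order.
    fn step(&mut self, call: &str, from: TxnState, to: TxnState) -> Result<(), String> {
        if self.state == from {
            self.state = to;
            Ok(())
        } else {
            Err(format!("{} is not valid in state {:?}", call, self.state))
        }
    }

    // Modeling choice: init is accepted only once, from the initial state.
    fn init_transactions(&mut self) -> Result<(), String> {
        self.step("init_transactions", TxnState::Uninitialized, TxnState::Ready)
    }

    fn begin_transaction(&mut self) -> Result<(), String> {
        self.step("begin_transaction", TxnState::Ready, TxnState::InTransaction)
    }

    // Commit and abort both close the open transaction and return to Ready.
    fn commit_transaction(&mut self) -> Result<(), String> {
        self.step("commit_transaction", TxnState::InTransaction, TxnState::Ready)
    }

    fn abort_transaction(&mut self) -> Result<(), String> {
        self.step("abort_transaction", TxnState::InTransaction, TxnState::Ready)
    }
}
```

In this model, calling begin_transaction on a fresh TxnModel returns an Err, while init_transactions followed by begin_transaction and then commit_transaction (or abort_transaction) brings the model back to Ready for the next batch.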

Code Reference

Source Location

  • Repository: rust-rdkafka
  • File: src/producer/mod.rs (trait definitions), src/producer/base_producer.rs (implementations)
  • Lines: mod.rs:L337-442 (trait), base_producer.rs:L548-626 (impl)

Signature

pub trait Producer<C, Part> {
    fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;
    fn begin_transaction(&self) -> KafkaResult<()>;
    fn send_offsets_to_transaction<T: Into<Timeout>>(
        &self,
        offsets: &TopicPartitionList,
        cgm: &ConsumerGroupMetadata,
        timeout: T,
    ) -> KafkaResult<()>;
    fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;
    fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;
}

Import

use rdkafka::producer::Producer;
use rdkafka::consumer::Consumer;  // for group_metadata()
use rdkafka::topic_partition_list::TopicPartitionList;
use rdkafka::consumer::ConsumerGroupMetadata;

I/O Contract

Inputs

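Name | Type | Required | Description
timeout | T: Into<Timeout> | Yes | Maximum wait time for init_transactions, send_offsets_to_transaction, commit_transaction, and abort_transaction
offsets | &TopicPartitionList | Yes (for send_offsets_to_transaction) | Consumer offsets to commit atomically with the transaction
cgm | &ConsumerGroupMetadata | Yes (for send_offsets_to_transaction) | Consumer group metadata obtained from Consumer::group_metadata()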
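By Kafka convention a committed offset names the next message to read, so the offsets passed to send_offsets_to_transaction are normally the last processed offset plus one for each partition. The helper below is a minimal standard-library sketch of that arithmetic; next_offsets and its tuple layout are hypothetical and not part of rust-rdkafka. In a real program, each resulting entry would be added to a TopicPartitionList.

```rust
use std::collections::BTreeMap;

/// Given the last processed offset per (topic, partition), return the
/// offsets to commit: last processed + 1. Hypothetical helper for
/// illustration; not part of rust-rdkafka.
fn next_offsets(last_processed: &BTreeMap<(String, i32), i64>) -> Vec<(String, i32, i64)> {
    last_processed
        .iter()
        .map(|((topic, partition), offset)| (topic.clone(), *partition, offset + 1))
        .collect()
}
```

A BTreeMap is used here only so the result comes back in a stable (topic, partition) order; any map type would do.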

Outputs

Name | Type | Description
(all methods) | KafkaResult<()> | Ok(()) on success, or Err(KafkaError::Transaction(RDKafkaError)) on failure
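How a caller reacts to a failed transactional call usually depends on how the error is classified. To my understanding, RDKafkaError exposes is_retriable(), txn_requires_abort(), and is_fatal() for this purpose. The sketch below models those three flags with a hypothetical TxnErrorFlags struct, so it depends only on the standard library. TxnAction and next_action are likewise illustrative names, and the order in which the flags are checked is an assumption rather than something the library prescribes.

```rust
/// Hypothetical stand-in for the three classification flags of a
/// transactional error; not a rust-rdkafka type.
#[derive(Debug, Clone, Copy)]
struct TxnErrorFlags {
    retriable: bool,
    requires_abort: bool,
    fatal: bool,
}

/// What the caller should do next (illustrative).
#[derive(Debug, PartialEq, Eq)]
enum TxnAction {
    Retry,    // call the same method again
    Abort,    // call abort_transaction, then begin a new transaction
    Shutdown, // stop using this producer instance
}

fn next_action(err: TxnErrorFlags) -> TxnAction {
    if err.fatal {
        TxnAction::Shutdown
    } else if err.requires_abort {
        TxnAction::Abort
    } else if err.retriable {
        TxnAction::Retry
    } else {
        // Unclassified errors are treated conservatively as fatal.
        TxnAction::Shutdown
    }
}
```

With real errors, the flags would be filled from the RDKafkaError carried inside KafkaError::Transaction before deciding whether to retry, abort, or stop.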

Usage Examples

Complete Transaction Lifecycle

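use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use rdkafka::topic_partition_list::TopicPartitionList;
use std::time::Duration;

// Assumes `producer` is a BaseProducer configured with a transactional.id,
// `consumer` is a BaseConsumer that belongs to a consumer group, and
// init_transactions has already been called once on `producer`.
producer.begin_transaction().expect("begin failed");

// Produce messages within the transaction
producer
    .send(BaseRecord::to("output-topic").payload("data").key("key"))
    .expect("send failed");

// Commit consumer offsets atomically with the produced messages.
// group_metadata() returns an Option, so a missing value must be handled.
let cgm = consumer.group_metadata().expect("no consumer group metadata");
let mut offsets = TopicPartitionList::new();
// The offset is the next message to consume (last processed offset + 1).
offsets.add_partition_offset("input-topic", 0, rdkafka::Offset::Offset(42))
    .expect("offset add failed");

producer
    .send_offsets_to_transaction(&offsets, &cgm, Duration::from_secs(10))
    .expect("send offsets failed");

// Commit everything atomically
match producer.commit_transaction(Duration::from_secs(30)) {
    Ok(()) => println!("Transaction committed"),
    Err(e) => {
        eprintln!("Commit failed: {}, aborting", e);
        // Best-effort abort: report a failed abort instead of discarding it
        if let Err(abort_err) = producer.abort_transaction(Duration::from_secs(10)) {
            eprintln!("Abort failed: {}", abort_err);
        }
    }
}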

Related Pages

Implements Principle

Requires Environment
