Implementation:Fede1024 Rust rdkafka Producer Transaction Methods
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Transactions |
| Last Updated | 2026-02-07 19:00 GMT |
Overview
Concrete transaction lifecycle methods on the Producer trait provided by rust-rdkafka.
Description
The Producer trait defines five transaction methods, all implemented by BaseProducer via FFI calls to librdkafka:
- init_transactions: Registers with the transaction coordinator and fences previous producers that use the same transactional.id. Must be called once before any transactions.
- begin_transaction: Opens a new transaction scope.
- send_offsets_to_transaction: Adds consumer offsets to the current transaction so that they are committed atomically with the produced messages.
- commit_transaction: Atomically commits all produced messages and offsets sent in the current transaction. Outstanding messages are flushed (delivered) before the commit completes.
- abort_transaction: Rolls back the current transaction, discarding all uncommitted messages.
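The ordering constraints in the list above can be pictured as a small state machine. The sketch below is a dependency-free model, not rust-rdkafka code: `TxnState` and `TxnModel` are hypothetical names, and treating a second `init_transactions` call or an abort outside a transaction as an error is an assumption of the model. librdkafka keeps its own, richer internal state.

```rust
/// Hypothetical model of where the producer is in the transaction lifecycle.
/// This is an illustration only; it is not a type from rust-rdkafka.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TxnState {
    Uninitialized,
    Ready,
    InTransaction,
}

/// Hypothetical model that tracks only which lifecycle call is legal next.
#[derive(Debug)]
struct TxnModel {
    state: TxnState,
}

impl TxnModel {
    fn new() -> Self {
        TxnModel { state: TxnState::Uninitialized }
    }

    /// Models init_transactions: allowed once, before any transaction.
    fn init_transactions(&mut self) -> Result<(), String> {
        self.transition(TxnState::Uninitialized, TxnState::Ready, "init_transactions")
    }

    /// Models begin_transaction: opens a transaction from the ready state.
    fn begin_transaction(&mut self) -> Result<(), String> {
        self.transition(TxnState::Ready, TxnState::InTransaction, "begin_transaction")
    }

    /// Models commit_transaction: closes the open transaction.
    fn commit_transaction(&mut self) -> Result<(), String> {
        self.transition(TxnState::InTransaction, TxnState::Ready, "commit_transaction")
    }

    /// Models abort_transaction: also closes the open transaction.
    fn abort_transaction(&mut self) -> Result<(), String> {
        self.transition(TxnState::InTransaction, TxnState::Ready, "abort_transaction")
    }

    /// Moves from `from` to `to`, or reports which call was out of order.
    fn transition(&mut self, from: TxnState, to: TxnState, call: &str) -> Result<(), String> {
        if self.state == from {
            self.state = to;
            Ok(())
        } else {
            Err(format!("{} is not valid in state {:?}", call, self.state))
        }
    }
}
```

In this model, calling `begin_transaction` on a fresh `TxnModel` fails, while the sequence init, begin, commit succeeds and leaves the model ready for the next transaction.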
Usage
Call init_transactions once after creating a producer configured with a transactional.id. Then wrap each batch of operations in begin_transaction and commit_transaction, calling abort_transaction instead of commit_transaction to roll the batch back.
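The begin/commit-or-abort pattern can be factored into a helper. The sketch below is a minimal illustration under stated assumptions: `TxnProducer` is a hypothetical local trait standing in for the transactional subset of rdkafka's Producer trait (timeouts omitted, errors reduced to `String`), and `MockProducer` exists only to record the call order.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for the transactional methods of rdkafka's Producer trait.
/// Timeouts are omitted and errors are plain strings to keep the sketch dependency-free.
trait TxnProducer {
    fn begin_transaction(&self) -> Result<(), String>;
    fn commit_transaction(&self) -> Result<(), String>;
    fn abort_transaction(&self) -> Result<(), String>;
}

/// Runs `work` inside one transaction: commit if it succeeds, abort if it fails.
fn run_in_transaction<P, F>(producer: &P, work: F) -> Result<(), String>
where
    P: TxnProducer,
    F: FnOnce(&P) -> Result<(), String>,
{
    producer.begin_transaction()?;
    match work(producer) {
        // A commit failure is returned to the caller, who decides how to recover.
        Ok(()) => producer.commit_transaction(),
        Err(e) => match producer.abort_transaction() {
            // Report the original failure once the rollback has succeeded.
            Ok(()) => Err(e),
            // Surface both failures rather than hiding the abort error.
            Err(abort_err) => Err(format!("{}; abort also failed: {}", e, abort_err)),
        },
    }
}

/// Recording mock used only to exercise the helper (hypothetical).
struct MockProducer {
    calls: RefCell<Vec<&'static str>>,
}

impl TxnProducer for MockProducer {
    fn begin_transaction(&self) -> Result<(), String> {
        self.calls.borrow_mut().push("begin");
        Ok(())
    }

    fn commit_transaction(&self) -> Result<(), String> {
        self.calls.borrow_mut().push("commit");
        Ok(())
    }

    fn abort_transaction(&self) -> Result<(), String> {
        self.calls.borrow_mut().push("abort");
        Ok(())
    }
}
```

With a real producer, the closure would hold the send calls and, for consume-transform-produce pipelines, the send_offsets_to_transaction call.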
Code Reference
Source Location
- Repository: rust-rdkafka
- File: src/producer/mod.rs (trait definitions), src/producer/base_producer.rs (implementations)
- Lines: mod.rs:L337-442 (trait), base_producer.rs:L548-626 (impl)
Signature
```rust
// Abridged: trait bounds and non-transactional methods are omitted.
pub trait Producer<C, Part> {
    fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;
    fn begin_transaction(&self) -> KafkaResult<()>;
    fn send_offsets_to_transaction<T: Into<Timeout>>(
        &self,
        offsets: &TopicPartitionList,
        cgm: &ConsumerGroupMetadata,
        timeout: T,
    ) -> KafkaResult<()>;
    fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;
    fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;
}
```
Import
```rust
use rdkafka::producer::Producer;
// Consumer provides group_metadata(); ConsumerGroupMetadata lives in the consumer module.
use rdkafka::consumer::{Consumer, ConsumerGroupMetadata};
use rdkafka::topic_partition_list::TopicPartitionList;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| timeout | T: Into<Timeout> | Yes | Maximum time to block in init_transactions, send_offsets_to_transaction, commit_transaction, and abort_transaction |
| offsets | &TopicPartitionList | Yes (for send_offsets) | Consumer offsets to commit atomically |
| cgm | &ConsumerGroupMetadata | Yes (for send_offsets) | Consumer group metadata from Consumer::group_metadata() |
Outputs
| Name | Type | Description |
|---|---|---|
| All methods return | KafkaResult<()> | Success or KafkaError::Transaction(RDKafkaError) |
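How a caller reacts to a transactional error usually depends on the error's flags. The sketch below is dependency-free and hedged: `TxnErrorFlags`, `Recovery`, and `recovery_for` are hypothetical names, and the two booleans stand in for what rust-rdkafka exposes as `RDKafkaError::is_retriable()` and `RDKafkaError::txn_requires_abort()`. The retriable, then abortable, then otherwise-fatal order follows the pattern described in librdkafka's transactional API documentation; check it against the version in use.

```rust
/// Hypothetical summary of the flags carried by a transactional error.
/// In rust-rdkafka these would be read from the RDKafkaError inside
/// KafkaError::Transaction via is_retriable() and txn_requires_abort().
#[derive(Debug, Clone, Copy)]
struct TxnErrorFlags {
    is_retriable: bool,
    requires_abort: bool,
}

/// Hypothetical set of recovery actions for the caller.
#[derive(Debug, PartialEq, Eq)]
enum Recovery {
    /// Call the same method again.
    Retry,
    /// Call abort_transaction, then begin a new transaction.
    AbortTransaction,
    /// Treat the producer as unusable and stop.
    Fatal,
}

/// Chooses a recovery action: retriable first, then abortable, otherwise fatal.
fn recovery_for(flags: TxnErrorFlags) -> Recovery {
    if flags.is_retriable {
        Recovery::Retry
    } else if flags.requires_abort {
        Recovery::AbortTransaction
    } else {
        Recovery::Fatal
    }
}
```

Keeping the decision in one function means every call site that handles `KafkaError::Transaction` applies the same policy.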
Usage Examples
Complete Transaction Lifecycle
```rust
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use rdkafka::{Offset, TopicPartitionList};
use std::time::Duration;

// Assumes `producer` was created with a transactional.id and that
// init_transactions has already been called once.
fn process_batch(producer: &BaseProducer, consumer: &BaseConsumer) {
    producer.begin_transaction().expect("begin failed");

    // Produce messages within the transaction.
    producer
        .send(BaseRecord::to("output-topic").payload("data").key("key"))
        .expect("send failed");

    // Attach consumer offsets so they commit atomically with the produced messages.
    // The offset to commit is the next one to read (last processed offset + 1).
    let cgm = consumer.group_metadata().expect("no consumer group metadata");
    let mut offsets = TopicPartitionList::new();
    offsets
        .add_partition_offset("input-topic", 0, Offset::Offset(42))
        .expect("offset add failed");
    producer
        .send_offsets_to_transaction(&offsets, &cgm, Duration::from_secs(10))
        .expect("send offsets failed");

    // Commit everything atomically; on failure, try to roll back.
    match producer.commit_transaction(Duration::from_secs(30)) {
        Ok(()) => println!("Transaction committed"),
        Err(e) => {
            eprintln!("Commit failed: {}, aborting", e);
            if let Err(abort_err) = producer.abort_transaction(Duration::from_secs(10)) {
                eprintln!("Abort failed: {}", abort_err);
            }
        }
    }
}
```
Related Pages
Implements Principle
Requires Environment