Implementation:Fede1024 Rust rdkafka BaseProducer Transactional Config
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Transactions |
| Last Updated | 2026-02-07 19:00 GMT |
Overview
A concrete configuration pattern for a transactional producer, built with ClientConfig and BaseProducer from rust-rdkafka.
Description
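To create a transactional producer, configure ClientConfig with enable.idempotence=true and a transactional.id, then create a BaseProducer from it. BaseProducer offers a non-blocking send that enqueues records (delivery reports are served when the producer is polled) and, through the Producer trait, the transaction lifecycle methods required for exactly-once semantics (EOS): init_transactions, begin_transaction, commit_transaction, and abort_transaction.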
The FromClientConfig implementation for BaseProducer (at src/producer/base_producer.rs:L234-292) creates the native client with RD_KAFKA_PRODUCER type and sets up event handling.
Usage
Use this to create the producer for transactional workflows. After creation, call init_transactions() before starting any transactions.
Code Reference
Source Location
- Repository: rust-rdkafka
- File: src/config.rs (ClientConfig), src/producer/base_producer.rs (BaseProducer FromClientConfig)
- Lines: config.rs:L232-307, base_producer.rs:L234-292
Signature
// Configuration pattern (not a single function)
// ClientConfig::new()
//     .set("enable.idempotence", "true")
//     .set("transactional.id", id)
//     .create::<BaseProducer>()
impl FromClientConfig for BaseProducer<DefaultProducerContext, NoCustomPartitioner> {
    fn from_config(config: &ClientConfig) -> KafkaResult<Self>;
}
impl<C, Part> FromClientConfigAndContext<C> for BaseProducer<C, Part>
where
    C: ProducerContext<Part> + 'static,
    Part: Partitioner,
{
    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<Self>;
}
Import
use rdkafka::config::ClientConfig;
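// The Producer trait must be in scope to call init_transactions and the other transaction methods.
use rdkafka::producer::{BaseProducer, Producer};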
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
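| enable.idempotence | "true" | Yes | Enables idempotent delivery, which transactions build on; must not be "false" (librdkafka turns it on automatically when transactional.id is set) |
| transactional.id | String | Yes | Identifier that stays the same across restarts of one producer instance; the broker uses it to fence older instances |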
| bootstrap.servers | String | Yes | Kafka broker list |
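As a rough illustration of the inputs in the table above, the following sketch collects them as key/value pairs and rejects blank values before any client is created. The helper `transactional_settings` is hypothetical and not part of rust-rdkafka; it only uses the standard library.

```rust
/// Hypothetical helper (not part of rust-rdkafka): gathers the required
/// settings from the table above as key/value pairs, rejecting blank values.
fn transactional_settings(
    bootstrap_servers: &str,
    transactional_id: &str,
) -> Result<Vec<(&'static str, String)>, String> {
    if bootstrap_servers.trim().is_empty() {
        return Err("bootstrap.servers must not be empty".to_string());
    }
    if transactional_id.trim().is_empty() {
        return Err("transactional.id must not be empty".to_string());
    }
    Ok(vec![
        ("bootstrap.servers", bootstrap_servers.to_string()),
        ("enable.idempotence", "true".to_string()),
        ("transactional.id", transactional_id.to_string()),
    ])
}
```

Each returned pair could then be passed to ClientConfig::set in a loop before calling create.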
Outputs
| Name | Type | Description |
|---|---|---|
| create() returns | KafkaResult<BaseProducer> | Transactional producer ready for init_transactions |
Usage Examples
Create Transactional Producer
use rdkafka::config::ClientConfig;
use rdkafka::producer::{BaseProducer, Producer};
use std::time::Duration;

fn main() {
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("enable.idempotence", "true")
        .set("transactional.id", "my-transaction-id")
        .create()
        .expect("Producer creation failed");

    // Initialize transactions (must be called before begin_transaction)
    producer
        .init_transactions(Duration::from_secs(10))
        .expect("Transaction initialization failed");
}
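Once init_transactions has succeeded, a single transaction might look like the sketch below. It is a minimal outline under stated assumptions, not a complete error-handling strategy: the function name `send_in_transaction`, its parameters, and the 10-second timeouts are illustrative choices, and running it requires the rdkafka crate and a reachable broker.

```rust
use std::time::Duration;

use rdkafka::error::KafkaResult;
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};

/// Illustrative helper: sends one record inside a transaction and aborts
/// the transaction if the record cannot be enqueued.
/// Assumes `init_transactions` was already called on `producer`.
fn send_in_transaction(
    producer: &BaseProducer,
    topic: &str,
    key: &str,
    payload: &str,
) -> KafkaResult<()> {
    producer.begin_transaction()?;

    // `send` only enqueues the record; it does not wait for delivery.
    let record = BaseRecord::to(topic).key(key).payload(payload);
    if let Err((err, _record)) = producer.send(record) {
        producer.abort_transaction(Duration::from_secs(10))?;
        return Err(err);
    }

    // Committing waits for outstanding records, up to the given timeout.
    producer.commit_transaction(Duration::from_secs(10))
}
```

A real application would typically also inspect the error returned by commit_transaction to decide whether to retry or abort.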