Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Fede1024 Rust rdkafka BaseProducer Transactional Config

From Leeroopedia


Knowledge Sources
Domains Messaging, Transactions
Last Updated 2026-02-07 19:00 GMT

Overview

Concrete transactional producer configuration using ClientConfig and BaseProducer provided by rust-rdkafka.

Description

To create a transactional producer, configure ClientConfig with enable.idempotence=true and a transactional.id, then create a BaseProducer. The BaseProducer provides synchronous send and transaction lifecycle methods required for EOS.

The FromClientConfig implementation for BaseProducer (at src/producer/base_producer.rs:L234-292) creates the native client with RD_KAFKA_PRODUCER type and sets up event handling.

Usage

Use this to create the producer for transactional workflows. After creation, call init_transactions() before starting any transactions.

Code Reference

Source Location

  • Repository: rust-rdkafka
  • File: src/config.rs (ClientConfig), src/producer/base_producer.rs (BaseProducer FromClientConfig)
  • Lines: config.rs:L232-307, base_producer.rs:L234-292

Signature

// Configuration pattern (not a single function)
// ClientConfig::new()
//     .set("enable.idempotence", "true")
//     .set("transactional.id", id)
//     .create::<BaseProducer>()

impl FromClientConfig for BaseProducer<DefaultProducerContext, NoCustomPartitioner> {
    fn from_config(config: &ClientConfig) -> KafkaResult<Self>;
}

impl<C, Part> FromClientConfigAndContext<C> for BaseProducer<C, Part>
where
    C: ProducerContext<Part> + 'static,
    Part: Partitioner,
{
    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<Self>;
}

Import

use rdkafka::config::ClientConfig;
use rdkafka::producer::BaseProducer;

I/O Contract

Inputs

Name Type Required Description
enable.idempotence "true" Yes Enables exactly-once producer semantics
transactional.id String Yes Unique transaction identifier for fencing
bootstrap.servers String Yes Kafka broker list

Outputs

Name Type Description
create() returns KafkaResult<BaseProducer> Transactional producer ready for init_transactions

Usage Examples

Create Transactional Producer

use rdkafka::config::ClientConfig;
use rdkafka::producer::{BaseProducer, Producer};
use std::time::Duration;

let producer: BaseProducer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .set("enable.idempotence", "true")
    .set("transactional.id", "my-transaction-id")
    .create()
    .expect("Producer creation failed");

// Initialize transactions (must call before begin_transaction)
producer
    .init_transactions(Duration::from_secs(10))
    .expect("Transaction initialization failed");

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment