Workflow:Fede1024 Rust rdkafka Transactional Produce Consume

From Leeroopedia


Knowledge Sources
Domains Messaging, Kafka, Transactions, Exactly_Once_Semantics
Last Updated 2026-02-07 19:30 GMT

Overview

End-to-end process for implementing exactly-once semantics (EOS) in a consume-transform-produce pipeline using Kafka transactions with BaseProducer and BaseConsumer.

Description

This workflow implements Kafka's transactional messaging protocol to achieve exactly-once processing guarantees. A transactional producer atomically commits both the produced output messages and the consumed input offsets in a single transaction. If any part of the processing fails, the entire transaction can be aborted. Consumers using read_committed isolation then never see its output messages, and the consumer group's committed offsets do not advance. This eliminates the duplicate-on-failure problem of at-least-once processing, where output can be written before a crash prevents the offset commit, so the same input is processed and written again.

Key outcomes:

  • Exactly-once processing via Kafka transactions (idempotent producer + transactional offset commits)
  • Atomic commit of produced messages and consumed offsets
  • Ability to abort transactions on failure, so that neither the produced messages nor the offset commits take effect

Usage

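Execute this workflow when your application requires exactly-once processing guarantees and cannot tolerate duplicate messages. This is appropriate for financial transactions, inventory management, or any system where duplicate processing has unacceptable side effects, provided those effects are Kafka reads and writes: side effects outside Kafka (for example, database writes or HTTP calls) are not covered by the transaction. Requires Kafka 0.11 or later. The brokers' internal transaction state topic defaults to a replication factor of 3 (transaction.state.log.replication.factor) and a minimum ISR of 2 (transaction.state.log.min.isr), so single-broker development clusters must lower both settings.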

Execution Steps

Step 1: Configure Idempotent Transactional Producer

Build a ClientConfig for a BaseProducer with enable.idempotence set to true and a unique transactional.id assigned. Idempotence ensures that retried sends do not write duplicate messages to a partition. The transactional ID identifies this logical producer across restarts and lets the transaction coordinator fence zombie instances (stale copies of the producer that are still running).

Key considerations:

  • enable.idempotence must be true for transactional producers
  • transactional.id must be unique per logical producer instance
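  • Optionally set debug to eos for transaction-related debug logging
  • This workflow uses BaseProducer, whose transaction methods are blocking calls that fit a synchronous poll loop (FutureProducer exposes the same transaction methods through the Producer trait)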
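The producer settings above can be sketched without a broker as plain key/value pairs. This is a std-only sketch, not the rdkafka API: the helper names producer_config and transactional_id, and the <app>-txn-<instance> naming scheme, are hypothetical. With rdkafka, each pair would be applied with ClientConfig::set before calling create() to build the BaseProducer.

```rust
use std::collections::BTreeMap;

/// Hypothetical naming scheme: one stable ID per logical producer instance.
/// It must stay the same across restarts and never be shared by two live instances.
fn transactional_id(app: &str, instance: u32) -> String {
    format!("{app}-txn-{instance}")
}

/// Key/value pairs for a transactional BaseProducer (sketch, not the rdkafka API).
/// With rdkafka, each pair would be passed to ClientConfig::set.
fn producer_config(
    brokers: &str,
    app: &str,
    instance: u32,
    debug_eos: bool,
) -> BTreeMap<&'static str, String> {
    let mut cfg = BTreeMap::new();
    cfg.insert("bootstrap.servers", brokers.to_string());
    // Required for transactions: retried sends must not create duplicates.
    cfg.insert("enable.idempotence", "true".to_string());
    // Lets the coordinator recognize this producer after a restart and fence old copies.
    cfg.insert("transactional.id", transactional_id(app, instance));
    if debug_eos {
        // Optional: transaction-related debug logging.
        cfg.insert("debug", "eos".to_string());
    }
    cfg
}
```

Deriving the ID from a fixed instance index rather than a random value is what keeps it stable across restarts, so a restarted instance fences its predecessor instead of running alongside it.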

Step 2: Configure Consumer

Build a ClientConfig for a BaseConsumer with standard consumer group settings. The consumer's offsets will be committed as part of the producer's transaction rather than through the consumer's own commit mechanism.

Key considerations:

  • Set enable.auto.commit to false: offsets must be committed only through the transaction, since an independent auto-commit would advance them even when the transaction is aborted
  • The consumer must be part of a consumer group (group.id is required)
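A matching std-only sketch of the consumer settings, again as key/value pairs rather than rdkafka calls. consumer_config is a hypothetical helper, and setting isolation.level to read_committed on the input side is an assumption that only matters if the input topic is itself written transactionally.

```rust
use std::collections::BTreeMap;

/// Key/value pairs for the input BaseConsumer (sketch, not the rdkafka API).
fn consumer_config(
    brokers: &str,
    group_id: &str,
) -> Result<BTreeMap<&'static str, String>, String> {
    // send_offsets_to_transaction needs the group's metadata, so a group is mandatory.
    if group_id.is_empty() {
        return Err("group.id is required".to_string());
    }
    let mut cfg = BTreeMap::new();
    cfg.insert("bootstrap.servers", brokers.to_string());
    cfg.insert("group.id", group_id.to_string());
    // Offsets are committed only through the producer's transaction.
    cfg.insert("enable.auto.commit", "false".to_string());
    // Assumption: only relevant if the input topic is itself written transactionally.
    cfg.insert("isolation.level", "read_committed".to_string());
    Ok(cfg)
}
```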

Step 3: Initialize Transactions

Call producer.init_transactions() with a timeout. This registers the producer with the Kafka transaction coordinator, which assigns an internal producer ID and epoch. If a previous instance with the same transactional.id exists, it is fenced (prevented from making further progress), and any transaction it left unfinished is completed or aborted before the call returns.

Key considerations:

  • init_transactions must succeed once after producer creation, before any other transactional operation; a call that times out may be retried
  • This call may block while communicating with the transaction coordinator
  • Fencing ensures that only one instance with a given transactional.id is active at any time
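To illustrate fencing, here is a toy, std-only model of the coordinator's bookkeeping; it is not how rdkafka or the broker is implemented. Each call to the hypothetical Coordinator::init_transactions bumps an epoch stored per transactional.id, and any request carrying an older epoch is rejected. That is the essence of how a restarted instance fences a zombie.

```rust
use std::collections::HashMap;

/// Returned when a request carries an epoch older than the coordinator's.
#[derive(Debug, PartialEq)]
enum TxnError {
    Fenced,
}

/// What a producer holds after init_transactions in this toy model.
#[derive(Debug, Clone, PartialEq)]
struct ProducerHandle {
    transactional_id: String,
    epoch: u32,
}

/// Toy coordinator: remembers the latest epoch per transactional.id.
#[derive(Default)]
struct Coordinator {
    epochs: HashMap<String, u32>,
}

impl Coordinator {
    /// Models init_transactions: the first call for an ID gets epoch 0,
    /// every later call bumps the epoch and thereby fences older holders.
    fn init_transactions(&mut self, transactional_id: &str) -> ProducerHandle {
        let epoch = self
            .epochs
            .entry(transactional_id.to_string())
            .and_modify(|e| *e += 1)
            .or_insert(0);
        ProducerHandle {
            transactional_id: transactional_id.to_string(),
            epoch: *epoch,
        }
    }

    /// Every transactional request is checked against the current epoch.
    fn check(&self, p: &ProducerHandle) -> Result<(), TxnError> {
        match self.epochs.get(&p.transactional_id) {
            Some(&current) if current == p.epoch => Ok(()),
            _ => Err(TxnError::Fenced),
        }
    }
}
```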

Step 4: Begin Transaction

Call producer.begin_transaction() to start a new transaction. All subsequent produce operations and offset sends will be part of this transaction until it is committed or aborted.

Key considerations:

  • Only one transaction can be active per producer at a time
  • The transaction state is maintained by the Kafka transaction coordinator
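The call-order rules from Steps 3 and 4 amount to a small state machine. The sketch below models it with std only; TxnProducerModel and StateError are hypothetical names, and the real rdkafka methods report misuse as KafkaError values rather than this enum.

```rust
/// Client-side transactional states in this toy model.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Uninitialized,
    Ready,
    InTransaction,
}

/// Misuse errors the model reports.
#[derive(Debug, PartialEq)]
enum StateError {
    NotInitialized,
    AlreadyInitialized,
    TransactionAlreadyOpen,
    NoOpenTransaction,
}

/// Hypothetical stand-in for a transactional producer's call-order rules.
struct TxnProducerModel {
    state: State,
}

impl TxnProducerModel {
    fn new() -> Self {
        Self { state: State::Uninitialized }
    }

    /// Step 3: may succeed only once, before anything else.
    fn init_transactions(&mut self) -> Result<(), StateError> {
        match self.state {
            State::Uninitialized => {
                self.state = State::Ready;
                Ok(())
            }
            _ => Err(StateError::AlreadyInitialized),
        }
    }

    /// Step 4: requires initialization and no transaction already open.
    fn begin_transaction(&mut self) -> Result<(), StateError> {
        match self.state {
            State::Uninitialized => Err(StateError::NotInitialized),
            State::InTransaction => Err(StateError::TransactionAlreadyOpen),
            State::Ready => {
                self.state = State::InTransaction;
                Ok(())
            }
        }
    }

    /// Step 6: commit and abort both close the open transaction.
    fn commit_transaction(&mut self) -> Result<(), StateError> {
        self.finish()
    }

    fn abort_transaction(&mut self) -> Result<(), StateError> {
        self.finish()
    }

    fn finish(&mut self) -> Result<(), StateError> {
        match self.state {
            State::InTransaction => {
                self.state = State::Ready;
                Ok(())
            }
            _ => Err(StateError::NoOpenTransaction),
        }
    }
}
```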

Step 5: Produce Messages and Send Offsets

Within the transaction, call producer.send() to produce messages to the output topic. Also call producer.send_offsets_to_transaction() to atomically include the consumer's offset advancement in the transaction. The consumer's group_metadata() is passed to associate the offsets with the correct consumer group.

Key considerations:

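  • send_offsets_to_transaction takes a TopicPartitionList of offsets to commit, the consumer's group_metadata(), and a timeout; each offset is the next one to consume, i.e. the offset of the last processed message plus one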
  • Both produced messages and offset commits are staged but not visible until the transaction commits
  • An explicit producer.flush() is not required: commit_transaction() delivers any still-buffered messages before committing, and abort_transaction() purges them
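The offset arithmetic is the easiest part to get wrong: Kafka expects the committed offset to be the next message to read, not the last one processed. This std-only sketch (the PendingOffsets type is hypothetical) tracks that value per topic and partition. With rdkafka, each entry would be added to a TopicPartitionList with add_partition_offset(topic, partition, Offset::Offset(next)) and passed, together with consumer.group_metadata(), to send_offsets_to_transaction.

```rust
use std::collections::BTreeMap;

/// Per (topic, partition): the offset to commit in the current transaction,
/// i.e. the offset of the last processed message plus one.
#[derive(Default)]
struct PendingOffsets {
    next: BTreeMap<(String, i32), i64>,
}

impl PendingOffsets {
    /// Record that the message at `offset` on `topic`/`partition` was processed.
    fn processed(&mut self, topic: &str, partition: i32, offset: i64) {
        let entry = self
            .next
            .entry((topic.to_string(), partition))
            .or_insert(offset + 1);
        // Keep the highest position in case messages are recorded out of order.
        *entry = (*entry).max(offset + 1);
    }

    /// (topic, partition, next offset) triples for the TopicPartitionList.
    fn to_commit(&self) -> Vec<(&str, i32, i64)> {
        self.next
            .iter()
            .map(|((t, p), o)| (t.as_str(), *p, *o))
            .collect()
    }

    /// Start fresh once the transaction has been committed or aborted.
    fn clear(&mut self) {
        self.next.clear();
    }
}
```

For example, after processing offsets 5, 6 and 7 on partition 0, the value to send is 8; committing 7 would make the group re-read message 7 after a restart.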

Step 6: Commit or Abort Transaction

Call producer.commit_transaction() to atomically make all produced messages visible and commit the consumer offsets. If any error occurred during processing, call producer.abort_transaction() instead to discard all produced messages and leave consumer offsets unchanged. After commit or abort, a new transaction can begin.

Key considerations:

  • commit_transaction makes all changes atomic: either all produced messages and offset commits succeed, or none do
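  • abort_transaction purges messages still buffered in the client and marks those already written to the log as aborted; read_uncommitted consumers can still see them, read_committed consumers skip them
  • After an abort, the group's committed offsets are unchanged, so a restarted consumer resumes from them; a consumer that keeps running must seek back to those offsets itself, because its in-memory position has already moved past the aborted batch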
  • Downstream consumers should use isolation.level=read_committed to only see committed transaction data
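The commit/abort semantics and their effect on downstream readers can be checked with a toy model of one output partition. It is std-only and not the rdkafka API; Cluster, run_batch and its fail flag are hypothetical. As in Kafka, aborted records stay in the log, and only the reader's isolation level decides whether they are visible.

```rust
/// Outcome recorded for each transaction in this toy log.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Open,
    Committed,
    Aborted,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Isolation {
    ReadUncommitted,
    ReadCommitted,
}

/// Toy model of one output partition plus the group's committed input offset.
#[derive(Default)]
struct Cluster {
    log: Vec<(u32, String)>, // (transaction number, payload)
    outcomes: Vec<Outcome>,  // indexed by transaction number
    committed_input_offset: i64,
}

impl Cluster {
    /// One consume-transform-produce batch: outputs are appended as they are
    /// sent, then the transaction commits, or aborts if `fail` is set.
    fn run_batch(&mut self, outputs: &[&str], next_input_offset: i64, fail: bool) -> Outcome {
        let txn = self.outcomes.len() as u32;
        self.outcomes.push(Outcome::Open);
        for payload in outputs {
            self.log.push((txn, payload.to_string()));
        }
        let outcome = if fail {
            // Abort: records stay in the log marked aborted; offset unchanged.
            Outcome::Aborted
        } else {
            // Commit: output and offset advance take effect together.
            self.committed_input_offset = next_input_offset;
            Outcome::Committed
        };
        self.outcomes[txn as usize] = outcome;
        outcome
    }

    /// Payloads a downstream consumer sees under the given isolation level.
    fn read(&self, isolation: Isolation) -> Vec<&str> {
        self.log
            .iter()
            .filter(|(txn, _)| match isolation {
                Isolation::ReadUncommitted => true,
                Isolation::ReadCommitted => self.outcomes[*txn as usize] == Outcome::Committed,
            })
            .map(|(_, payload)| payload.as_str())
            .collect()
    }
}
```

In a run that commits ["a1", "a2"], aborts ["b1"], and then retries ["b1"] from the unchanged committed offset, a read_committed reader sees a1, a2, b1 exactly once, while a read_uncommitted reader also sees the aborted copy of b1.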
