Workflow:Fede1024 Rust rdkafka At Least Once Processing
| Property | Value |
|---|---|
| Knowledge Sources | |
| Domains | Messaging, Kafka, Stream_Processing, Delivery_Semantics |
| Last Updated | 2026-02-07 19:30 GMT |
Overview
End-to-end process for implementing at-least-once message delivery in a consume-transform-produce pipeline using StreamConsumer with manual offset storage and FutureProducer for output.
Description
This workflow implements a Kafka stream processing pattern in which messages are consumed from an input topic, forwarded (or transformed) to one or more output topics, and their offsets become eligible for commit only after delivery to all outputs has been confirmed. The key mechanism is separating auto-commit from auto-offset-store. Auto-commit still runs on a periodic timer, but a message's offset is placed in the local offset store only after every downstream produce for it has been acknowledged, and the timer commits only stored offsets. If the application crashes after consuming a message but before its offset has been stored and committed, the message is re-consumed on restart.
Key outcomes:
- At-least-once delivery guarantee (no message loss, possible duplicates on failure)
- Manual offset storage via store_offset_from_message(), while librdkafka's auto-commit performs the actual commits
- Fan-out pattern: a single consumed message replicated to multiple output topics
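The interaction between the offset store and the commit timer can be modelled without Kafka at all. The std-only sketch below is not the rdkafka API; `PartitionOffsets` and `run` are hypothetical names. It tracks a single partition, stores the next offset to read after each successful "produce", and commits only on a simulated timer tick. A crash between producing a message and storing its offset leaves the committed position behind, so a restarted consumer re-delivers those messages instead of skipping them.

```rust
/// Toy, std-only model of one partition (hypothetical; not the rdkafka API).
/// `stored` stands in for librdkafka's local offset store and `committed` for
/// the offset persisted by the consumer group. Both hold the *next* offset to read.
#[derive(Debug, Default)]
struct PartitionOffsets {
    stored: Option<i64>,
    committed: Option<i64>,
}

impl PartitionOffsets {
    /// Called only after every downstream delivery for `offset` succeeded.
    fn store(&mut self, offset: i64) {
        self.stored = Some(offset + 1);
    }

    /// What the periodic auto-commit does: commit whatever has been stored.
    fn auto_commit(&mut self) {
        if self.stored.is_some() {
            self.committed = self.stored;
        }
    }

    /// Where a (re)started consumer begins reading.
    fn resume_position(&self) -> i64 {
        self.committed.unwrap_or(0)
    }
}

/// Processes offsets from the resume position up to `end` (exclusive) and
/// appends every "produced" offset to `outputs`. If `crash_at` is reached, the
/// run stops after producing that message but before storing its offset.
/// The commit timer fires whenever the next position is a multiple of `commit_every`.
fn run(
    outputs: &mut Vec<i64>,
    offsets: &mut PartitionOffsets,
    end: i64,
    crash_at: Option<i64>,
    commit_every: i64,
) {
    let mut offset = offsets.resume_position();
    while offset < end {
        outputs.push(offset); // downstream produce confirmed
        if crash_at == Some(offset) {
            return; // crash: this offset is never stored, so never committed
        }
        offsets.store(offset);
        if (offset + 1) % commit_every == 0 {
            offsets.auto_commit(); // simulated timer tick
        }
        offset += 1;
    }
    offsets.auto_commit(); // final commit on clean shutdown
}
```

Crashing at offset 4 with a commit every third position leaves position 3 committed; the restarted run then re-emits offsets 3 and 4, so the output contains duplicates but no gaps.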
Usage
Execute this workflow when you need guaranteed message delivery in a consume-process-produce pipeline and can tolerate duplicate processing on failure. It suits applications where data loss is unacceptable but downstream processing can be made idempotent (e.g., updating a database with upserts, or appending to a log whose writers or readers can detect and drop duplicates).
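Idempotency is what makes redelivered messages harmless. The two sinks below are hypothetical, std-only illustrations of common ways to achieve it: an upsert keyed by the message key, and an append-only log that skips input positions it has already written. In a real system the set of seen positions would have to be persisted atomically with the log, which this sketch does not attempt.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical upsert sink: the last write for a key wins, so applying a
/// redelivered message again leaves the table unchanged.
fn upsert(table: &mut HashMap<String, String>, key: &str, value: &str) {
    table.insert(key.to_owned(), value.to_owned());
}

/// Hypothetical append-only sink made idempotent by remembering which input
/// positions (partition, offset) have already been written.
/// A real implementation would persist `seen` atomically with `entries`.
struct DedupLog {
    seen: HashSet<(i32, i64)>,
    entries: Vec<String>,
}

impl DedupLog {
    fn new() -> Self {
        DedupLog { seen: HashSet::new(), entries: Vec::new() }
    }

    fn append(&mut self, partition: i32, offset: i64, value: &str) {
        // `insert` returns false when this position was already written.
        if self.seen.insert((partition, offset)) {
            self.entries.push(value.to_owned());
        }
    }
}
```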
Execution Steps
Step 1: Configure Consumer with Manual Offset Storage
Build a ClientConfig for a StreamConsumer with auto-commit enabled but auto-offset-store disabled. This decouples the timing of offset commits (periodic, handled by librdkafka) from the decision of which offsets to commit (controlled by your application via store_offset_from_message()). Implement a ConsumerContext with a commit_callback to log commit success or failure.
Key considerations:
- Set enable.auto.commit to true and auto.commit.interval.ms to a suitable interval (e.g., 5000ms)
- Set enable.auto.offset.store to false to prevent offsets from being stored before processing completes
- This combination means librdkafka periodically commits whatever offsets have been explicitly stored
Step 2: Configure Output Producer
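Build a ClientConfig for a FutureProducer with queue.buffering.max.ms set to 0 so that each produce request is sent immediately instead of waiting for a batch to fill. Because the processing loop waits for every delivery before consuming the next message, any linger delay would be added to each iteration; sending immediately keeps per-message latency low and shortens the interval between a copy reaching an output topic and the input offset being stored. The number of duplicates after a crash is still governed mainly by auto.commit.interval.ms, since stored offsets are only committed on that timer.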
Key considerations:
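- Setting queue.buffering.max.ms (aliased as linger.ms in librdkafka) to 0 removes the wait for additional messages before a batch is sent, which effectively disables producer-side batching in this one-message-at-a-time loop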
- The producer is created once and reused for all output messages
Step 3: Subscribe to Input Topic
Call consumer.subscribe() with the input topic name. The call only registers the subscription; the consumer then joins the consumer group and receives partition assignments asynchronously through the rebalance protocol.
Key considerations:
- Only a single input topic is subscribed in this pattern
- Rebalance events may cause re-processing of in-flight messages
Step 4: Consume, Produce, and Store Offset
In the main processing loop, call consumer.recv().await to receive each message. For each message, fan out to all output topics by creating one FutureRecord per topic that carries the original payload and key, then send() each one via the producer. Use future::try_join_all() from the futures crate to await delivery confirmation from all output topics concurrently. Only after all deliveries succeed, call consumer.store_offset_from_message() to mark the offset for the next auto-commit.
Key considerations:
- try_join_all ensures all output deliveries succeed before storing the offset
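- If any delivery fails, try_join_all resolves to an error and the offset is NOT stored; the loop must then stop (for example by panicking) rather than move on, because storing any later offset from the same partition would implicitly mark the failed message as processed
- On restart, the consumer resumes from the last committed offset and re-processes every message consumed after it, including messages whose offsets were stored but not yet committed
- Messages must be processed in order within each partition: if a later offset is stored before an earlier message has been delivered, a crash can lose the earlier message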
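Why ordering matters is easiest to see in a variant that processes several messages of a partition concurrently: storing the offset of whichever message finishes first would let a crash skip the slower, earlier ones. A common remedy, shown here as a std-only sketch with hypothetical names (not part of rdkafka), is to store only a contiguous completion watermark. The sequential loop described above does not need this.

```rust
use std::collections::BTreeSet;

/// Hypothetical per-partition tracker for a concurrent variant of the loop.
/// It only reports a position below which every offset has completed.
struct CompletionWatermark {
    /// Next offset to read: every offset below it has completed.
    safe_position: i64,
    /// Completed offsets at or above `safe_position` (finished early).
    completed_ahead: BTreeSet<i64>,
}

impl CompletionWatermark {
    fn new(start: i64) -> Self {
        CompletionWatermark { safe_position: start, completed_ahead: BTreeSet::new() }
    }

    /// Records that `offset` was fully delivered and returns the position that
    /// is safe to store. It advances only across a contiguous run of completions.
    fn complete(&mut self, offset: i64) -> i64 {
        if offset >= self.safe_position {
            self.completed_ahead.insert(offset);
        }
        while self.completed_ahead.remove(&self.safe_position) {
            self.safe_position += 1;
        }
        self.safe_position
    }
}
```

Starting at position 3, completing offset 5 first leaves the safe position at 3; it moves to 4 and then to 6 only once offsets 3 and 4 finish.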