

Workflow:Fede1024 Rust rdkafka At Least Once Processing

From Leeroopedia



Knowledge Sources
Domains Messaging, Kafka, Stream_Processing, Delivery_Semantics
Last Updated 2026-02-07 19:30 GMT

Overview

End-to-end process for implementing at-least-once message delivery in a consume-transform-produce pipeline using StreamConsumer with manual offset storage and FutureProducer for output.

Description

This workflow implements a Kafka stream processing pattern in which messages are consumed from an input topic, forwarded (or transformed) to one or more output topics, and their offsets become eligible for commit only after delivery to all outputs has succeeded. The key mechanism is separating auto-commit from auto-offset-store: auto-commit runs on a periodic timer, but an offset is placed in the local offset store only after the downstream produce is confirmed. Because the timer commits only stored offsets, a crash before the produce is confirmed leaves the message uncommitted, and it is re-consumed on restart.
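The split between storing and committing can be illustrated with a small in-memory model. This is a sketch only: `OffsetModel` and its methods are hypothetical names, not part of rust-rdkafka, and the model assumes a single partition, positions recorded as "next offset to read", and a resume position of 0 when nothing has been committed.

```rust
/// Simplified model of one partition's offset bookkeeping (hypothetical type).
#[derive(Debug, Default)]
struct OffsetModel {
    stored: Option<i64>,    // position placed in the local offset store by the application
    committed: Option<i64>, // position last written by the periodic auto-commit
}

impl OffsetModel {
    /// Called only after every output delivery for `offset` has been confirmed.
    fn store_after_delivery(&mut self, offset: i64) {
        self.stored = Some(offset + 1); // record the next offset to read
    }

    /// Models the auto-commit timer: it commits whatever has been stored so far.
    fn auto_commit_tick(&mut self) {
        if self.stored.is_some() {
            self.committed = self.stored;
        }
    }

    /// Where a restarted consumer would resume (assumes 0 when nothing is committed).
    fn resume_position(&self) -> i64 {
        self.committed.unwrap_or(0)
    }
}

fn main() {
    let mut model = OffsetModel::default();
    model.store_after_delivery(0);
    model.store_after_delivery(1);
    model.auto_commit_tick();
    // Offset 2 is consumed, but the process crashes before delivery is confirmed,
    // so it is never stored and a later timer tick cannot commit it.
    model.auto_commit_tick();
    println!("resume at offset {}", model.resume_position()); // prints "resume at offset 2"
}
```

In this model the message at offset 2 is read again after the restart, which is the duplicate-on-failure behavior the pattern accepts in exchange for never losing a message.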

Key outcomes:

  • At-least-once delivery guarantee (no message loss, possible duplicates on failure)
  • Manual offset management via store_offset_from_message()
  • Fan-out pattern: a single consumed message replicated to multiple output topics

Usage

Execute this workflow when you need guaranteed message delivery in a consume-process-produce pipeline and can tolerate duplicate processing on failure. This is appropriate for applications where data loss is unacceptable but idempotent processing is feasible (e.g., updating a database with upserts, writing to append-only logs).

Execution Steps

Step 1: Configure Consumer with Manual Offset Storage

Build a ClientConfig for a StreamConsumer with auto-commit enabled but auto-offset-store disabled. This decouples the timing of offset commits (periodic, handled by librdkafka) from the decision of which offsets to commit (controlled by your application via store_offset_from_message()). Implement a ConsumerContext with a commit_callback to log commit success or failure.

Key considerations:

  • Set enable.auto.commit to true and auto.commit.interval.ms to a suitable interval (e.g., 5000ms)
  • Set enable.auto.offset.store to false to prevent offsets from being stored before processing completes
  • This combination means librdkafka periodically commits whatever offsets have been explicitly stored
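The consumer settings above can be written down as plain key/value pairs. The sketch below uses only the standard library so it runs on its own; with rust-rdkafka, each pair would typically be passed to `ClientConfig::set` before creating the `StreamConsumer`. The function names, the group id, and the broker address are placeholders chosen for illustration.

```rust
use std::collections::HashMap;

/// Consumer settings from Step 1 as key/value pairs (hypothetical helper).
fn consumer_settings(group_id: &str, brokers: &str) -> HashMap<&'static str, String> {
    HashMap::from([
        ("group.id", group_id.to_string()),
        ("bootstrap.servers", brokers.to_string()),
        // Commit on a timer...
        ("enable.auto.commit", "true".to_string()),
        ("auto.commit.interval.ms", "5000".to_string()),
        // ...but only offsets the application has explicitly stored.
        ("enable.auto.offset.store", "false".to_string()),
    ])
}

/// True when commits are periodic and offsets are stored only by the application.
fn uses_manual_offset_store(settings: &HashMap<&'static str, String>) -> bool {
    settings.get("enable.auto.commit").map(String::as_str) == Some("true")
        && settings.get("enable.auto.offset.store").map(String::as_str) == Some("false")
}

fn main() {
    // "example-group" and "localhost:9092" are placeholder values.
    let settings = consumer_settings("example-group", "localhost:9092");
    assert!(uses_manual_offset_store(&settings));
    for (key, value) in &settings {
        println!("{key} = {value}");
    }
}
```

A check like `uses_manual_offset_store` can be useful at startup, because leaving `enable.auto.offset.store` at its default would silently turn the pipeline back into one that may commit offsets before delivery is confirmed.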

Step 2: Configure Output Producer

Build a ClientConfig for a FutureProducer with queue.buffering.max.ms set to 0, so the producer does not wait to accumulate a batch before sending. Each produce request is sent as soon as possible, at some cost in throughput. This shortens the time between consuming a message and storing its offset, which reduces how many messages are re-processed as duplicates after a failure.

Key considerations:

  • Setting queue.buffering.max.ms to 0 disables producer-side batching
  • The producer is created once and reused for all output messages
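For reference, the producer side needs only two settings in this pattern. The following is a minimal sketch as standard-library key/value pairs; `producer_settings` is a hypothetical helper and the broker address is a placeholder. With rust-rdkafka the pairs would typically be applied through `ClientConfig::set`.

```rust
/// Producer settings from Step 2 as key/value pairs (hypothetical helper).
fn producer_settings(brokers: &str) -> Vec<(&'static str, String)> {
    vec![
        ("bootstrap.servers", brokers.to_string()),
        // 0 ms: do not wait to fill a batch before sending.
        ("queue.buffering.max.ms", "0".to_string()),
    ]
}

fn main() {
    // Built once at startup; the resulting producer is reused for every output message.
    for (key, value) in producer_settings("localhost:9092") {
        println!("{key} = {value}");
    }
}
```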

Step 3: Subscribe to Input Topic

Call consumer.subscribe() with the input topic name. The consumer joins the consumer group and begins receiving partition assignments through the rebalance protocol.

Key considerations:

  • Only a single input topic is subscribed in this pattern
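  • A rebalance may cause re-processing: messages whose offsets were not yet committed when a partition is reassigned are delivered again to the partition's new owner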

Step 4: Consume, Produce, and Store Offset

In the main processing loop, call consumer.recv().await to receive each message. For each message, fan out to all output topics by creating FutureRecord instances with the original payload and key, then send() each via the producer. Use future::try_join_all() to await confirmation from all output topics in parallel. Only after all deliveries succeed, call consumer.store_offset_from_message() to mark the offset for eventual commit.

Key considerations:

  • try_join_all ensures all output deliveries succeed before storing the offset
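  • If any delivery fails, try_join_all returns an error and the offset is NOT stored
  • After a restart, the consumer resumes from the last committed offset, so messages whose offsets were never stored and committed are processed again
  • Messages within a partition must be processed in order for the at-least-once guarantee to hold: a stored offset marks every earlier offset in that partition as done, so the loop should stop or retry on failure rather than continue past a failed message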
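The all-or-nothing rule in this step can be sketched without a broker. The model below is a simplification under stated assumptions: `deliver` and `process` are hypothetical stand-ins for the producer send and the processing loop, deliveries are attempted sequentially (the workflow itself awaits them in parallel with try_join_all), and the stored position is recorded as "next offset to read".

```rust
/// Hypothetical stand-in for a producer send: fails only for `failing_topic`.
fn deliver(topic: &str, failing_topic: Option<&str>) -> Result<(), String> {
    if Some(topic) == failing_topic {
        Err(format!("delivery to {} failed", topic))
    } else {
        Ok(())
    }
}

/// Processes offsets in order and returns the stored position plus the outcome.
/// `fail_at` optionally injects one failure as (offset, topic).
fn process(
    offsets: &[i64],
    topics: &[&str],
    fail_at: Option<(i64, &str)>,
) -> (Option<i64>, Result<(), String>) {
    let mut stored = None;
    for &offset in offsets {
        let failing = fail_at.filter(|(o, _)| *o == offset).map(|(_, t)| t);
        // Fan out to every topic; collecting into Result stops at the first error.
        let fan_out: Result<Vec<()>, String> =
            topics.iter().map(|&t| deliver(t, failing)).collect();
        match fan_out {
            // Every output confirmed: the offset may now be stored.
            Ok(_) => stored = Some(offset + 1),
            // Stop here so a later offset is never stored past the failed one.
            Err(e) => return (stored, Err(e)),
        }
    }
    (stored, Ok(()))
}

fn main() {
    let topics = ["out-a", "out-b"]; // placeholder topic names
    let (stored, outcome) = process(&[10, 11, 12], &topics, Some((11, "out-b")));
    println!("stored = {:?}, outcome = {:?}", stored, outcome);
}
```

With the injected failure at offset 11, the stored position stays at 11, so a restarted consumer would read offset 11 again even though its copy to "out-a" had already succeeded. That partial duplicate is why downstream consumers of the output topics need to tolerate repeated messages.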
