Workflow:Fede1024 Rust rdkafka Produce Consume Roundtrip

From Leeroopedia


Knowledge Sources
Domains Messaging, Kafka, Async_Rust
Last Updated 2026-02-07 19:30 GMT

Overview

End-to-end process for producing messages to a Kafka topic using FutureProducer and consuming them with StreamConsumer via the rust-rdkafka high-level async API.

Description

This workflow covers the canonical Kafka produce-consume cycle in Rust. It uses the ClientConfig builder pattern to create both a FutureProducer (for async message production with delivery confirmation futures) and a StreamConsumer (for receiving messages as an async stream). The producer sends messages with keys, payloads, and optional headers, then awaits delivery reports. The consumer subscribes to topics, receives messages in a loop, accesses message metadata (key, payload, headers, partition, offset, timestamp), and commits offsets.

Key outcomes:

  • A configured FutureProducer sending messages with delivery confirmation
  • A configured StreamConsumer receiving messages as an async stream
  • An optional custom context (ClientContext plus ConsumerContext) for logging rebalance and commit events

Usage

Use this workflow when you need a basic Kafka integration in a Rust application: sending keyed messages to a topic and reading them back with automatic consumer group management. It is a reasonable starting point for Kafka-based Rust applications that run on the Tokio async runtime.

Execution Steps

Step 1: Configure and Create Producer

Build a ClientConfig with broker addresses and timeout settings, then call .create() to instantiate a FutureProducer. The producer is thread-safe and cheap to clone, so one instance can be shared across async tasks. Internally, .create() dispatches through the FromClientConfig trait, which calls rd_kafka_new via FFI to allocate the underlying C client.

Key considerations:

  • Set bootstrap.servers to one or more Kafka broker addresses
  • Set message.timeout.ms to control how long the producer retries before declaring failure
  • The producer manages an internal send queue and background polling thread

Step 2: Produce Messages

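Construct FutureRecord instances with a target topic, payload, key, and optional headers (via OwnedHeaders). Call producer.send(), which returns a future that resolves to the delivery report (partition and offset) or to an error paired with the undelivered message. The related send_result() method skips the queue-full retry: it enqueues the record immediately and hands back a DeliveryFuture. Either way, multiple sends can be in flight at once rather than each one being awaited before the next is issued.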

Key considerations:

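  • FutureRecord supports a fluent builder pattern: .to(), .payload(), .key(), .headers()
  • The second argument to .send() is the queue-full retry timeout: how long to keep retrying the enqueue when the producer's internal queue is full (a Duration is accepted)
  • Non-blocking: collect the delivery futures and await them as a batch so that deliveries overlap instead of running one at a time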

Step 3: Implement Custom ClientContext (Optional)

Define a struct that implements both the ClientContext and ConsumerContext traits. The ConsumerContext methods are the ones that intercept rebalance events (pre_rebalance, post_rebalance) and offset commit results (commit_callback). Pass the context to create_with_context() instead of .create() when constructing the consumer.

Key considerations:

  • Rebalance callbacks fire when partitions are assigned or revoked from the consumer group
  • Commit callbacks report success or failure of offset commits
  • The context type parameterizes the consumer: StreamConsumer<CustomContext>

Step 4: Configure and Create Consumer

Build a ClientConfig with consumer-specific settings including group.id, bootstrap.servers, session timeout, auto-commit policy, and optional log level. Call .create() or .create_with_context() to produce a StreamConsumer. The consumer manages its own background thread for heartbeats and rebalancing.

Key considerations:

  • group.id is required for consumer group membership
  • enable.auto.commit controls whether offsets are committed automatically
  • Setting enable.partition.eof to false suppresses the end-of-partition events that would otherwise surface as errors in a long-running consumer
  • Use set_log_level() for debug visibility into librdkafka internals

Step 5: Subscribe and Consume Messages

Call consumer.subscribe() with a list of topic names to join the consumer group. Enter a loop calling consumer.recv().await which returns a BorrowedMessage on success. Extract the payload, key, headers, partition, offset, and timestamp from each message. Commit offsets manually with commit_message() or rely on auto-commit.

Key considerations:

  • recv() is the async equivalent of polling; it awaits via the waker-based MessageStream
  • payload_view::<str>() borrows the payload as a &str without copying; it yields None when the message has no payload and an error when the bytes are not valid UTF-8
  • Headers are iterable via headers().iter() yielding key-value pairs
  • CommitMode::Async commits offsets without blocking the processing loop
