Workflow:Fede1024 Rust rdkafka Produce Consume Roundtrip
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Kafka, Async_Rust |
| Last Updated | 2026-02-07 19:30 GMT |
Overview
End-to-end process for producing messages to a Kafka topic using FutureProducer and consuming them with StreamConsumer via the rust-rdkafka high-level async API.
Description
This workflow covers the canonical Kafka produce-consume cycle in Rust. It uses the ClientConfig builder pattern to create both a FutureProducer (for async message production with delivery confirmation futures) and a StreamConsumer (for receiving messages as an async stream). The producer sends messages with keys, payloads, and optional headers, then awaits delivery reports. The consumer subscribes to topics, receives messages in a loop, accesses message metadata (key, payload, headers, partition, offset, timestamp), and commits offsets.
Key outcomes:
- A configured FutureProducer sending messages with delivery confirmation
- A configured StreamConsumer receiving messages as an async stream
- Custom ClientContext for logging rebalance and commit events
Usage
Execute this workflow when you need a basic Kafka integration in a Rust application: sending structured messages to a topic and reading them back with automatic consumer group management. It is a typical starting point for Kafka-based Rust applications that use the Tokio async runtime.
Execution Steps
Step 1: Configure and Create Producer
Build a ClientConfig with broker addresses and timeout settings, then call .create() to instantiate a FutureProducer. The producer is thread-safe, and cloning it is cheap because every clone refers to the same underlying client, so it can be shared across async tasks. Internally, .create() dispatches to the FromClientConfig implementation for the requested type, which calls rd_kafka_new via FFI to allocate the underlying C client.
Key considerations:
- Set bootstrap.servers to one or more Kafka broker addresses
- Set message.timeout.ms to control how long the producer retries before declaring failure
- The producer manages an internal send queue and background polling thread
Step 2: Produce Messages
Construct FutureRecord instances with a target topic, payload, key, and optional headers (via OwnedHeaders). Call producer.send(), an async method whose future resolves to the delivery report (partition and offset) or to an error paired with the undelivered message. The related send_result() method enqueues the record immediately and returns a DeliveryFuture instead. Multiple sends can be in flight concurrently without awaiting each one individually.
Key considerations:
- FutureRecord supports fluent builder pattern: .to(), .payload(), .key(), .headers()
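- The second argument to .send() is the queue timeout (for example a Duration): how long send() keeps retrying to enqueue the record when the producer's internal queue is full; a zero duration gives up immediately
- Non-blocking: collect the send futures and drive them together (for example with a join combinator or spawned tasks) for maximum throughput; Rust futures are lazy, so awaiting them one after another sends the messages sequentially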
Step 3: Implement Custom ClientContext (Optional)
Define a struct implementing ClientContext and ConsumerContext traits to intercept rebalance events (pre_rebalance, post_rebalance) and commit callbacks (commit_callback). Pass the context to create_with_context() instead of .create() when constructing the consumer.
Key considerations:
- Rebalance callbacks fire when partitions are assigned or revoked from the consumer group
- Commit callbacks report success or failure of offset commits
- The context type parameterizes the consumer: StreamConsumer<CustomContext>
Step 4: Configure and Create Consumer
Build a ClientConfig with consumer-specific settings including group.id, bootstrap.servers, session timeout, auto-commit policy, and optional log level. Call .create() or .create_with_context() to produce a StreamConsumer. The consumer manages its own background thread for heartbeats and rebalancing.
Key considerations:
- group.id is required for consumer group membership
- enable.auto.commit controls whether offsets are committed automatically
- enable.partition.eof can be set to false so that reaching the end of a partition is not reported as an error from the receive loop in long-running consumers
- Use set_log_level() for debug visibility into librdkafka internals
Step 5: Subscribe and Consume Messages
Call consumer.subscribe() with a list of topic names to join the consumer group. Enter a loop calling consumer.recv().await which returns a BorrowedMessage on success. Extract the payload, key, headers, partition, offset, and timestamp from each message. Commit offsets manually with commit_message() or rely on auto-commit.
Key considerations:
- recv() is the async equivalent of polling; it awaits via the waker-based MessageStream
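- payload_view::<str>() attempts a zero-copy view of the payload as UTF-8 text; it yields no value when the message has no payload and an error when the bytes are not valid UTF-8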
- Headers are iterable via headers().iter() yielding key-value pairs
- CommitMode::Async commits offsets without blocking the processing loop