Implementation:Fede1024 Rust rdkafka StreamConsumer Recv
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Async_Programming |
| Last Updated | 2026-02-07 19:00 GMT |
Overview
Concrete tool for asynchronous message consumption with rust-rdkafka, combining Consumer::subscribe, StreamConsumer::recv, and Consumer::commit_message.
Description
Consumer::subscribe registers one or more topics with the Kafka consumer group coordinator. It creates a TopicPartitionList internally and calls rd_kafka_subscribe.
StreamConsumer::recv is an async method that awaits the next message from any assigned partition. It creates a single-use MessageStream internally and calls next() on it. The method is cancellation safe, so the returned future can be dropped before it completes (for example in a select-style branch) without losing a message.
Consumer::commit_message commits the offset of a specific message to the consumer group, synchronously or asynchronously depending on the CommitMode argument. This tells the broker that all messages in that partition up to and including this one have been processed.
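The "up to and including" wording can be made concrete with a small broker-free model. The sketch below is not rdkafka code: `GroupOffsets` and its methods are hypothetical names, and it assumes the usual Kafka convention that the stored committed value is the offset of the next message to read (the message's offset plus one).

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the committed-offset store of one consumer
/// group. This type is illustrative only and is not part of rdkafka.
#[derive(Default)]
pub struct GroupOffsets {
    // (topic, partition) -> next offset the group should read
    committed: HashMap<(String, i32), i64>,
}

impl GroupOffsets {
    /// Models committing a single message. The stored value is the offset
    /// of the next message to read, i.e. the message offset + 1 (assumed
    /// convention), so everything up to and including `offset` counts as
    /// processed for that partition.
    pub fn commit_message(&mut self, topic: &str, partition: i32, offset: i64) {
        self.committed
            .insert((topic.to_string(), partition), offset + 1);
    }

    /// Position a restarted consumer would resume from, if anything has
    /// been committed for this partition.
    pub fn resume_position(&self, topic: &str, partition: i32) -> Option<i64> {
        self.committed
            .get(&(topic.to_string(), partition))
            .copied()
    }
}
```

In this model, committing the message at offset 41 of partition 0 makes `resume_position("my-topic", 0)` return `Some(42)`, while other partitions are unaffected. Commits are tracked per topic and partition, not per topic.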
Usage
Use these methods together for the basic consume-process-commit loop. Call subscribe once, then loop on recv and optionally commit_message after processing each message.
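The ordering in this loop (receive, process, then commit) is what gives at-least-once delivery: if processing fails before the commit, the same message is read again after a restart. The following is a minimal in-memory sketch of that behavior, not rdkafka code. `Partition`, `run`, and `committed_position` are hypothetical names, and a processing error stands in for a crash.

```rust
/// Hypothetical in-memory model of a single partition plus the group's
/// committed position. Illustrative only; not part of rdkafka.
pub struct Partition {
    log: Vec<String>,
    committed: usize, // next offset to read after a restart
}

impl Partition {
    pub fn new(log: Vec<String>) -> Self {
        Self { log, committed: 0 }
    }

    /// Next offset a restarted consumer would read.
    pub fn committed_position(&self) -> usize {
        self.committed
    }

    /// Runs one consume-process-commit pass starting at the committed
    /// position. Stops at the first processing failure (simulating a
    /// crash) and returns the payloads processed successfully in this pass.
    pub fn run<F>(&mut self, mut process: F) -> Vec<String>
    where
        F: FnMut(&str) -> Result<(), String>,
    {
        let mut done = Vec::new();
        let mut pos = self.committed;
        while let Some(msg) = self.log.get(pos) {
            // "recv": read the message at the current position.
            if process(msg.as_str()).is_err() {
                // Failure before commit: the position is not advanced.
                break;
            }
            done.push(msg.clone());
            // "commit_message": record progress only after processing.
            self.committed = pos + 1;
            pos += 1;
        }
        done
    }
}
```

With a log of `a`, `b`, `c` and a first pass that fails on `b`, only `a` is committed. A second pass then starts at `b` again, so the handler should tolerate seeing a message more than once.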
Code Reference
Source Location
- Repository: rust-rdkafka
- File: src/consumer/stream_consumer.rs (recv, subscribe delegation), src/consumer/base_consumer.rs (subscribe impl, commit_message impl)
- Lines: stream_consumer.rs:L314-319 (recv), stream_consumer.rs:L386-388 (subscribe), base_consumer.rs:L424-435 (subscribe impl), base_consumer.rs:L559-568 (commit_message impl)
Signature
// Consumer trait (implemented by StreamConsumer)
pub trait Consumer<C: ConsumerContext> {
    fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>;

    fn commit_message(
        &self,
        message: &BorrowedMessage<'_>,
        mode: CommitMode,
    ) -> KafkaResult<()>;
}

// StreamConsumer specific
impl<C, R> StreamConsumer<C, R>
where
    C: ConsumerContext + 'static,
{
    pub async fn recv(&self) -> Result<BorrowedMessage<'_>, KafkaError>;
}
Import
use rdkafka::consumer::{Consumer, StreamConsumer, CommitMode};
use rdkafka::message::BorrowedMessage;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| topics | &[&str] | Yes | Topic names to subscribe to |
| message | &BorrowedMessage<'_> | Yes (for commit) | Message whose offset to commit |
| mode | CommitMode | Yes (for commit) | Sync or Async commit mode |
Outputs
| Name | Type | Description |
|---|---|---|
| subscribe() returns | KafkaResult<()> | Success or subscription error |
| recv() returns | Result<BorrowedMessage<'_>, KafkaError> | Next message or error |
| commit_message() returns | KafkaResult<()> | Success or commit error |
Usage Examples
Basic Subscribe and Consume
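use rdkafka::consumer::{Consumer, StreamConsumer, CommitMode};
use rdkafka::Message;

// `consumer` is assumed to be an already configured StreamConsumer,
// and this code is assumed to run inside an async context.

// Subscribe to topics
consumer.subscribe(&["my-topic"])
    .expect("Subscription failed");

// Consume messages in a loop
loop {
    match consumer.recv().await {
        Ok(msg) => {
            // payload_view returns None for an empty payload and
            // Some(Err(_)) if the bytes are not valid UTF-8.
            if let Some(payload) = msg.payload_view::<str>() {
                match payload {
                    Ok(text) => println!("Received: {}", text),
                    Err(e) => eprintln!("Deserialization error: {}", e),
                }
            }
            // Commit only after the message has been processed.
            consumer.commit_message(&msg, CommitMode::Async)
                .expect("Commit failed");
        }
        Err(e) => eprintln!("Kafka error: {}", e),
    }
}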
Related Pages
Implements Principle
Requires Environment
- Environment:Fede1024_Rust_rdkafka_Rust_Librdkafka_Build_Environment
- Environment:Fede1024_Rust_rdkafka_Kafka_Broker_Runtime
- Heuristic:Fede1024_Rust_rdkafka_Cooperative_Rebalance_Protocol
- Heuristic:Fede1024_Rust_rdkafka_Commit_Mode_Sync_Vs_Async
- Heuristic:Fede1024_Rust_rdkafka_Main_Queue_Poll_Interval