Implementation:Fede1024 Rust rdkafka StreamConsumer Stream
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Stream_Processing |
| Last Updated | 2026-02-07 19:00 GMT |
Overview
Concrete tool for stream-based Kafka consumption using StreamConsumer::stream(), provided by rust-rdkafka.
Description
StreamConsumer::stream() returns a MessageStream that implements futures::Stream<Item = KafkaResult<BorrowedMessage<'a>>>. The MessageStream uses a WakerSlab internally to register async wakers that are triggered by librdkafka's queue-nonempty callback, providing efficient wake-on-message semantics.
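The wake-on-message idea can be sketched without Kafka. The block below is a simplified, std-only model and not rust-rdkafka's actual code: SlabModel, QueueModel, CountingWaker, and demo are hypothetical names, and push() merely stands in for librdkafka's queue-nonempty callback.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a waker slab: one slot per live stream.
struct SlabModel {
    slots: Mutex<Vec<Option<Waker>>>,
}

impl SlabModel {
    /// Reserve a slot for a new stream and return its index.
    fn insert(&self) -> usize {
        let mut slots = self.slots.lock().unwrap();
        slots.push(None);
        slots.len() - 1
    }

    /// Remember the waker of the task currently polling the stream in `slot`.
    fn register(&self, slot: usize, waker: &Waker) {
        let mut slots = self.slots.lock().unwrap();
        slots[slot] = Some(waker.clone());
    }

    /// Wake every parked task; wakers are taken out before being invoked.
    fn wake_all(&self) {
        let parked: Vec<Waker> = self
            .slots
            .lock()
            .unwrap()
            .iter_mut()
            .filter_map(|slot| slot.take())
            .collect();
        for waker in parked {
            waker.wake();
        }
    }
}

/// Hypothetical stand-in for the native message queue.
struct QueueModel {
    messages: Mutex<VecDeque<String>>,
    wakers: SlabModel,
}

impl QueueModel {
    fn new() -> Self {
        QueueModel {
            messages: Mutex::new(VecDeque::new()),
            wakers: SlabModel { slots: Mutex::new(Vec::new()) },
        }
    }

    /// Producer side: enqueue, then fire the "queue is non-empty" notification.
    fn push(&self, msg: &str) {
        self.messages.lock().unwrap().push_back(msg.to_string());
        self.wakers.wake_all();
    }

    /// Consumer side, shaped like `Stream::poll_next`.
    /// Registering before checking means a push that lands in between still wakes the task.
    fn poll_next(&self, slot: usize, cx: &mut Context<'_>) -> Poll<String> {
        self.wakers.register(slot, cx.waker());
        match self.messages.lock().unwrap().pop_front() {
            Some(msg) => Poll::Ready(msg),
            None => Poll::Pending,
        }
    }
}

/// Test waker that counts how many times it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Poll an empty queue, push one message, poll again.
/// Returns the number of wake-ups observed and the result of the second poll.
fn demo() -> (usize, Poll<String>) {
    let queue = QueueModel::new();
    let slot = queue.wakers.insert();
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);

    assert!(queue.poll_next(slot, &mut cx).is_pending()); // empty queue: the task parks
    queue.push("hello"); // the notification wakes the parked task
    let polled = queue.poll_next(slot, &mut cx);
    (counter.0.load(Ordering::SeqCst), polled)
}
```

In this model a task is woken only when a message arrives, so no polling loop or timer is needed while the queue is empty.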
Multiple streams from the same consumer may be live at once and can be moved across threads, but they share the same underlying state: each message received by the consumer is delivered to only one of the live streams.
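The "delivered to only one stream" behaviour resembles several workers draining one shared queue. The following std-only sketch illustrates that analogy under stated assumptions: drain_shared is a hypothetical helper, plain threads stand in for streams, and nothing here calls rust-rdkafka.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Drain one shared queue from `workers` threads.
/// Returns the messages each worker received, one Vec per worker.
fn drain_shared(messages: Vec<u32>, workers: usize) -> Vec<Vec<u32>> {
    let queue = Arc::new(Mutex::new(VecDeque::from(messages)));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let mut received = Vec::new();
                loop {
                    // Popping removes the message, so no other worker can receive it.
                    let next = queue.lock().unwrap().pop_front();
                    match next {
                        Some(msg) => received.push(msg),
                        None => break,
                    }
                }
                received
            })
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

How the messages are split between workers is not deterministic, but taken together the workers see every message exactly once. Code that relies on a particular stream receiving a particular message should therefore not share one consumer across several streams.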
Usage
Call stream() on a StreamConsumer after subscribing to topics. Use StreamExt combinators from the futures crate to process the stream.
Code Reference
Source Location
- Repository: rust-rdkafka
- File: src/consumer/stream_consumer.rs
- Lines: L91-96 (MessageStream struct), L135-173 (Stream impl), L290-292 (stream() method)
Signature
pub struct MessageStream<'a, C: ConsumerContext> {
    wakers: &'a WakerSlab,
    consumer: &'a BaseConsumer<C>,
    partition_queue: Option<&'a NativeQueue>,
    slot: usize,
}

impl<C, R> StreamConsumer<C, R>
where
    C: ConsumerContext + 'static,
{
    pub fn stream(&self) -> MessageStream<'_, C>;
}

impl<'a, C: ConsumerContext> Stream for MessageStream<'a, C> {
    type Item = KafkaResult<BorrowedMessage<'a>>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
Import
use rdkafka::consumer::StreamConsumer;
use futures::stream::StreamExt;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| &self | &StreamConsumer | Yes | The consumer to create a stream from |
Outputs
| Name | Type | Description |
|---|---|---|
| stream() returns | MessageStream<'_, C> | Asynchronous Stream yielding KafkaResult<BorrowedMessage<'_>> items that borrow from the consumer |
Usage Examples
Stream Processing Pipeline
use futures::stream::StreamExt;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;

// `consumer` is assumed to be an already configured StreamConsumer.
async fn process(consumer: &StreamConsumer) {
    consumer.subscribe(&["input-topic"]).expect("Subscription failed");

    consumer
        .stream()
        .for_each(|message| async move {
            match message {
                Ok(msg) => {
                    // Fall back to "" when the payload is missing or is not valid UTF-8.
                    let payload = msg.payload_view::<str>()
                        .unwrap_or(Ok(""))
                        .unwrap_or("");
                    println!("Processing: {}", payload);
                }
                Err(e) => eprintln!("Stream error: {}", e),
            }
        })
        .await;
}
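The fallback chain unwrap_or(Ok("")).unwrap_or("") in the example collapses two different cases, a missing payload and a payload that is not valid UTF-8, into the empty string. The std-only sketch below shows that behaviour in isolation. It assumes the view has the shape Option<Result<&str, Utf8Error>>; view_as_str and payload_or_empty are hypothetical helpers, not rust-rdkafka functions.

```rust
use std::str::Utf8Error;

/// Hypothetical stand-in for a payload view:
/// None when there is no payload, Err when the bytes are not valid UTF-8.
fn view_as_str(payload: Option<&[u8]>) -> Option<Result<&str, Utf8Error>> {
    payload.map(std::str::from_utf8)
}

/// Same fallback chain as in the pipeline example:
/// the first unwrap_or handles a missing payload, the second handles invalid UTF-8.
fn payload_or_empty(payload: Option<&[u8]>) -> &str {
    view_as_str(payload).unwrap_or(Ok("")).unwrap_or("")
}
```

When a decoding failure should be visible rather than silently replaced, matching on the three cases (None, Some(Ok(..)), Some(Err(..))) separately is an alternative to this chain.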