Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Fede1024 Rust rdkafka StreamConsumer Stream

From Leeroopedia


Knowledge Sources
Domains Messaging, Stream_Processing
Last Updated 2026-02-07 19:00 GMT

Overview

Concrete stream-based Kafka consumption tool using StreamConsumer::stream() provided by rust-rdkafka.

Description

StreamConsumer::stream() returns a MessageStream that implements futures::Stream<Item = KafkaResult<BorrowedMessage<'a>>>. The MessageStream uses a WakerSlab internally to register async wakers that are triggered by librdkafka's queue-nonempty callback, providing efficient wake-on-message semantics.

Multiple streams from the same consumer are legal and can be moved across threads, but they share the underlying state. A received message is delivered to only one stream.

Usage

Call stream() on a StreamConsumer after subscribing to topics. Use StreamExt combinators from the futures crate to process the stream.

Code Reference

Source Location

  • Repository: rust-rdkafka
  • File: src/consumer/stream_consumer.rs
  • Lines: L91-96 (MessageStream struct), L135-173 (Stream impl), L290-292 (stream() method)

Signature

pub struct MessageStream<'a, C: ConsumerContext> {
    wakers: &'a WakerSlab,
    consumer: &'a BaseConsumer<C>,
    partition_queue: Option<&'a NativeQueue>,
    slot: usize,
}

impl<C, R> StreamConsumer<C, R>
where
    C: ConsumerContext + 'static,
{
    pub fn stream(&self) -> MessageStream<'_, C>;
}

impl<'a, C: ConsumerContext> Stream for MessageStream<'a, C> {
    type Item = KafkaResult<BorrowedMessage<'a>>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

Import

use rdkafka::consumer::StreamConsumer;
use futures::stream::StreamExt;

I/O Contract

Inputs

Name Type Required Description
&self &StreamConsumer Yes The consumer to create a stream from

Outputs

Name Type Description
stream() returns MessageStream<'_, C> Async Stream yielding KafkaResult<BorrowedMessage<'a>>

Usage Examples

Stream Processing Pipeline

use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;
use futures::stream::StreamExt;

consumer.subscribe(&["input-topic"]).expect("Subscription failed");

consumer
    .stream()
    .for_each(|message| async move {
        match message {
            Ok(msg) => {
                let payload = msg.payload_view::<str>()
                    .unwrap_or(Ok(""))
                    .unwrap_or("");
                println!("Processing: {}", payload);
            }
            Err(e) => eprintln!("Stream error: {}", e),
        }
    })
    .await;

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment