Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Fede1024 Rust rdkafka StreamConsumer Recv

From Leeroopedia


Knowledge Sources
Domains Messaging, Async_Programming
Last Updated 2026-02-07 19:00 GMT

Overview

Concrete async message consumption tool using Consumer::subscribe, StreamConsumer::recv, and Consumer::commit_message provided by rust-rdkafka.

Description

Consumer::subscribe registers one or more topics with the Kafka consumer group coordinator. It creates a TopicPartitionList internally and calls rd_kafka_subscribe.

StreamConsumer::recv is an async method that awaits the next message from any assigned partition. It creates a single-use MessageStream internally and calls next() on it. The method is cancellation safe.

Consumer::commit_message commits the offset of a specific message to the consumer group, either synchronously or asynchronously. This tells the broker that all messages up to and including this one have been processed.

Usage

Use these methods together for the basic consume-process-commit loop. Call subscribe once, then loop on recv and optionally commit_message after processing each message.

Code Reference

Source Location

  • Repository: rust-rdkafka
  • File: src/consumer/stream_consumer.rs (recv, subscribe delegation), src/consumer/base_consumer.rs (subscribe impl, commit_message impl)
  • Lines: stream_consumer.rs:L314-319 (recv), stream_consumer.rs:L386-388 (subscribe), base_consumer.rs:L424-435 (subscribe impl), base_consumer.rs:L559-568 (commit_message impl)

Signature

// Consumer trait (implemented by StreamConsumer)
pub trait Consumer<C: ConsumerContext> {
    fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>;
    fn commit_message(
        &self,
        message: &BorrowedMessage<'_>,
        mode: CommitMode,
    ) -> KafkaResult<()>;
}

// StreamConsumer specific
impl<C, R> StreamConsumer<C, R>
where
    C: ConsumerContext + 'static,
{
    pub async fn recv(&self) -> Result<BorrowedMessage<'_>, KafkaError>;
}

Import

use rdkafka::consumer::{Consumer, StreamConsumer, CommitMode};
use rdkafka::message::BorrowedMessage;

I/O Contract

Inputs

Name Type Required Description
topics &[&str] Yes Topic names to subscribe to
message &BorrowedMessage<'_> Yes (for commit) Message whose offset to commit
mode CommitMode Yes (for commit) Sync or Async commit mode

Outputs

Name Type Description
subscribe() returns KafkaResult<()> Success or subscription error
recv() returns Result<BorrowedMessage<'_>, KafkaError> Next message or error
commit_message() returns KafkaResult<()> Success or commit error

Usage Examples

Basic Subscribe and Consume

use rdkafka::consumer::{Consumer, StreamConsumer, CommitMode};
use rdkafka::Message;

// Subscribe to topics
consumer.subscribe(&["my-topic"])
    .expect("Subscription failed");

// Consume messages in a loop
loop {
    match consumer.recv().await {
        Ok(msg) => {
            if let Some(payload) = msg.payload_view::<str>() {
                match payload {
                    Ok(text) => println!("Received: {}", text),
                    Err(e) => eprintln!("Deserialization error: {}", e),
                }
            }
            consumer.commit_message(&msg, CommitMode::Async)
                .expect("Commit failed");
        }
        Err(e) => eprintln!("Kafka error: {}", e),
    }
}

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment