Implementation:Fede1024 Rust rdkafka Message Trait Methods

From Leeroopedia


Knowledge Sources
Domains: Messaging, Data_Access
Last Updated: 2026-02-07 19:00 GMT

Overview

The Message trait in rust-rdkafka defines the concrete methods for reading the fields of a Kafka message.

Description

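The Message trait provides read-only access to the fields of a Kafka message. Key methods:

  • payload_view::<P>(), where P: ?Sized + FromBytes: typed view of the payload that borrows from the message instead of copying it. Returns Option<Result<&P, P::Error>>: None when the message has no payload, Some(Err(_)) when the bytes cannot be interpreted as P.
  • key(): returns the raw key bytes as Option<&[u8]>.
  • timestamp(): returns the Timestamp enum: NotAvailable, CreateTime(i64), or LogAppendTime(i64). The i64 values are milliseconds since the Unix epoch.
  • topic(): returns the topic name as &str.
  • partition(): returns the partition number as i32.
  • offset(): returns the message offset as i64.
  • headers(): returns the message headers as Option<&Self::Headers>, or None if the message has none.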
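The three outcomes of payload_view::<str>() can be sketched without a broker. The block below is a stand-in, not rdkafka code: view_as_str and describe_payload are hypothetical helpers, and the sketch assumes that payload_view::<str>() behaves like applying std::str::from_utf8 to the optional payload bytes.

```rust
// Hypothetical stand-in for `msg.payload_view::<str>()`.
// Assumption: the real method maps the optional payload bytes through
// UTF-8 validation and borrows from the message without copying.
fn view_as_str(payload: Option<&[u8]>) -> Option<Result<&str, std::str::Utf8Error>> {
    payload.map(std::str::from_utf8)
}

// Handles all three outcomes explicitly instead of only `Some(Ok(_))`.
fn describe_payload(payload: Option<&[u8]>) -> String {
    match view_as_str(payload) {
        // The message carried no payload at all (for example, a tombstone).
        None => "no payload".to_string(),
        // The payload is present and valid UTF-8.
        Some(Ok(text)) => format!("text payload: {}", text),
        // The payload is present but is not valid UTF-8.
        Some(Err(e)) => format!("invalid UTF-8 after {} valid bytes", e.valid_up_to()),
    }
}
```

With a real message, the same match would be written against msg.payload_view::<str>(). Matching only on Some(Ok(text)) silently drops both the empty and the malformed case, which may or may not be what a consumer wants.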

Usage

Bring the Message trait into scope, then call these methods on any type that implements it, such as BorrowedMessage or OwnedMessage.

Code Reference

Source Location

  • Repository: rust-rdkafka
  • File: src/message.rs
  • Lines: L180-228 (Message trait), L424-489 (BorrowedMessage impl)

Signature

pub trait Message {
    type Headers: Headers;

    fn key(&self) -> Option<&[u8]>;
    fn payload(&self) -> Option<&[u8]>;
    fn topic(&self) -> &str;
    fn partition(&self) -> i32;
    fn offset(&self) -> i64;
    fn timestamp(&self) -> Timestamp;
    fn payload_view<P: ?Sized + FromBytes>(&self) -> Option<Result<&P, P::Error>>;
    fn key_view<K: ?Sized + FromBytes>(&self) -> Option<Result<&K, K::Error>>;
    fn headers(&self) -> Option<&Self::Headers>;
}

Import

use rdkafka::Message;  // trait
use rdkafka::message::{BorrowedMessage, OwnedMessage, Timestamp};

I/O Contract

Inputs

Name | Type | Required | Description
&self | &impl Message | Yes | Any message implementing the Message trait
P (type param) | ?Sized + FromBytes | No | Target type for payload_view (e.g., str, [u8])

Outputs

Name | Type | Description
payload_view() | Option<Result<&P, P::Error>> | Typed payload, None if absent, or Err if conversion fails
key() | Option<&[u8]> | Raw key bytes or None
timestamp() | Timestamp | NotAvailable, CreateTime(i64), or LogAppendTime(i64)
topic() | &str | Source topic name
partition() | i32 | Partition number
offset() | i64 | Message offset
headers() | Option<&Self::Headers> | Message headers or None

Usage Examples

Inspect Message Fields

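use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Timestamp;
use rdkafka::Message;

// Receives one message from an already-subscribed consumer and prints its fields.
async fn inspect_next(consumer: &StreamConsumer) {
    let msg = consumer.recv().await.expect("recv failed");

    // Typed payload access: handle the missing and invalid cases explicitly
    match msg.payload_view::<str>() {
        Some(Ok(text)) => println!("Payload: {}", text),
        Some(Err(e)) => println!("Payload is not valid UTF-8: {}", e),
        None => println!("No payload"),
    }

    // Key access (raw bytes, decoded lossily for display)
    if let Some(key) = msg.key() {
        println!("Key: {:?}", String::from_utf8_lossy(key));
    }

    // Metadata
    println!("Topic: {}, Partition: {}, Offset: {}",
        msg.topic(), msg.partition(), msg.offset());

    // Timestamp for latency measurement (milliseconds since the Unix epoch)
    match msg.timestamp() {
        Timestamp::CreateTime(ts) => {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("system clock is before the Unix epoch")
                .as_millis() as i64;
            // Can be negative if producer and consumer clocks disagree
            println!("Latency: {} ms", now - ts);
        }
        _ => println!("No create timestamp"),
    }
}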
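The latency arithmetic in the example can be isolated into a small helper that is easy to test. The sketch below is standard-library only and makes several assumptions: the Timestamp enum is a local mirror of the three variants listed on this page (real code would use rdkafka::message::Timestamp), and create_latency_ms and now_millis are hypothetical helper names, not part of rust-rdkafka.

```rust
use std::convert::TryFrom;
use std::time::{SystemTime, UNIX_EPOCH};

// Local mirror of the three variants described above, so the sketch
// compiles without the rdkafka crate. Not the library's own type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Timestamp {
    NotAvailable,
    CreateTime(i64),
    LogAppendTime(i64),
}

// Milliseconds between message creation and `now_ms`.
// Returns None when there is no create timestamp, when the timestamp lies
// in the future (clock skew), or when the subtraction would overflow.
fn create_latency_ms(ts: Timestamp, now_ms: i64) -> Option<i64> {
    match ts {
        Timestamp::CreateTime(created) => {
            now_ms.checked_sub(created).filter(|delta| *delta >= 0)
        }
        Timestamp::LogAppendTime(_) | Timestamp::NotAvailable => None,
    }
}

// Current wall-clock time in milliseconds since the Unix epoch, or None
// if the clock is set before the epoch or the value does not fit in i64.
fn now_millis() -> Option<i64> {
    let elapsed = SystemTime::now().duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(elapsed.as_millis()).ok()
}
```

Passing the current time in as a parameter keeps the calculation deterministic under test. Returning None for a timestamp in the future is one possible policy; logging the negative value instead may be preferable when diagnosing clock skew.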

Related Pages

Implements Principle

Requires Environment
