Implementation:Fede1024 Rust rdkafka Message Trait Methods
Appearance
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Data_Access |
| Last Updated | 2026-02-07 19:00 GMT |
Overview
Concrete message access methods via the Message trait provided by rust-rdkafka.
Description
The Message trait provides read-only access to all Kafka message fields. Key methods:
- payload_view::<P: FromBytes>() — Zero-copy typed access to payload. Returns Option<Result<&P, P::Error>>.
- key() — Returns raw key bytes as Option<&[u8]>.
- timestamp() — Returns Timestamp enum: NotAvailable, CreateTime(i64), or LogAppendTime(i64).
- topic() — Returns topic name as &str.
- partition() — Returns partition number as i32.
- offset() — Returns message offset as i64.
- headers() — Returns message headers if present.
Usage
Import the Message trait and call these methods on any BorrowedMessage or OwnedMessage.
Code Reference
Source Location
- Repository: rust-rdkafka
- File: src/message.rs
- Lines: L180-228 (Message trait), L424-489 (BorrowedMessage impl)
Signature
pub trait Message {
type Headers: Headers;
fn key(&self) -> Option<&[u8]>;
fn payload(&self) -> Option<&[u8]>;
fn topic(&self) -> &str;
fn partition(&self) -> i32;
fn offset(&self) -> i64;
fn timestamp(&self) -> Timestamp;
fn payload_view<P: ?Sized + FromBytes>(&self) -> Option<Result<&P, P::Error>>;
fn key_view<K: ?Sized + FromBytes>(&self) -> Option<Result<&K, K::Error>>;
fn headers(&self) -> Option<&Self::Headers>;
}
Import
use rdkafka::Message; // trait
use rdkafka::message::{BorrowedMessage, OwnedMessage, Timestamp};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| &self | &impl Message | Yes | Any message implementing the Message trait |
| P (type param) | FromBytes | No | Target type for payload_view (e.g., str, [u8]) |
Outputs
| Name | Type | Description |
|---|---|---|
| payload_view() | Option<Result<&P, P::Error>> | Typed payload or None/error |
| key() | Option<&[u8]> | Raw key bytes or None |
| timestamp() | Timestamp | NotAvailable, CreateTime(i64), or LogAppendTime(i64) |
| topic() | &str | Source topic name |
| partition() | i32 | Partition number |
| offset() | i64 | Message offset |
| headers() | Option<&Headers> | Message headers or None |
Usage Examples
Inspect Message Fields
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;
use rdkafka::message::Timestamp;
let msg = consumer.recv().await.expect("recv failed");
// Typed payload access
if let Some(Ok(text)) = msg.payload_view::<str>() {
println!("Payload: {}", text);
}
// Key access
if let Some(key) = msg.key() {
println!("Key: {:?}", String::from_utf8_lossy(key));
}
// Metadata
println!("Topic: {}, Partition: {}, Offset: {}",
msg.topic(), msg.partition(), msg.offset());
// Timestamp for latency measurement
match msg.timestamp() {
Timestamp::CreateTime(ts) => {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
println!("Latency: {} ms", now - ts);
}
_ => println!("No create timestamp"),
}
Related Pages
Implements Principle
Requires Environment
Page Connections
Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment