Implementation:Fede1024 Rust rdkafka BorrowedMessage Detach
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Memory_Management |
| Last Updated | 2026-02-07 19:00 GMT |
Overview
The concrete detach operation provided by rust-rdkafka for converting a BorrowedMessage into an OwnedMessage.
Description
BorrowedMessage::detach() copies all message fields from the consumer's internal buffer into new heap allocations, producing an OwnedMessage. OwnedMessage has no lifetime parameter, so it is 'static and independent of the consumer. The copied fields are: payload (Option<Vec<u8>>), key (Option<Vec<u8>>), topic (String), timestamp (Timestamp), partition (i32), offset (i64), and headers (Option<OwnedHeaders>).
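detach() reads each field through the Message accessors. Some of these call into librdkafka (for example, to read the topic name, timestamp, and headers), but none performs network I/O. The cost is dominated by the copies and is proportional to the message size (key + payload + headers).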
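The copy semantics described above can be modelled without rdkafka. The following is a minimal sketch using hypothetical stand-in types (`BorrowedMsg`, `OwnedMsg`) and a hypothetical helper (`detach_then_drop_buffer`); these are not the library's definitions, and timestamp and headers are omitted for brevity. It only illustrates how borrowing from a buffer differs from owning heap copies, and why the cost scales with the size of the copied bytes.

```rust
/// Hypothetical stand-in for a message that borrows from a consumer buffer.
struct BorrowedMsg<'a> {
    payload: Option<&'a [u8]>,
    key: Option<&'a [u8]>,
    topic: &'a str,
    partition: i32,
    offset: i64,
}

/// Hypothetical stand-in for the owned counterpart: no lifetime parameter.
#[derive(Debug, PartialEq)]
struct OwnedMsg {
    payload: Option<Vec<u8>>,
    key: Option<Vec<u8>>,
    topic: String,
    partition: i32,
    offset: i64,
}

impl<'a> BorrowedMsg<'a> {
    /// Copies every borrowed field into a fresh heap allocation.
    fn detach(&self) -> OwnedMsg {
        OwnedMsg {
            payload: self.payload.map(|p| p.to_vec()),
            key: self.key.map(|k| k.to_vec()),
            topic: self.topic.to_owned(),
            partition: self.partition,
            offset: self.offset,
        }
    }
}

/// Detaches a message, then lets the source buffer go out of scope.
fn detach_then_drop_buffer() -> OwnedMsg {
    let buffer = b"hello".to_vec(); // stands in for the consumer buffer
    let topic = String::from("events");
    let borrowed = BorrowedMsg {
        payload: Some(buffer.as_slice()),
        key: None,
        topic: topic.as_str(),
        partition: 0,
        offset: 42,
    };
    let owned = borrowed.detach();
    // `buffer` and `topic` are dropped on return; the owned copy stays valid.
    owned
}
```

Returning `borrowed` itself from `detach_then_drop_buffer` would not compile, because it refers to locals that are dropped on return; the owned copy has no such tie.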
Usage
Call detach() on a BorrowedMessage when the message must be moved to another task or thread, or stored for longer than the BorrowedMessage (and the consumer it borrows from) remains in scope.
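As a rough illustration of why an owned copy is needed for hand-off, the sketch below moves a detached message to a worker thread using only the standard library. `DetachedMsg` and `process_on_worker` are hypothetical stand-ins introduced for this example; in real code the value would be an `rdkafka::message::OwnedMessage` obtained from `detach()`.

```rust
use std::thread;

/// Hypothetical stand-in for an owned message. It has no borrowed fields,
/// so it is `'static` and can be moved into a spawned thread.
struct DetachedMsg {
    payload: Option<Vec<u8>>,
    offset: i64,
}

/// Moves the message to a worker thread and returns (offset, payload length).
fn process_on_worker(msg: DetachedMsg) -> (i64, usize) {
    let handle = thread::spawn(move || {
        // The closure owns `msg`; nothing refers back to the caller's stack.
        let len = msg.payload.as_deref().map_or(0, |p| p.len());
        (msg.offset, len)
    });
    handle.join().expect("worker thread panicked")
}
```

`thread::spawn` requires its closure to be `'static`, which is the same kind of bound that prevents a borrowed message from being moved into a spawned task.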
Code Reference
Source Location
- Repository: rust-rdkafka
- File: src/message.rs
- Lines: L411-422
Signature
impl<'a> BorrowedMessage<'a> {
/// Clones the content of the `BorrowedMessage` and returns an
/// `OwnedMessage` that can outlive the consumer.
///
/// This operation requires memory allocation and can be expensive.
pub fn detach(&self) -> OwnedMessage;
}
Import
use rdkafka::message::{BorrowedMessage, OwnedMessage, Message};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| &self | &BorrowedMessage<'a> | Yes | Zero-copy message tied to consumer buffer |
Outputs
| Name | Type | Description |
|---|---|---|
| detach() returns | OwnedMessage | Heap-allocated message with payload: Option<Vec<u8>>, key: Option<Vec<u8>>, topic: String, timestamp: Timestamp, partition: i32, offset: i64, headers: Option<OwnedHeaders> |
Usage Examples
Detach for Cross-Task Transfer
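use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;

// `expensive_computation` is a placeholder for application-specific work.
async fn process_next(consumer: &StreamConsumer) {
    let borrowed_msg = consumer.recv().await.expect("recv failed");
    // Copy the message out of the consumer's buffer so it can be moved.
    let owned_msg = borrowed_msg.detach();
    // owned_msg does not borrow from the consumer, so it can move into a blocking task.
    tokio::task::spawn_blocking(move || {
        let payload = owned_msg.payload().unwrap_or(&[]);
        // Process payload in blocking context
        expensive_computation(payload)
    });
}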