Implementation:Fede1024 Rust rdkafka FutureProducer Send
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Async_Programming |
| Last Updated | 2026-02-07 19:00 GMT |
Overview
Concrete async message production tool using FutureProducer::send and FutureRecord provided by rust-rdkafka.
Description
FutureProducer::send is an async method that enqueues a message for delivery to a Kafka topic and returns the delivery result. It wraps librdkafka's producer with a oneshot channel to bridge the C delivery callback to a Rust Future. If the internal queue is full, it retries at 100ms intervals until the queue_timeout expires.
FutureRecord is a builder struct for constructing the message to send. It follows a fluent API pattern: FutureRecord::to(topic).payload(data).key(key).
The FutureProducer has an internal polling thread (via ThreadedProducer) and can be cheaply cloned (Arc wrapper) for sharing across tasks.
Usage
Use FutureProducer::send when you need delivery confirmation and want automatic retry on queue-full conditions. Use FutureRecord::to() to build messages.
Code Reference
Source Location
- Repository: rust-rdkafka
- File: src/producer/future_producer.rs
- Lines: L59-68 (FutureRecord::to), L306-358 (FutureProducer::send)
Signature
impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> FutureRecord<'a, K, P> {
pub fn to(topic: &'a str) -> FutureRecord<'a, K, P>
pub fn partition(mut self, partition: i32) -> FutureRecord<'a, K, P>
pub fn payload(mut self, payload: &'a P) -> FutureRecord<'a, K, P>
pub fn key(mut self, key: &'a K) -> FutureRecord<'a, K, P>
pub fn timestamp(mut self, timestamp: i64) -> FutureRecord<'a, K, P>
pub fn headers(mut self, headers: OwnedHeaders) -> FutureRecord<'a, K, P>
}
impl<C, R> FutureProducer<C, R>
where
C: ClientContext + 'static,
R: AsyncRuntime,
{
pub async fn send<K, P, T>(
&self,
record: FutureRecord<'_, K, P>,
queue_timeout: T,
) -> OwnedDeliveryResult
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
T: Into<Timeout>,
}
Import
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| topic | &str | Yes | Destination Kafka topic name |
| payload | &P: ToBytes | No | Message payload bytes |
| key | &K: ToBytes | No | Message key for partitioning |
| partition | i32 | No | Explicit partition (default: hash-based) |
| timestamp | i64 | No | Message timestamp |
| headers | OwnedHeaders | No | Message headers |
| queue_timeout | T: Into<Timeout> | Yes | How long to retry if queue is full |
Outputs
| Name | Type | Description |
|---|---|---|
| send() returns | OwnedDeliveryResult | Result<Delivery{partition, offset, timestamp}, (KafkaError, OwnedMessage)> |
Usage Examples
Basic Message Production
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
let delivery_status = producer
.send(
FutureRecord::to("my-topic")
.payload("Hello Kafka!")
.key("key-1"),
Duration::from_secs(0),
)
.await;
match delivery_status {
Ok(delivery) => println!(
"Delivered to partition {} at offset {}",
delivery.partition, delivery.offset
),
Err((err, _msg)) => eprintln!("Delivery failed: {}", err),
}
Production with Headers
use rdkafka::message::OwnedHeaders;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
let delivery_status = producer
.send(
FutureRecord::to("my-topic")
.payload("payload data")
.key("key-1")
.headers(
OwnedHeaders::new()
.insert(rdkafka::message::Header {
key: "header-key",
value: Some("header-value"),
}),
),
Duration::from_secs(5),
)
.await;
Related Pages
Implements Principle
Requires Environment
- Environment:Fede1024_Rust_rdkafka_Rust_Librdkafka_Build_Environment
- Environment:Fede1024_Rust_rdkafka_Kafka_Broker_Runtime
- Heuristic:Fede1024_Rust_rdkafka_Regular_Polling_Required
- Heuristic:Fede1024_Rust_rdkafka_Queue_Buffering_Priority
- Heuristic:Fede1024_Rust_rdkafka_Partitioner_Must_Not_Block
- Heuristic:Fede1024_Rust_rdkafka_Producer_Flush_Before_Drop