Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Fede1024 Rust rdkafka FutureProducer Send

From Leeroopedia


Knowledge Sources
Domains Messaging, Async_Programming
Last Updated 2026-02-07 19:00 GMT

Overview

Concrete async message production tool using FutureProducer::send and FutureRecord provided by rust-rdkafka.

Description

FutureProducer::send is an async method that enqueues a message for delivery to a Kafka topic and returns the delivery result. It wraps librdkafka's producer with a oneshot channel to bridge the C delivery callback to a Rust Future. If the internal queue is full, it retries at 100ms intervals until the queue_timeout expires.

FutureRecord is a builder struct for constructing the message to send. It follows a fluent API pattern: FutureRecord::to(topic).payload(data).key(key).

The FutureProducer has an internal polling thread (via ThreadedProducer) and can be cheaply cloned (Arc wrapper) for sharing across tasks.

Usage

Use FutureProducer::send when you need delivery confirmation and want automatic retry on queue-full conditions. Use FutureRecord::to() to build messages.

Code Reference

Source Location

  • Repository: rust-rdkafka
  • File: src/producer/future_producer.rs
  • Lines: L59-68 (FutureRecord::to), L306-358 (FutureProducer::send)

Signature

impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> FutureRecord<'a, K, P> {
    pub fn to(topic: &'a str) -> FutureRecord<'a, K, P>
    pub fn partition(mut self, partition: i32) -> FutureRecord<'a, K, P>
    pub fn payload(mut self, payload: &'a P) -> FutureRecord<'a, K, P>
    pub fn key(mut self, key: &'a K) -> FutureRecord<'a, K, P>
    pub fn timestamp(mut self, timestamp: i64) -> FutureRecord<'a, K, P>
    pub fn headers(mut self, headers: OwnedHeaders) -> FutureRecord<'a, K, P>
}

impl<C, R> FutureProducer<C, R>
where
    C: ClientContext + 'static,
    R: AsyncRuntime,
{
    pub async fn send<K, P, T>(
        &self,
        record: FutureRecord<'_, K, P>,
        queue_timeout: T,
    ) -> OwnedDeliveryResult
    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized,
        T: Into<Timeout>,
}

Import

use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;

I/O Contract

Inputs

Name Type Required Description
topic &str Yes Destination Kafka topic name
payload &P: ToBytes No Message payload bytes
key &K: ToBytes No Message key for partitioning
partition i32 No Explicit partition (default: hash-based)
timestamp i64 No Message timestamp
headers OwnedHeaders No Message headers
queue_timeout T: Into<Timeout> Yes How long to retry if queue is full

Outputs

Name Type Description
send() returns OwnedDeliveryResult Result<Delivery{partition, offset, timestamp}, (KafkaError, OwnedMessage)>

Usage Examples

Basic Message Production

use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

let delivery_status = producer
    .send(
        FutureRecord::to("my-topic")
            .payload("Hello Kafka!")
            .key("key-1"),
        Duration::from_secs(0),
    )
    .await;

match delivery_status {
    Ok(delivery) => println!(
        "Delivered to partition {} at offset {}",
        delivery.partition, delivery.offset
    ),
    Err((err, _msg)) => eprintln!("Delivery failed: {}", err),
}

Production with Headers

use rdkafka::message::OwnedHeaders;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

let delivery_status = producer
    .send(
        FutureRecord::to("my-topic")
            .payload("payload data")
            .key("key-1")
            .headers(
                OwnedHeaders::new()
                    .insert(rdkafka::message::Header {
                        key: "header-key",
                        value: Some("header-value"),
                    }),
            ),
        Duration::from_secs(5),
    )
    .await;

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment