Implementation:Fede1024 Rust rdkafka BaseProducer Send

From Leeroopedia


Knowledge Sources
Domains Messaging, Transactions
Last Updated 2026-02-07 19:00 GMT

Overview

Concrete non-blocking method for enqueuing messages during transactional production, provided by rust-rdkafka's BaseProducer. The call itself is synchronous (not an async fn), but delivery happens asynchronously.

Description

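BaseProducer::send enqueues a message for delivery via the rd_kafka_producev FFI call. It never blocks: it returns as soon as the message is queued, and the delivery outcome is reported later through the ProducerContext delivery callback, which runs when the producer is polled. The message is copied into librdkafka's internal buffer (RD_KAFKA_MSG_F_COPY flag). On success, the library takes ownership of the headers. On failure (for example, when the internal queue is full), the BaseRecord is returned to the caller together with the KafkaError for retry or error handling.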
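The "record comes back on failure" contract can be illustrated with a small, standard-library-only model. This is a sketch, not rdkafka code: ToyProducer, Record, EnqueueError, drain_one, and send_with_retry are hypothetical names invented for illustration, and only the shape of the return type, Result<(), (Error, Record)>, mirrors BaseProducer::send.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a message; NOT rdkafka's BaseRecord.
#[derive(Debug, PartialEq)]
struct Record {
    topic: String,
    payload: String,
}

/// Hypothetical stand-in for an enqueue failure such as a full queue.
#[derive(Debug, PartialEq)]
enum EnqueueError {
    QueueFull,
}

/// Toy bounded queue whose `send` mimics the shape of BaseProducer::send.
struct ToyProducer {
    queue: VecDeque<Record>,
    capacity: usize,
}

impl ToyProducer {
    fn new(capacity: usize) -> Self {
        Self { queue: VecDeque::new(), capacity }
    }

    /// Enqueue without blocking; on failure, hand the record back to the caller.
    fn send(&mut self, record: Record) -> Result<(), (EnqueueError, Record)> {
        if self.queue.len() >= self.capacity {
            return Err((EnqueueError::QueueFull, record));
        }
        self.queue.push_back(record);
        Ok(())
    }

    /// Simulates one queued message leaving the queue (assumption: with a real
    /// producer, room is freed as messages are delivered and the producer is polled).
    fn drain_one(&mut self) -> Option<Record> {
        self.queue.pop_front()
    }
}

/// Try to enqueue, reusing the returned record on each retry.
/// Always makes at least one attempt; returns the attempt number on success.
fn send_with_retry(
    producer: &mut ToyProducer,
    record: Record,
    max_attempts: usize,
) -> Result<usize, (EnqueueError, Record)> {
    let mut record = record;
    let mut attempt = 1;
    loop {
        match producer.send(record) {
            Ok(()) => return Ok(attempt),
            // Out of attempts: give the error and the record back to the caller.
            Err((err, returned)) if attempt >= max_attempts => return Err((err, returned)),
            Err((_, returned)) => {
                producer.drain_one(); // make room before retrying
                record = returned; // no clone needed: ownership came back
                attempt += 1;
            }
        }
    }
}

fn main() {
    let mut producer = ToyProducer::new(1);
    let first = Record { topic: "output-topic".to_string(), payload: "first".to_string() };
    producer.send(first).expect("queue has room");

    let second = Record { topic: "output-topic".to_string(), payload: "second".to_string() };
    let attempts = send_with_retry(&mut producer, second, 3).expect("retry should succeed");
    println!("enqueued after {attempts} attempt(s)");
}
```

Returning the record inside the error means the caller never has to clone a message just in case the enqueue fails, which is the same reason the real signature returns the BaseRecord in its Err variant.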

BaseRecord is the message builder with fields: topic, payload, key, partition, timestamp, headers, and delivery_opaque.

Usage

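Use within a transaction context: call init_transactions() once after creating the producer, then call send() after begin_transaction(). commit_transaction() flushes outstanding messages before committing, so a separate flush() is not required inside a transaction; outside a transaction, call flush() to wait for delivery.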

Code Reference

Source Location

  • Repository: rust-rdkafka
  • File: src/producer/base_producer.rs
  • Lines: L443-499

Signature

impl<C, Part> BaseProducer<C, Part>
where
    Part: Partitioner,
    C: ProducerContext<Part>,
{
    pub fn send<'a, K, P>(
        &self,
        record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
    ) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized;
}

Import

use rdkafka::producer::{BaseProducer, BaseRecord};

I/O Contract

Inputs

Name                     Type                   Required   Description
record.topic             &str                   Yes        Destination topic
record.payload           Option<&P>             No         Message payload
record.key               Option<&K>             No         Message key
record.partition         Option<i32>            No         Explicit partition
record.timestamp         Option<i64>            No         Message timestamp
record.headers           Option<OwnedHeaders>   No         Message headers
record.delivery_opaque   C::DeliveryOpaque      Yes        Opaque value returned in delivery callback

Outputs

Name      Type                                     Description
returns   Result<(), (KafkaError, BaseRecord)>     Ok on enqueue success, Err with error and original record on failure

Usage Examples

Transactional Send

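use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};

// The broker address and transactional.id below are placeholder values.
let producer: BaseProducer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .set("transactional.id", "example-transactional-id")
    .create()
    .expect("Producer creation failed");

// init_transactions() must be called once before the first begin_transaction().
producer.init_transactions(Duration::from_secs(30)).expect("init failed");
producer.begin_transaction().expect("begin failed");

// Send multiple messages within the transaction
for i in 0..10 {
    let payload = format!("message-{}", i);
    producer
        .send(BaseRecord::to("output-topic").payload(&payload).key("key"))
        .expect("Enqueue failed");
}

// Commit to make all messages visible
producer.commit_transaction(Duration::from_secs(30))
    .expect("Commit failed");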

Related Pages

Implements Principle

Requires Environment
