Implementation:Fede1024 Rust rdkafka BaseProducer Send
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Transactions |
| Last Updated | 2026-02-07 19:00 GMT |
Overview
Concrete synchronous, non-blocking message enqueue method for transactional production, provided by rust-rdkafka's BaseProducer.
Description
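BaseProducer::send enqueues a message for delivery via the rd_kafka_producev FFI call. It never blocks: it returns as soon as the message has been queued or rejected. The message is copied into librdkafka's internal buffer (RD_KAFKA_MSG_F_COPY flag), so the caller's buffers can be reused once send returns. On success, the library takes ownership of the headers. On failure, the BaseRecord is handed back to the caller together with the error, for retry or error handling. A successful return means only that the message was enqueued; the delivery result is reported later through the ProducerContext delivery callback, which runs when the producer is polled.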
BaseRecord is the message builder with fields: topic, payload, key, partition, timestamp, headers, and delivery_opaque.
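The hand-back-on-failure contract described above lends itself to a retry loop. The following is a minimal sketch using only the standard library, not code from rust-rdkafka: `send_with_retry`, `try_send`, and `on_retry` are hypothetical names introduced for illustration, and the closure shape `FnMut(R) -> Result<(), (E, R)>` is chosen to mirror the return type of `BaseProducer::send`.

```rust
/// Hypothetical helper (not part of rust-rdkafka): retries an enqueue
/// operation that hands the record back on failure.
///
/// `try_send` has the same shape as `BaseProducer::send`: it consumes the
/// record and, on error, returns it together with the error.
/// `on_retry` runs between attempts, e.g. to free queue space.
///
/// Returns the number of attempts used on success, or the last error and
/// the record once `max_attempts` is exhausted. At least one attempt is
/// always made, even if `max_attempts` is 0.
pub fn send_with_retry<R, E>(
    record: R,
    max_attempts: usize,
    mut try_send: impl FnMut(R) -> Result<(), (E, R)>,
    mut on_retry: impl FnMut(&E),
) -> Result<usize, (E, R)> {
    let mut record = record;
    let mut attempt = 1;
    loop {
        match try_send(record) {
            Ok(()) => return Ok(attempt),
            Err((err, returned)) => {
                if attempt >= max_attempts {
                    // Give up: hand both the error and the record to the caller.
                    return Err((err, returned));
                }
                on_retry(&err);
                // Ownership of the record came back with the error,
                // so it can be submitted again unchanged.
                record = returned;
                attempt += 1;
            }
        }
    }
}
```

With rust-rdkafka, `try_send` would plausibly be `|r| producer.send(r)` and `on_retry` a call to `producer.poll(...)`, so that delivery callbacks are served and queue space is freed before the next attempt. Restricting retries to a specific error such as `KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)` is left to the caller in this sketch.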
Usage
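Use within a transaction context: init_transactions() must have been called once on the producer, and begin_transaction() must have opened the transaction. Because send only enqueues, call flush() or commit_transaction() to ensure delivery; commit_transaction() flushes outstanding messages before committing.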
Code Reference
Source Location
- Repository: rust-rdkafka
- File: src/producer/base_producer.rs
- Lines: L443-499
Signature
impl<C, Part> BaseProducer<C, Part>
where
    Part: Partitioner,
    C: ProducerContext<Part>,
{
    pub fn send<'a, K, P>(
        &self,
        record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
    ) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized;
}
Import
use rdkafka::producer::{BaseProducer, BaseRecord};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| record.topic | &str | Yes | Destination topic |
| record.payload | Option<&P> | No | Message payload |
| record.key | Option<&K> | No | Message key |
| record.partition | Option<i32> | No | Explicit partition |
| record.timestamp | Option<i64> | No | Message timestamp |
| record.headers | Option<OwnedHeaders> | No | Message headers |
| record.delivery_opaque | C::DeliveryOpaque | Yes | Opaque value returned in delivery callback |
Outputs
| Name | Type | Description |
|---|---|---|
| returns | Result<(), (KafkaError, BaseRecord)> | Ok on enqueue success, Err with error and original record on failure |
Usage Examples
Transactional Send
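use std::time::Duration;
use rdkafka::config::ClientConfig;
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};

// The broker address and transactional.id below are placeholders.
let producer: BaseProducer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .set("transactional.id", "example-transactional-id")
    .create()
    .expect("Producer creation failed");
// Must be called once before the first begin_transaction()
producer.init_transactions(Duration::from_secs(30)).expect("init failed");
producer.begin_transaction().expect("begin failed");
// Send multiple messages within transaction
for i in 0..10 {
    let payload = format!("message-{}", i);
    producer
        .send(BaseRecord::to("output-topic").payload(&payload).key("key"))
        .expect("Enqueue failed");
}
// Commit to make all messages visible
producer.commit_transaction(Duration::from_secs(30))
    .expect("Commit failed");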
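The example above panics on the first enqueue failure. An alternative is to abort the transaction so that none of the batch becomes visible. The sketch below models that control flow with the standard library only; `TxSink`, `MemorySink`, and `send_all_or_abort` are hypothetical names that stand in for a transactional producer and are not part of rust-rdkafka.

```rust
/// Hypothetical stand-in for a transactional producer (illustration only).
pub trait TxSink {
    type Error;
    fn begin(&mut self) -> Result<(), Self::Error>;
    fn enqueue(&mut self, msg: &str) -> Result<(), Self::Error>;
    fn commit(&mut self) -> Result<(), Self::Error>;
    fn abort(&mut self) -> Result<(), Self::Error>;
}

/// Enqueues every message inside one transaction. If any enqueue fails,
/// the transaction is aborted and the enqueue error is returned;
/// otherwise the transaction is committed.
pub fn send_all_or_abort<S: TxSink>(sink: &mut S, msgs: &[&str]) -> Result<(), S::Error> {
    sink.begin()?;
    for m in msgs {
        if let Err(e) = sink.enqueue(m) {
            // Best-effort abort; the original enqueue error is what the caller sees.
            let _ = sink.abort();
            return Err(e);
        }
    }
    sink.commit()
}

/// In-memory sink used to exercise the flow above.
#[derive(Default)]
pub struct MemorySink {
    pub pending: Vec<String>,
    pub committed: Vec<String>,
    /// If set, `enqueue` refuses this exact message.
    pub reject: Option<String>,
}

impl TxSink for MemorySink {
    type Error = String;

    fn begin(&mut self) -> Result<(), String> {
        self.pending.clear();
        Ok(())
    }

    fn enqueue(&mut self, msg: &str) -> Result<(), String> {
        if self.reject.as_deref() == Some(msg) {
            return Err(format!("rejected: {}", msg));
        }
        self.pending.push(msg.to_owned());
        Ok(())
    }

    fn commit(&mut self) -> Result<(), String> {
        // Pending messages become visible only at commit time.
        self.committed.append(&mut self.pending);
        Ok(())
    }

    fn abort(&mut self) -> Result<(), String> {
        // Aborting discards everything enqueued in this transaction.
        self.pending.clear();
        Ok(())
    }
}
```

In rust-rdkafka the corresponding calls would be begin_transaction(), send(), commit_transaction(), and abort_transaction() on the producer; mapping them onto a trait like this one is a design choice of the sketch, not something the library prescribes.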
Related Pages
Implements Principle
Requires Environment
- Environment:Fede1024_Rust_rdkafka_Rust_Librdkafka_Build_Environment
- Environment:Fede1024_Rust_rdkafka_Kafka_Broker_Runtime
- Heuristic:Fede1024_Rust_rdkafka_Regular_Polling_Required
- Heuristic:Fede1024_Rust_rdkafka_Queue_Buffering_Priority
- Heuristic:Fede1024_Rust_rdkafka_Partitioner_Must_Not_Block
- Heuristic:Fede1024_Rust_rdkafka_Producer_Flush_Before_Drop