Heuristic:Fede1024 Rust rdkafka Producer Flush Before Drop
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Reliability |
| Last Updated | 2026-02-07 19:30 GMT |
Overview
The `Drop` implementation of `BaseProducer` purges queued and in-flight messages, then flushes with a 500ms timeout so that their delivery callbacks can run. To give outstanding messages time to be delivered and confirmed, call `flush()` explicitly with a longer timeout before dropping the producer.
Description
When a `BaseProducer` is dropped, it automatically calls `purge()` on queued and in-flight messages, then calls `flush()` with a 500ms timeout to process the delivery callbacks of the purged messages. This is best-effort cleanup: purged messages are cancelled instead of being given time to complete, and if the callbacks take longer than 500ms, some may not execute. The `ThreadedProducer` has additional cleanup that stops its polling thread. For production applications that need delivery confirmation for every sent message, explicitly call `flush(Timeout::Never)` or `flush(Timeout::After(reasonable_duration))` before letting the producer go out of scope.
Usage
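Use this heuristic when designing producer shutdown logic or debugging missing delivery callbacks on application exit. If messages appear to be "lost" during shutdown, a likely cause is that the producer was dropped while messages were still outstanding: the drop purges them and allows only 500ms for their callbacks to run.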
The Insight (Rule of Thumb)
- Action: Call `producer.flush(Timeout::After(Duration::from_secs(30)))` explicitly before dropping the producer.
- Value: The default drop timeout is only 500ms; production workloads often need 5-30 seconds.
- Trade-off: A longer flush timeout delays application shutdown but gives every delivery callback time to execute. Separately, the `#[must_use]` attribute on `ThreadedProducer` makes the compiler warn when a newly created producer is left unused and would therefore be dropped immediately.
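The rule above can be sketched as a small shutdown helper. This is a minimal, std-only illustration, not rdkafka code: `flush_then_drop` and `FakeProducer` are hypothetical names, and the closure parameter stands in for the real flush call (with rdkafka it would be something like `|p, t| p.flush(Timeout::After(t))`).

```rust
use std::fmt::Debug;
use std::time::Duration;

/// Flushes `producer` with an explicit timeout, then drops it.
/// Returns `true` if the flush finished within `timeout`.
///
/// `flush` adapts the helper to a concrete producer type; it is an
/// assumption of this sketch, not part of any library API.
fn flush_then_drop<P, E: Debug>(
    producer: P,
    timeout: Duration,
    flush: impl FnOnce(&P, Duration) -> Result<(), E>,
) -> bool {
    let flushed = match flush(&producer, timeout) {
        Ok(()) => true,
        Err(err) => {
            // Work is still outstanding; the drop below would discard it.
            eprintln!("flush did not finish within {:?}: {:?}", timeout, err);
            false
        }
    };
    drop(producer); // Drop runs only after the explicit flush attempt.
    flushed
}

/// Hypothetical stand-in: its flush succeeds only if given at least `needs`.
struct FakeProducer {
    needs: Duration,
}

impl FakeProducer {
    fn flush(&self, timeout: Duration) -> Result<(), String> {
        if timeout >= self.needs {
            Ok(())
        } else {
            Err(format!("needed {:?}", self.needs))
        }
    }
}

fn main() {
    let needs = Duration::from_secs(2);
    let long = flush_then_drop(FakeProducer { needs }, Duration::from_secs(30), |p, t| p.flush(t));
    let short = flush_then_drop(FakeProducer { needs }, Duration::from_millis(500), |p, t| p.flush(t));
    println!("30s timeout flushed: {}, 500ms timeout flushed: {}", long, short);
}
```

Taking the producer by value makes the ordering explicit: the flush attempt always happens before the drop, and the caller learns whether the chosen timeout was long enough.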
Reasoning
The 500ms default in `Drop` is a conservative choice that balances cleanup completeness with avoiding indefinite hangs. In high-throughput scenarios, there may be thousands of in-flight messages when the producer is dropped. The purge-then-flush sequence ensures that messages are cancelled (purge) and their delivery callbacks are executed (flush), but only within the 500ms window. The `#[must_use]` annotation on `ThreadedProducer` is a Rust-level safeguard against accidentally creating and immediately dropping a producer.
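To make the purge-then-flush sequence concrete, here is a toy, std-only model of it. It is an illustration under stated assumptions, not rdkafka's implementation: `ToyProducer`, `Outcome`, and `run` are hypothetical, time is simulated with a fixed per-message cost instead of a wall clock, and callbacks for purged messages are assumed to cost nothing.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;
use std::time::Duration;

/// Result reported to the delivery callback in this toy model.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Delivered,
    Purged,
}

type Log = Rc<RefCell<Vec<(u32, Outcome)>>>;

/// Toy stand-in for a producer (NOT an rdkafka type).
struct ToyProducer {
    queued: VecDeque<u32>, // accepted by `send`, not yet delivered
    purged: VecDeque<u32>, // cancelled, callback not yet run
    cost: Duration,        // simulated time to deliver one message
    log: Log,              // plays the role of the delivery callback
}

impl ToyProducer {
    fn new(cost: Duration, log: Log) -> Self {
        ToyProducer { queued: VecDeque::new(), purged: VecDeque::new(), cost, log }
    }

    fn send(&mut self, id: u32) {
        self.queued.push_back(id);
    }

    /// Cancels every queued message; callbacks run on the next `flush`.
    fn purge(&mut self) {
        self.purged.append(&mut self.queued);
    }

    /// Runs callbacks for purged messages, then delivers queued ones until
    /// the simulated time budget runs out. `Err` carries the count left over.
    fn flush(&mut self, timeout: Duration) -> Result<(), usize> {
        while let Some(id) = self.purged.pop_front() {
            self.log.borrow_mut().push((id, Outcome::Purged));
        }
        let mut budget = timeout;
        while let Some(&id) = self.queued.front() {
            if budget < self.cost {
                return Err(self.queued.len());
            }
            budget -= self.cost;
            self.queued.pop_front();
            self.log.borrow_mut().push((id, Outcome::Delivered));
        }
        Ok(())
    }
}

impl Drop for ToyProducer {
    /// Mirrors the sequence described above: purge, then a 500ms flush.
    fn drop(&mut self) {
        self.purge();
        let _ = self.flush(Duration::from_millis(500));
    }
}

/// Sends `n` messages, optionally flushes, drops the producer, returns the log.
fn run(n: u32, explicit_flush: Option<Duration>) -> Vec<(u32, Outcome)> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let mut producer = ToyProducer::new(Duration::from_millis(100), Rc::clone(&log));
    for id in 0..n {
        producer.send(id);
    }
    if let Some(timeout) = explicit_flush {
        let _ = producer.flush(timeout);
    }
    drop(producer);
    let outcomes = log.borrow().clone();
    outcomes
}

fn main() {
    println!("no flush:    {:?}", run(3, None));
    println!("30s flush:   {:?}", run(3, Some(Duration::from_secs(30))));
    println!("200ms flush: {:?}", run(3, Some(Duration::from_millis(200))));
}
```

In this model, dropping without a flush reports every message as purged, a generous explicit flush reports every message as delivered, and a flush that is too short delivers only part of the backlog before the drop purges the rest.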
Code Evidence
BaseProducer Drop implementation from `src/producer/base_producer.rs:629-642`:
```rust
impl<C, Part: Partitioner> Drop for BaseProducer<C, Part>
where
    C: ProducerContext<Part>,
{
    fn drop(&mut self) {
        self.purge(PurgeConfig::default().queue().inflight());
        // Still have to flush after purging to get the results that have been
        // made ready by the purge
        if let Err(err) = self.flush(Timeout::After(Duration::from_millis(500))) {
            warn!(
                "Failed to flush outstanding messages while dropping the producer: {:?}",
                err
            );
        }
    }
}
```
ThreadedProducer must_use annotation from `src/producer/base_producer.rs:655`:
```rust
#[must_use = "The threaded producer will stop immediately if unused"]
pub struct ThreadedProducer<C, Part: Partitioner = NoCustomPartitioner>
```