Heuristic:Fede1024 Rust rdkafka Producer Flush Before Drop

Knowledge Sources
Domains Messaging, Reliability
Last Updated 2026-02-07 19:30 GMT

Overview

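`BaseProducer`'s `Drop` implementation purges queued and in-flight messages and then flushes with a 500ms timeout, so messages still pending at drop time are cancelled rather than delivered. To give every sent message a chance to be delivered and confirmed, call `flush()` explicitly with a longer timeout before dropping the producer.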

Description

When a `BaseProducer` is dropped, it automatically calls `purge()` on queued and in-flight messages, then `flush()` with a 500ms timeout to process the delivery callbacks for the purged messages. This is best-effort cleanup: if the delivery callbacks take longer than 500ms, some may not execute. The `ThreadedProducer` has additional cleanup that stops its polling thread. Production applications that require delivery confirmation for every sent message should explicitly call `flush(Timeout::Never)` or `flush(Timeout::After(reasonable_duration))` before letting the producer go out of scope. `Timeout::Never` can hold up shutdown for as long as messages remain outstanding, so a bounded timeout is usually the safer choice when shutdown must not hang.
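The drop behavior described above can be illustrated with a small standard-library model. This is a sketch under stated assumptions, not rdkafka code: `ModelProducer`, `Outcome`, `run`, and the 10ms per-message delivery cost are all hypothetical, and time is simulated as a budget rather than measured. The model only captures the idea that whatever is still pending at drop is purged, while an explicit flush beforehand gets a chance to deliver it.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;
use std::time::Duration;

/// What the modelled delivery callback observed for one message.
#[derive(Debug, Clone, PartialEq)]
enum Outcome {
    Delivered,
    Purged,
}

/// Shared log that stands in for delivery callbacks.
type Report = Rc<RefCell<Vec<(String, Outcome)>>>;

/// Toy model of a producer. Hypothetical type, not part of rdkafka.
struct ModelProducer {
    pending: VecDeque<String>, // sent but not yet acknowledged
    per_message: Duration,     // simulated cost of delivering one message
    report: Report,
}

impl ModelProducer {
    fn new(per_message: Duration, report: Report) -> Self {
        ModelProducer { pending: VecDeque::new(), per_message, report }
    }

    fn send(&mut self, payload: &str) {
        self.pending.push_back(payload.to_string());
    }

    /// Delivers pending messages until the simulated time budget runs out.
    /// Returns `Err(n)` with the number of messages still pending.
    fn flush(&mut self, timeout: Duration) -> Result<(), usize> {
        let mut budget = timeout;
        while !self.pending.is_empty() {
            if budget < self.per_message {
                return Err(self.pending.len());
            }
            budget -= self.per_message;
            let msg = self.pending.pop_front().unwrap();
            self.report.borrow_mut().push((msg, Outcome::Delivered));
        }
        Ok(())
    }
}

impl Drop for ModelProducer {
    fn drop(&mut self) {
        // Models the purge step: anything still pending is cancelled,
        // and its callback reports a purge instead of a delivery.
        for msg in self.pending.drain(..) {
            self.report.borrow_mut().push((msg, Outcome::Purged));
        }
    }
}

/// Sends `n` messages, optionally flushes, drops the producer, and
/// returns how many messages were (delivered, purged).
fn run(n: usize, flush_timeout: Option<Duration>) -> (usize, usize) {
    let report: Report = Rc::new(RefCell::new(Vec::new()));
    {
        let mut producer =
            ModelProducer::new(Duration::from_millis(10), Rc::clone(&report));
        for i in 0..n {
            producer.send(&format!("msg-{}", i));
        }
        if let Some(timeout) = flush_timeout {
            // With rdkafka this step would be `producer.flush(Timeout::After(timeout))`.
            let _ = producer.flush(timeout);
        }
    } // producer is dropped here
    let entries = report.borrow();
    let delivered = entries.iter().filter(|(_, o)| *o == Outcome::Delivered).count();
    (delivered, entries.len() - delivered)
}
```

In this model, `run(100, None)` purges all 100 messages on drop, `run(100, Some(Duration::from_millis(500)))` delivers 50 and purges 50, and `run(100, Some(Duration::from_secs(30)))` delivers all 100.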

Usage

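Use this heuristic when designing producer shutdown logic or debugging missing delivery callbacks on application exit. If messages appear to be "lost" during shutdown, the likely cause is that the producer was dropped while they were still queued or in flight: the drop purges them, and the 500ms flush only serves the resulting callbacks.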

The Insight (Rule of Thumb)

  • Action: Call `producer.flush(Timeout::After(Duration::from_secs(30)))` explicitly before dropping the producer.
  • Value: The default drop timeout is only 500ms; production workloads often need 5-30 seconds.
  • Trade-off: A longer flush timeout delays application shutdown but gives every delivery callback time to execute.
  • Safeguard: The `#[must_use]` attribute on `ThreadedProducer` triggers a compiler warning if the producer is created and never used, which would drop it immediately.
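One way to keep the shutdown delay bounded while still flushing explicitly is to split the total wait into slices and retry. The helper below is a minimal sketch, not part of rdkafka: `flush_before_drop` and `Shutdown` are hypothetical names, and the flush operation is passed in as a closure so the example stays standard-library only. With a real producer the closure would presumably be `|t| producer.flush(Timeout::After(t))`.

```rust
use std::time::Duration;

/// Result of a bounded shutdown flush (hypothetical type).
#[derive(Debug, PartialEq)]
enum Shutdown<E> {
    /// Flush succeeded after `attempts` calls.
    Clean { attempts: u32 },
    /// Every attempt failed; `last_error` is the final flush error.
    TimedOut { attempts: u32, last_error: E },
}

/// Calls `flush` with `slice` as its timeout until it succeeds or
/// `max_attempts` calls have been made. The total wait is therefore
/// bounded by roughly `slice * max_attempts`.
fn flush_before_drop<E>(
    mut flush: impl FnMut(Duration) -> Result<(), E>,
    slice: Duration,
    max_attempts: u32,
) -> Shutdown<E> {
    assert!(max_attempts > 0, "need at least one flush attempt");
    let mut attempts = 0;
    loop {
        attempts += 1;
        match flush(slice) {
            Ok(()) => return Shutdown::Clean { attempts },
            Err(err) if attempts >= max_attempts => {
                return Shutdown::TimedOut { attempts, last_error: err };
            }
            // A real caller might log progress here before retrying.
            Err(_) => continue,
        }
    }
}
```

For example, a 5 second slice with 6 attempts caps the wait at about 30 seconds, and a `Shutdown::TimedOut` result tells the caller that dropping the producer now will purge whatever is still outstanding.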

Reasoning

The 500ms default in `Drop` is a conservative choice that balances cleanup completeness with avoiding indefinite hangs. In high-throughput scenarios, there may be thousands of in-flight messages when the producer is dropped. The purge-then-flush sequence ensures that messages are cancelled (purge) and their delivery callbacks are executed (flush), but only within the 500ms window. The `#[must_use]` annotation on `ThreadedProducer` is a Rust-level safeguard against accidentally creating and immediately dropping a producer.

Code Evidence

BaseProducer Drop implementation from `src/producer/base_producer.rs:629-642`:

impl<C, Part: Partitioner> Drop for BaseProducer<C, Part>
where
    C: ProducerContext<Part>,
{
    fn drop(&mut self) {
        self.purge(PurgeConfig::default().queue().inflight());
        // Still have to flush after purging to get the results that have been
        // made ready by the purge
        if let Err(err) = self.flush(Timeout::After(Duration::from_millis(500))) {
            warn!(
                "Failed to flush outstanding messages while dropping the producer: {:?}",
                err
            );
        }
    }
}

ThreadedProducer must_use annotation from `src/producer/base_producer.rs:655`:

#[must_use = "The threaded producer will stop immediately if unused"]
pub struct ThreadedProducer<C, Part: Partitioner = NoCustomPartitioner>
