Heuristic:Fede1024 Rust rdkafka Cooperative Rebalance Protocol
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Optimization |
| Last Updated | 2026-02-07 19:30 GMT |
Overview
The default rebalance callback automatically detects whether cooperative or eager rebalancing is configured and calls the correct librdkafka API (`incremental_assign` vs `assign`), but custom rebalance callbacks must handle both protocols correctly.
Description
Kafka supports two consumer rebalance protocols: eager (stop-the-world: revoke all partitions, then reassign) and cooperative (incremental: only revoke/assign changed partitions). The rust-rdkafka library's default `ConsumerContext::rebalance` implementation auto-detects the active protocol via `native_client.rebalance_protocol()` and calls the appropriate FFI function. However, if you override the rebalance callback, you must replicate this protocol detection, or you risk breaking the consumer group coordination.
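The detection step can be pictured with a small standalone model. This is a sketch only: the `Protocol` enum and both functions are hypothetical stand-ins, not rust-rdkafka's API, and it assumes librdkafka reports the protocol by name as `"NONE"`, `"EAGER"`, or `"COOPERATIVE"`.

```rust
/// Hypothetical stand-in for the library's `RebalanceProtocol` enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Protocol {
    None,
    Eager,
    Cooperative,
}

/// Maps a protocol name to the enum. The names are assumed to be the ones
/// librdkafka reports; anything else, including a missing name, is treated
/// as `None` in this sketch.
fn parse_protocol(name: Option<&str>) -> Protocol {
    match name {
        Some("EAGER") => Protocol::Eager,
        Some("COOPERATIVE") => Protocol::Cooperative,
        _ => Protocol::None,
    }
}

/// Mirrors the `_ =>` arm of the default callback: only `Cooperative`
/// selects the incremental calls; every other value takes the eager path.
fn uses_incremental_calls(protocol: Protocol) -> bool {
    protocol == Protocol::Cooperative
}

fn main() {
    for name in [Some("EAGER"), Some("COOPERATIVE"), Some("NONE"), None] {
        let protocol = parse_protocol(name);
        println!(
            "{:?} -> {:?}, incremental: {}",
            name,
            protocol,
            uses_incremental_calls(protocol)
        );
    }
}
```

A custom callback that follows the same rule needs a single branch on the detected protocol; it does not need to treat `None` and `Eager` separately.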
Usage
Use this heuristic when implementing a custom `ConsumerContext` that overrides the rebalance callback. If you use cooperative rebalancing (`partition.assignment.strategy=cooperative-sticky`), your custom callback must call `rd_kafka_incremental_assign` / `rd_kafka_incremental_unassign` instead of the eager `rd_kafka_assign`.
The Insight (Rule of Thumb)
- Action: When overriding `ConsumerContext::rebalance`, always check `native_client.rebalance_protocol()` and branch accordingly.
- Value: Cooperative rebalancing reduces stop-the-world pauses and improves availability during consumer group membership changes.
- Trade-off: Cooperative rebalancing adds complexity to custom callbacks. If you don't need custom rebalance handling, rely on the default implementation.
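One way to follow the trade-off above is to customize only the hooks that run around the rebalance and leave the protocol-aware dispatch untouched. rust-rdkafka exposes `pre_rebalance` and `post_rebalance` on `ConsumerContext` for this purpose. The sketch below models that pattern with the standard library alone: the `Context` trait, `Protocol`, `Event`, and `LoggingContext` are hypothetical simplifications, the signatures are not the library's, and the returned string only names the librdkafka call a real implementation would make.

```rust
use std::cell::RefCell;

// Hypothetical, simplified stand-ins; not the real rust-rdkafka types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Protocol {
    Eager,
    Cooperative,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Event {
    Assign,
    Revoke,
}

/// Simplified model of a context trait with overridable hooks.
trait Context {
    /// Hook run before the assignment changes; no-op by default.
    fn pre_rebalance(&self, _event: Event) {}
    /// Hook run after the assignment changes; no-op by default.
    fn post_rebalance(&self, _event: Event) {}

    /// Default protocol-aware dispatch. Returns the name of the librdkafka
    /// call that a real implementation would make at this point.
    fn rebalance(&self, protocol: Protocol, event: Event) -> &'static str {
        self.pre_rebalance(event);
        let call = match (event, protocol) {
            (Event::Assign, Protocol::Cooperative) => "rd_kafka_incremental_assign",
            (Event::Assign, Protocol::Eager) => "rd_kafka_assign",
            (Event::Revoke, Protocol::Cooperative) => "rd_kafka_incremental_unassign",
            (Event::Revoke, Protocol::Eager) => "rd_kafka_assign(null)",
        };
        self.post_rebalance(event);
        call
    }
}

/// Custom context that records events and does not override `rebalance`.
#[derive(Default)]
struct LoggingContext {
    log: RefCell<Vec<String>>,
}

impl Context for LoggingContext {
    fn pre_rebalance(&self, event: Event) {
        self.log.borrow_mut().push(format!("pre {:?}", event));
    }
    fn post_rebalance(&self, event: Event) {
        self.log.borrow_mut().push(format!("post {:?}", event));
    }
}

fn main() {
    let ctx = LoggingContext::default();
    let call = ctx.rebalance(Protocol::Cooperative, Event::Revoke);
    println!("{} {:?}", call, ctx.log.borrow());
}
```

Because `LoggingContext` never touches `rebalance`, the protocol branch stays in one place and the custom code cannot pick the wrong call.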
Reasoning
The two protocols use fundamentally different FFI calls. Eager rebalancing uses `rd_kafka_assign(ptr, tpl)` for assignment and `rd_kafka_assign(ptr, null)` for revocation, which clears the whole assignment. Cooperative rebalancing uses `rd_kafka_incremental_assign(ptr, tpl)` and `rd_kafka_incremental_unassign(ptr, tpl)`, where `tpl` lists only the partitions being added or removed rather than the full assignment. Calling the function for the wrong protocol corrupts the consumer's partition assignment state and can lead to missed messages or duplicate processing.
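The difference between replacing an assignment and adjusting it by a delta can be shown with plain sets. This is a minimal sketch under stated assumptions: `Assignment` and its methods are hypothetical and model set semantics only. It makes no claim about how librdkafka responds to a mismatched call, which may be to reject it with an error rather than apply it.

```rust
use std::collections::BTreeSet;

/// Hypothetical model of a consumer's assignment as a set of partition ids.
#[derive(Debug, Default, PartialEq, Eq)]
struct Assignment {
    partitions: BTreeSet<i32>,
}

impl Assignment {
    /// Eager semantics: replace the whole assignment with `list`.
    /// An empty list clears it, like passing null to `rd_kafka_assign`.
    fn assign(&mut self, list: &[i32]) {
        self.partitions = list.iter().copied().collect();
    }

    /// Cooperative semantics: add only the partitions in `delta`.
    fn incremental_assign(&mut self, delta: &[i32]) {
        self.partitions.extend(delta.iter().copied());
    }

    /// Cooperative semantics: remove only the partitions in `delta`.
    fn incremental_unassign(&mut self, delta: &[i32]) {
        for partition in delta {
            self.partitions.remove(partition);
        }
    }

    /// Current partition ids in ascending order.
    fn ids(&self) -> Vec<i32> {
        self.partitions.iter().copied().collect()
    }
}

fn main() {
    // The consumer owns partitions 0 and 1; the group hands it partition 2.
    let mut cooperative = Assignment::default();
    cooperative.assign(&[0, 1]);
    cooperative.incremental_assign(&[2]);
    println!("delta applied incrementally: {:?}", cooperative.ids());

    // Treating the same delta as a full list drops partitions 0 and 1.
    let mut mismatched = Assignment::default();
    mismatched.assign(&[0, 1]);
    mismatched.assign(&[2]);
    println!("delta applied as a replacement: {:?}", mismatched.ids());
}
```

In this model the incremental path ends with partitions 0, 1, and 2, while the replacement path ends with partition 2 alone. That gap is the kind of state divergence the protocol check is meant to prevent.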
Code Evidence
Protocol-aware rebalance from `src/consumer/mod.rs:74-94`:
```rust
match err {
    // Partitions are being handed to this consumer.
    RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => {
        match native_client.rebalance_protocol() {
            RebalanceProtocol::Cooperative => {
                rdsys::rd_kafka_incremental_assign(native_client.ptr(), tpl.ptr());
            }
            _ => {
                rdsys::rd_kafka_assign(native_client.ptr(), tpl.ptr());
            }
        }
    }
    // Any other code is handled as a revocation.
    _ => match native_client.rebalance_protocol() {
        RebalanceProtocol::Cooperative => {
            rdsys::rd_kafka_incremental_unassign(native_client.ptr(), tpl.ptr());
        }
        _ => {
            // Eager revocation: a null list clears the whole assignment.
            rdsys::rd_kafka_assign(native_client.ptr(), ptr::null());
        }
    },
}
```