Implementation:Fede1024 Rust rdkafka ClientContext Trait
| Knowledge Sources | |
|---|---|
| Domains | Messaging, Design_Patterns |
| Last Updated | 2026-02-07 19:00 GMT |
Overview
Concrete trait definitions for injecting custom callback logic into Kafka clients provided by rust-rdkafka.
Description
ClientContext is the base trait that all Kafka clients accept. It defines callbacks for logging, statistics, errors, and OAuth token generation. All methods have default implementations, so users only need to override what they want to customize.
ConsumerContext extends ClientContext with consumer-specific callbacks: partition rebalance handling (assign/revoke), commit callbacks, and main queue polling interval configuration. The default rebalance implementation handles both eager and cooperative rebalance protocols.
Usage
Implement these traits on your own struct and pass it to ClientConfig::create_with_context(). You only need to override the methods you want to customize.
Code Reference
Source Location
- Repository: rust-rdkafka
- File: src/client.rs (ClientContext), src/consumer/mod.rs (ConsumerContext)
- Lines: src/client.rs:L50-141, src/consumer/mod.rs:L46-133
Signature
pub trait ClientContext: Send + Sync {
const ENABLE_REFRESH_OAUTH_TOKEN: bool = false;
fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) { .. }
fn stats(&self, statistics: Statistics) { .. }
fn stats_raw(&self, statistics: &[u8]) { .. }
fn error(&self, error: KafkaError, reason: &str) { .. }
fn generate_oauth_token(
&self,
_oauthbearer_config: Option<&str>,
) -> Result<OAuthToken, Box<dyn Error>> { .. }
}
pub trait ConsumerContext: ClientContext + Sized {
fn rebalance(
&self,
base_consumer: &BaseConsumer<Self>,
err: RDKafkaRespErr,
tpl: &mut TopicPartitionList,
) { .. }
fn pre_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {}
fn post_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {}
fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {}
fn main_queue_min_poll_interval(&self) -> Timeout { .. }
}
Import
use rdkafka::client::ClientContext;
use rdkafka::consumer::ConsumerContext;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| User struct | impl ClientContext | Yes | A Send + Sync struct implementing the trait |
| level | RDKafkaLogLevel | No | Log level for log() callback |
| statistics | Statistics | No | Decoded stats for stats() callback |
| rebalance | Rebalance<'_> | No | Rebalance event info for rebalance callbacks |
Outputs
| Name | Type | Description |
|---|---|---|
| Context instance | impl ClientContext | Passed to create_with_context() to create a client |
| OAuthToken | Result<OAuthToken, Box<dyn Error>> | Generated token for OAUTHBEARER SASL |
Usage Examples
Custom Logging Context
use rdkafka::client::ClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{ConsumerContext, StreamConsumer};
use rdkafka::error::KafkaResult;
use rdkafka::topic_partition_list::TopicPartitionList;
struct LoggingContext;
impl ClientContext for LoggingContext {
fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
println!("[{:?}] {}: {}", level, fac, log_message);
}
}
impl ConsumerContext for LoggingContext {
fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {
println!("Committed offsets: {:?}, result: {:?}", offsets, result);
}
}
let consumer: StreamConsumer<LoggingContext> = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("group.id", "my-group")
.create_with_context(LoggingContext)
.expect("Consumer creation failed");