Implementation:Fede1024 Rust rdkafka ClientContext Trait

From Leeroopedia


Knowledge Sources
Domains: Messaging, Design_Patterns
Last Updated: 2026-02-07 19:00 GMT

Overview

The ClientContext and ConsumerContext traits from rust-rdkafka let you inject custom callback logic into Kafka clients.

Description

ClientContext is the base trait that all Kafka clients accept. It defines callbacks for logging, statistics, errors, and OAuth token generation. All methods have default implementations, so users only need to override what they want to customize.
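Because every callback takes &self and the trait is bounded by Send + Sync, a context that keeps state generally needs interior mutability. The following is a minimal sketch of that pattern using only the standard library. MiniClientContext, DefaultContext, and CountingContext are hypothetical stand-ins with simplified signatures, not the real rdkafka types.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

// Hypothetical stand-in for ClientContext: every method has a default body.
pub trait MiniClientContext: Send + Sync {
    // Default: print the log line to stderr.
    fn log(&self, fac: &str, log_message: &str) {
        eprintln!("{}: {}", fac, log_message);
    }
    // Default: ignore the error.
    fn error(&self, _reason: &str) {}
}

// Keeps every default; the impl body is empty.
pub struct DefaultContext;
impl MiniClientContext for DefaultContext {}

// Overrides only `error`. State is updated through &self, so it lives
// behind an atomic and a Mutex, which also keeps the type Send + Sync.
pub struct CountingContext {
    pub errors: AtomicUsize,
    pub last_reason: Mutex<Option<String>>,
}

impl CountingContext {
    pub fn new() -> Self {
        CountingContext {
            errors: AtomicUsize::new(0),
            last_reason: Mutex::new(None),
        }
    }
}

impl MiniClientContext for CountingContext {
    fn error(&self, reason: &str) {
        self.errors.fetch_add(1, Ordering::Relaxed);
        *self.last_reason.lock().unwrap() = Some(reason.to_string());
    }
}
```

In this sketch CountingContext still inherits the default log, which is the same override-only-what-you-need behavior the description attributes to ClientContext.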

ConsumerContext extends ClientContext with consumer-specific callbacks: partition rebalance handling (assign/revoke), commit callbacks, and main queue polling interval configuration. The default rebalance implementation handles both eager and cooperative rebalance protocols.
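As an illustration of the assign/revoke hooks, here is a minimal standard-library sketch of a context that tracks which partitions it currently owns. MiniRebalance, MiniConsumerContext, TrackingContext, and deliver are hypothetical stand-ins: the real hooks also receive the consumer and a Rebalance value, and partitions are reduced to plain integers here.

```rust
use std::collections::BTreeSet;
use std::sync::Mutex;

// Hypothetical stand-in for a rebalance event.
pub enum MiniRebalance<'a> {
    Assign(&'a [i32]),
    Revoke(&'a [i32]),
    Error(&'a str),
}

// Hypothetical stand-in for ConsumerContext: both hooks default to no-ops.
pub trait MiniConsumerContext: Send + Sync {
    fn pre_rebalance(&self, _rebalance: &MiniRebalance<'_>) {}
    fn post_rebalance(&self, _rebalance: &MiniRebalance<'_>) {}
}

pub struct TrackingContext {
    owned: Mutex<BTreeSet<i32>>,
}

impl TrackingContext {
    pub fn new() -> Self {
        TrackingContext { owned: Mutex::new(BTreeSet::new()) }
    }
    // Snapshot of the currently owned partitions, in ascending order.
    pub fn owned(&self) -> Vec<i32> {
        self.owned.lock().unwrap().iter().copied().collect()
    }
}

impl MiniConsumerContext for TrackingContext {
    // Before a revoke takes effect: stop tracking the partitions being lost.
    fn pre_rebalance(&self, rebalance: &MiniRebalance<'_>) {
        if let MiniRebalance::Revoke(parts) = rebalance {
            let mut owned = self.owned.lock().unwrap();
            for p in parts.iter() {
                owned.remove(p);
            }
        }
    }
    // After an assign has been applied: start tracking the new partitions.
    fn post_rebalance(&self, rebalance: &MiniRebalance<'_>) {
        if let MiniRebalance::Assign(parts) = rebalance {
            self.owned.lock().unwrap().extend(parts.iter().copied());
        }
    }
}

// Simulates a client calling the hooks around one rebalance event.
pub fn deliver<C: MiniConsumerContext>(ctx: &C, event: MiniRebalance<'_>) {
    ctx.pre_rebalance(&event);
    // A real client would apply the assignment change at this point.
    ctx.post_rebalance(&event);
}
```

Adding on assign and removing on revoke treats each event as a delta, which is one way to stay correct whether the event lists the full assignment or only the partitions that changed.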

Usage

Implement these traits on your own struct and pass an instance to ClientConfig::create_with_context(). Because ConsumerContext requires ClientContext, a consumer's context type needs an impl of both traits, even if one impl body is empty.

Code Reference

Source Location

  • Repository: rust-rdkafka
  • File: src/client.rs (ClientContext), src/consumer/mod.rs (ConsumerContext)
  • Lines: src/client.rs:L50-141, src/consumer/mod.rs:L46-133

Signature

pub trait ClientContext: Send + Sync {
    const ENABLE_REFRESH_OAUTH_TOKEN: bool = false;

    fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) { .. }
    fn stats(&self, statistics: Statistics) { .. }
    fn stats_raw(&self, statistics: &[u8]) { .. }
    fn error(&self, error: KafkaError, reason: &str) { .. }
    fn generate_oauth_token(
        &self,
        _oauthbearer_config: Option<&str>,
    ) -> Result<OAuthToken, Box<dyn Error>> { .. }
}

pub trait ConsumerContext: ClientContext + Sized {
    fn rebalance(
        &self,
        base_consumer: &BaseConsumer<Self>,
        err: RDKafkaRespErr,
        tpl: &mut TopicPartitionList,
    ) { .. }
    fn pre_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {}
    fn post_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {}
    fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {}
    fn main_queue_min_poll_interval(&self) -> Timeout { .. }
}

Import

use rdkafka::client::ClientContext;
use rdkafka::consumer::ConsumerContext;

I/O Contract

Inputs

Name | Type | Required | Description
User struct | impl ClientContext | Yes | A Send + Sync struct implementing the trait
level | RDKafkaLogLevel | No | Log level for log() callback
statistics | Statistics | No | Decoded stats for stats() callback
rebalance | Rebalance<'_> | No | Rebalance event info for rebalance callbacks

Outputs

Name | Type | Description
Context instance | impl ClientContext | Passed to create_with_context() to create a client
OAuthToken | Result<OAuthToken, Box<dyn Error>> | Generated token for OAUTHBEARER SASL
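The OAuthToken output is tied to the ENABLE_REFRESH_OAUTH_TOKEN associated constant in the signature, which defaults to false. The sketch below shows, with standard-library stand-ins only, how an associated constant with a default can gate a token callback. MiniAuthContext, MiniOAuthToken (including its field names), StaticTokenContext, NoAuthContext, and refresh_token are all hypothetical, and reading the config string as the principal name is an assumption made purely for illustration.

```rust
use std::error::Error;

// Hypothetical token type; the field names are assumptions for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub struct MiniOAuthToken {
    pub token: String,
    pub principal_name: String,
    pub lifetime_ms: i64,
}

// Hypothetical stand-in for the OAuth-related part of ClientContext.
pub trait MiniAuthContext: Send + Sync {
    // Opt-in flag with a default, overridable per implementation.
    const ENABLE_REFRESH_OAUTH_TOKEN: bool = false;

    // Default: report that no token source was provided.
    fn generate_oauth_token(
        &self,
        _oauthbearer_config: Option<&str>,
    ) -> Result<MiniOAuthToken, Box<dyn Error>> {
        Err("generate_oauth_token was not overridden".into())
    }
}

// Keeps both defaults: flag off, callback returns an error.
pub struct NoAuthContext;
impl MiniAuthContext for NoAuthContext {}

// Opts in and returns a fixed token.
pub struct StaticTokenContext {
    pub token: String,
}

impl MiniAuthContext for StaticTokenContext {
    const ENABLE_REFRESH_OAUTH_TOKEN: bool = true;

    fn generate_oauth_token(
        &self,
        oauthbearer_config: Option<&str>,
    ) -> Result<MiniOAuthToken, Box<dyn Error>> {
        // Assumption: the config string carries the principal name.
        let principal = oauthbearer_config.ok_or("missing oauthbearer config")?;
        Ok(MiniOAuthToken {
            token: self.token.clone(),
            principal_name: principal.to_string(),
            lifetime_ms: 60_000, // arbitrary placeholder value
        })
    }
}

// Simulates a client that only consults the context when the flag is on.
pub fn refresh_token<C: MiniAuthContext>(
    ctx: &C,
    config: Option<&str>,
) -> Option<Result<MiniOAuthToken, Box<dyn Error>>> {
    if C::ENABLE_REFRESH_OAUTH_TOKEN {
        Some(ctx.generate_oauth_token(config))
    } else {
        None
    }
}
```

Returning Box<dyn Error> lets the callback surface any failure from a token provider without the trait having to name a concrete error type.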

Usage Examples

Custom Logging Context

use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::{ConsumerContext, StreamConsumer};
use rdkafka::error::KafkaResult;
use rdkafka::topic_partition_list::TopicPartitionList;

// Stateless context; a unit struct is automatically Send + Sync.
struct LoggingContext;

impl ClientContext for LoggingContext {
    // Override only log(); the other callbacks keep their defaults.
    fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
        println!("[{:?}] {}: {}", level, fac, log_message);
    }
}

impl ConsumerContext for LoggingContext {
    // Invoked with the result of an offset commit.
    fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {
        println!("Committed offsets: {:?}, result: {:?}", offsets, result);
    }
}

// Build the consumer inside a function, since `let` is not valid at module level.
fn create_consumer() -> StreamConsumer<LoggingContext> {
    ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "my-group")
        .create_with_context(LoggingContext)
        .expect("Consumer creation failed")
}

Related Pages

Implements Principle

Requires Environment
