
Implementation:BerriAI Litellm Custom Logger

From Leeroopedia
Knowledge Sources: [[1]]
Domains: Observability, Extensibility
Last Updated: 2026-02-15

Overview

The CustomLogger and CustomBatchLogger base classes in LiteLLM provide a concrete foundation for developing custom observability integrations.

Description

The CustomLogger class is the base class for all observability integrations in LiteLLM. It defines a comprehensive set of lifecycle hook methods -- all with no-op default implementations -- that subclasses can override to receive notifications at each stage of an LLM API call. It also provides a static method, get_callback_env_vars(), for discovering the environment variables required by a given callback name.

The CustomBatchLogger class extends CustomLogger with batching infrastructure: an in-memory log_queue, configurable batch_size and flush_interval, a flush_lock for concurrency safety, and a periodic_flush() coroutine that runs in the background. Subclasses such as LangsmithLogger override async_send_batch() to transmit queued events.

Usage

Use CustomLogger when:

  • Building a new observability integration that processes events one at a time.
  • Implementing guardrails or content filters via pre-call and post-call hooks.
  • Creating router-level hooks for deployment filtering or request modification.
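For illustration, the guardrail/content-filter use case can be sketched as a standalone coroutine that mirrors the async_pre_request_hook contract described in this page (model, messages, kwargs in; modified kwargs or None out), written without a litellm dependency. The email-redaction rule is invented for this example:

```python
import asyncio
import re
from typing import Dict, List, Optional

# Stand-in for CustomLogger.async_pre_request_hook, dependency-free so the
# contract is easy to see in isolation. The redaction rule is illustrative.
async def async_pre_request_hook(model: str, messages: List, kwargs: Dict) -> Optional[Dict]:
    """Redact email addresses from messages; return modified kwargs or None."""
    email_re = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")
    redacted = [
        {**m, "content": email_re.sub("[REDACTED]", m["content"])}
        for m in messages
    ]
    if redacted == messages:
        return None  # no changes needed; caller keeps original kwargs
    new_kwargs = dict(kwargs)
    new_kwargs["messages"] = redacted
    return new_kwargs

result = asyncio.run(
    async_pre_request_hook(
        "gpt-4o",
        [{"role": "user", "content": "Contact me at alice@example.com"}],
        {"temperature": 0.2},
    )
)
print(result["messages"][0]["content"])
```

In the real class the return value (when not None) replaces the request kwargs, so any key you copy through unchanged is preserved.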

Use CustomBatchLogger when:

  • Building an integration that benefits from batching (HTTP-based backends like Langsmith, Datadog).
  • You need automatic periodic flushing with configurable intervals.
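The batching behaviour these bullets describe can be sketched without a litellm dependency. The class below is a simplified stand-in for CustomBatchLogger, not its real implementation; it shows the queue-then-flush contract (periodic_flush() in the real class additionally calls flush_queue() every flush_interval seconds):

```python
import asyncio
from typing import Any, List

# Dependency-free sketch of the CustomBatchLogger contract: events queue in
# memory and flush once batch_size is reached.
class BatchSketch:
    def __init__(self, batch_size: int = 3, flush_interval: int = 10):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.log_queue: List[Any] = []
        self.sent_batches: List[List[Any]] = []
        self.flush_lock = asyncio.Lock()

    async def log_event(self, payload: Any) -> None:
        self.log_queue.append(payload)
        if len(self.log_queue) >= self.batch_size:
            await self.flush_queue()

    async def flush_queue(self) -> None:
        async with self.flush_lock:  # guard against concurrent flushes
            if self.log_queue:
                await self.async_send_batch()
                self.log_queue.clear()

    async def async_send_batch(self) -> None:
        # Real subclasses POST self.log_queue to their backend here.
        self.sent_batches.append(list(self.log_queue))

async def main():
    logger = BatchSketch(batch_size=3)
    for i in range(7):
        await logger.log_event({"event": i})
    return logger

logger = asyncio.run(main())
print(len(logger.sent_batches), len(logger.log_queue))
```

With batch_size=3, seven events produce two flushed batches and one event left in the queue awaiting the next periodic flush.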

Code Reference

Source Location

  • litellm/integrations/custom_logger.py (lines 1-1059)
  • litellm/integrations/custom_batch_logger.py (lines 1-58)

Signature

CustomLogger:

class CustomLogger:
    def __init__(
        self,
        turn_off_message_logging: bool = False,
        message_logging: bool = True,  # deprecated
        **kwargs,
    ) -> None: ...

    # Sync lifecycle hooks
    def log_pre_api_call(self, model, messages, kwargs): ...
    def log_post_api_call(self, kwargs, response_obj, start_time, end_time): ...
    def log_stream_event(self, kwargs, response_obj, start_time, end_time): ...
    def log_success_event(self, kwargs, response_obj, start_time, end_time): ...
    def log_failure_event(self, kwargs, response_obj, start_time, end_time): ...

    # Async lifecycle hooks
    async def async_log_pre_api_call(self, model, messages, kwargs): ...
    async def async_log_stream_event(self, kwargs, response_obj, start_time, end_time): ...
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): ...
    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): ...

    # Request modification hook
    async def async_pre_request_hook(
        self, model: str, messages: List, kwargs: Dict
    ) -> Optional[Dict]: ...

    # Prompt management hooks
    async def async_get_chat_completion_prompt(
        self, model, messages, non_default_params, prompt_id, ...
    ) -> Tuple[str, List[AllMessageValues], dict]: ...

    # Router/proxy hooks
    async def async_pre_call_check(self, deployment, parent_otel_span) -> Optional[dict]: ...
    async def async_filter_deployments(self, model, healthy_deployments, messages, ...) -> List[dict]: ...
    async def async_pre_call_deployment_hook(self, kwargs, call_type) -> Optional[dict]: ...
    async def async_post_call_success_deployment_hook(self, request_data, response, call_type): ...

    # Utility
    @staticmethod
    def get_callback_env_vars(callback_name: Optional[str] = None) -> List[str]: ...

CustomBatchLogger:

class CustomBatchLogger(CustomLogger):
    def __init__(
        self,
        flush_lock: Optional[asyncio.Lock] = None,
        batch_size: Optional[int] = None,
        flush_interval: Optional[int] = None,
        **kwargs,
    ) -> None: ...

    async def periodic_flush(self) -> None: ...
    async def flush_queue(self) -> None: ...
    async def async_send_batch(self, *args, **kwargs) -> None: ...
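As a rough illustration of the get_callback_env_vars() contract shown above, the sketch below uses an invented registry and env-var names; the real mapping lives inside litellm:

```python
import os
from typing import Dict, List, Optional

# Hypothetical registry for this sketch only; litellm maintains the real one.
CALLBACK_ENV_VARS: Dict[str, List[str]] = {
    "my_backend": ["MY_BACKEND_API_KEY", "MY_BACKEND_URL"],
}

def get_callback_env_vars(callback_name: Optional[str] = None) -> List[str]:
    """Return the env vars a named callback needs, or [] if unknown."""
    return CALLBACK_ENV_VARS.get(callback_name or "", [])

def missing_env_vars(callback_name: str) -> List[str]:
    """Startup check: which required env vars are unset?"""
    return [v for v in get_callback_env_vars(callback_name) if v not in os.environ]

print(missing_env_vars("my_backend"))
```

A proxy can run such a check at startup and fail fast with a clear message instead of erroring on the first logged request.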

Import

from litellm.integrations.custom_logger import CustomLogger
from litellm.integrations.custom_batch_logger import CustomBatchLogger

I/O Contract

Inputs (CustomLogger.__init__)

  • turn_off_message_logging (bool, optional; default False): When True, message and response content are redacted from the StandardLoggingPayload before reaching this logger.
  • message_logging (bool, optional; default True): Deprecated; use turn_off_message_logging instead.

Inputs (CustomBatchLogger.__init__)

  • flush_lock (Optional[asyncio.Lock]): Lock for thread-safe queue flushing. If None, flush_queue() returns immediately.
  • batch_size (Optional[int]): Maximum events to queue before auto-flushing. Defaults to litellm.DEFAULT_BATCH_SIZE.
  • flush_interval (Optional[int]): Seconds between periodic flush cycles. Defaults to litellm.DEFAULT_FLUSH_INTERVAL_SECONDS.

Outputs (Lifecycle Hooks)

  • log_success_event / async_log_success_event (returns None): Side-effect only; exports data to an external system.
  • log_failure_event / async_log_failure_event (returns None): Side-effect only; exports error data to an external system.
  • async_pre_request_hook (returns Optional[Dict]): Modified kwargs to use for the request, or None for no changes.
  • async_filter_deployments (returns List[dict]): Filtered list of healthy deployments.
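The async_filter_deployments contract can be sketched as a standalone coroutine that takes the healthy deployment list and returns a subset. The "region" metadata key below is hypothetical, chosen only to show the filtering shape:

```python
import asyncio
from typing import Dict, List

# Dependency-free sketch of the async_filter_deployments contract: keep only
# deployments matching some criterion (here, a hypothetical "region" key).
async def async_filter_deployments(
    model: str, healthy_deployments: List[Dict], messages: List
) -> List[Dict]:
    return [
        d for d in healthy_deployments
        if d.get("litellm_params", {}).get("region") == "eu"
    ]

deployments = [
    {"model_name": "gpt-4o", "litellm_params": {"region": "eu"}},
    {"model_name": "gpt-4o", "litellm_params": {"region": "us"}},
]
kept = asyncio.run(async_filter_deployments("gpt-4o", deployments, []))
print(len(kept))
```

Returning an empty list means no deployment is eligible, so a real hook should filter conservatively.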

Usage Examples

Minimal Custom Logger

import litellm
from litellm.integrations.custom_logger import CustomLogger

class MyLogger(CustomLogger):
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        payload = kwargs.get("standard_logging_object", {})
        print(f"[SUCCESS] Model={payload.get('model')} Cost=${payload.get('response_cost', 0):.6f}")

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        payload = kwargs.get("standard_logging_object", {})
        print(f"[FAILURE] Model={payload.get('model')} Error={payload.get('error_str')}")

# Register the custom logger
my_logger = MyLogger()
litellm.callbacks = [my_logger]

Custom Batch Logger

import asyncio
import litellm
from litellm.integrations.custom_batch_logger import CustomBatchLogger

class MyBatchLogger(CustomBatchLogger):
    def __init__(self):
        self.flush_lock = asyncio.Lock()
        super().__init__(
            flush_lock=self.flush_lock,
            batch_size=50,
            flush_interval=10,
        )

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        payload = kwargs.get("standard_logging_object", {})
        self.log_queue.append(payload)
        if len(self.log_queue) >= self.batch_size:
            await self.flush_queue()

    async def async_send_batch(self, *args, **kwargs):
        # Send self.log_queue contents to your backend
        print(f"Sending batch of {len(self.log_queue)} events")

litellm.callbacks = [MyBatchLogger()]

Logger with Message Redaction

import litellm
from litellm.integrations.custom_logger import CustomLogger

class PrivacyAwareLogger(CustomLogger):
    def __init__(self):
        super().__init__(turn_off_message_logging=True)

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        payload = kwargs.get("standard_logging_object", {})
        # payload["messages"] and payload["response"] will be redacted
        print(f"Cost: ${payload.get('response_cost', 0):.6f}")

litellm.callbacks = [PrivacyAwareLogger()]
