
Implementation:BerriAI Litellm Logging Worker

From Leeroopedia
| Attribute | Value |
|---|---|
| Sources | litellm/litellm_core_utils/logging_worker.py |
| Domains | Logging, Performance, Async Processing, Background Tasks |
| Last updated | 2026-02-15 16:00 GMT |

Overview

The Logging Worker implements an asynchronous background task processor that queues and executes logging coroutines without blocking the main request path, providing a +200 RPS performance improvement.

Description

LoggingWorker is a bounded-queue, semaphore-limited async worker designed for best-effort execution of non-critical coroutines such as success/error callbacks, observability logging, and spend tracking.

Architecture:

  • Bounded queue -- An asyncio.Queue with configurable max size (default: 50,000 from LOGGING_WORKER_MAX_QUEUE_SIZE) prevents unbounded memory growth.
  • Concurrency control -- An asyncio.Semaphore (default: 100 from LOGGING_WORKER_CONCURRENCY) limits the number of simultaneously executing logging tasks.
  • Context preservation -- Each enqueued task captures contextvars.copy_context() at enqueue time, ensuring context variables (e.g., request IDs) are available during execution.
  • Timeout protection -- Each task is wrapped in asyncio.wait_for() with a configurable timeout (default: 20s from LOGGING_WORKER_MAX_TIME_PER_COROUTINE).
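
The four mechanisms above can be sketched together as follows. This is a minimal illustration of the pattern only; SimpleLoggingWorker and its method names are hypothetical, not LiteLLM's actual implementation:

```python
import asyncio
import contextvars
from typing import Coroutine

class SimpleLoggingWorker:
    """Minimal sketch of a bounded-queue, semaphore-limited async worker."""

    def __init__(self, timeout: float = 20.0, max_queue_size: int = 50_000,
                 concurrency: int = 100):
        self.timeout = timeout
        # Bounded queue: prevents unbounded memory growth under load.
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
        # Concurrency control: caps simultaneously executing logging tasks.
        self.semaphore = asyncio.Semaphore(concurrency)

    def enqueue(self, coroutine: Coroutine) -> None:
        # Context preservation: capture the caller's context variables
        # (e.g. request IDs) at enqueue time.
        self.queue.put_nowait(
            {"coroutine": coroutine, "context": contextvars.copy_context()}
        )

    async def _run_one(self, task: dict) -> None:
        async with self.semaphore:
            try:
                # Timeout protection: abandon tasks that run too long.
                await asyncio.wait_for(task["coroutine"], timeout=self.timeout)
            except Exception:
                pass  # best-effort: errors never reach the request path
            finally:
                self.queue.task_done()

    async def worker_loop(self) -> None:
        while True:
            task = await self.queue.get()
            # On Python 3.11+, the captured context could be applied via
            # loop.create_task(coro, context=task["context"]).
            asyncio.ensure_future(self._run_one(task))
```

Note the fire-and-forget dispatch in worker_loop: the loop never awaits a task directly, so a slow logger cannot stall the queue.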

Queue full handling: When the queue is full, the worker employs an aggressive clearing strategy:

  1. Extracts a configurable percentage (default 50%) of queue items.
  2. Processes them concurrently in a background task.
  3. Implements cooldown periods (default 0.5s) between aggressive clears.
  4. Tasks arriving during cooldown are scheduled for delayed retry.
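
Steps 1 and 2 can be sketched as a drain-and-gather pass. The names drain_batch and process_batch, and the hardcoded 50% fraction, are illustrative assumptions, not LiteLLM's exact constants:

```python
import asyncio
from typing import Coroutine, List

DRAIN_FRACTION = 0.5  # assumed default: extract 50% of queued items

def drain_batch(queue: asyncio.Queue, fraction: float = DRAIN_FRACTION) -> List:
    """Step 1: remove roughly `fraction` of the queue's items without blocking."""
    batch = []
    target = max(1, int(queue.qsize() * fraction))
    for _ in range(target):
        try:
            batch.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            break
    return batch

async def process_batch(coroutines: List[Coroutine]) -> None:
    # Step 2: run the drained coroutines concurrently; return_exceptions=True
    # keeps one failing logger from cancelling the rest of the batch.
    await asyncio.gather(*coroutines, return_exceptions=True)
```

In the real worker, process_batch would itself be scheduled as a detached background task, and the cooldown timer (steps 3 and 4) would gate how soon the next aggressive clear may run.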

Graceful shutdown:

  • stop() cancels all running tasks and the worker loop.
  • _flush_on_exit() is registered via atexit to process remaining queue items synchronously before process exit, creating a new event loop if the original is closed.
  • clear_queue() processes remaining items with time limits (MAX_TIME_TO_CLEAR_QUEUE).
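
The atexit-flush idea can be sketched as below. Since the original event loop is typically closed or absent at interpreter shutdown, the drain runs on a fresh loop via asyncio.run(); the function name and 1-second per-item timeout are illustrative, not LiteLLM's actual code:

```python
import asyncio
import atexit

def flush_on_exit(queue: asyncio.Queue) -> None:
    """Synchronously drain remaining logging coroutines at process exit."""
    async def _drain() -> None:
        while not queue.empty():
            task_coro = queue.get_nowait()
            try:
                await asyncio.wait_for(task_coro, timeout=1.0)
            except Exception:
                pass  # best-effort: never raise during shutdown

    try:
        asyncio.run(_drain())  # creates (and closes) a new event loop
    except RuntimeError:
        pass  # a loop is still running in this thread; skip the flush

# Registration would look like:
# atexit.register(flush_on_exit, worker.queue)
```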

Event loop resilience: The worker detects event loop changes (e.g., in test environments) and reinitializes its queue and semaphore when the running loop differs from the one it was bound to.
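
A hedged sketch of that rebinding check (class and method names here are hypothetical):

```python
import asyncio

class LoopBoundState:
    """Reinitialize loop-bound primitives when the running loop changes.

    Common in test suites where each test case gets a fresh event loop:
    an asyncio.Queue or Semaphore created on a closed loop is unusable.
    """

    def __init__(self, concurrency: int = 100):
        self.concurrency = concurrency
        self._loop = None
        self.queue = None
        self.semaphore = None

    def ensure_bound_to_running_loop(self) -> None:
        running = asyncio.get_running_loop()
        if self._loop is not running:
            # Loop changed (or first use): rebind queue and semaphore.
            self._loop = running
            self.queue = asyncio.Queue()
            self.semaphore = asyncio.Semaphore(self.concurrency)
```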

A global singleton GLOBAL_LOGGING_WORKER is provided for backward compatibility.

Usage

The worker is typically started automatically, and tasks are enqueued via the global instance:

from litellm.litellm_core_utils.logging_worker import GLOBAL_LOGGING_WORKER

Code Reference

Source Location

/litellm/litellm_core_utils/logging_worker.py (523 lines)

Class: LoggingWorker

| Method | Signature | Purpose |
|---|---|---|
| __init__ | def __init__(self, timeout=20.0, max_queue_size=50000, concurrency=100) | Configures queue bounds, concurrency, and timeout |
| start | def start(self) -> None | Starts the worker loop (idempotent) |
| enqueue | def enqueue(self, coroutine: Coroutine) -> None | Adds a coroutine to the logging queue (non-blocking) |
| ensure_initialized_and_enqueue | def ensure_initialized_and_enqueue(self, async_coroutine: Coroutine) | Starts the worker and enqueues in one call |
| stop | async def stop(self) -> None | Cancels all tasks and stops the worker |
| flush | async def flush(self) -> None | Waits for all queued tasks to complete |
| clear_queue | async def clear_queue(self) -> None | Processes and clears remaining queue items within a time limit |

Type: LoggingTask

class LoggingTask(TypedDict):
    coroutine: Coroutine
    context: contextvars.Context

Global Instance

GLOBAL_LOGGING_WORKER = LoggingWorker()

Import

from litellm.litellm_core_utils.logging_worker import LoggingWorker, GLOBAL_LOGGING_WORKER

I/O Contract

Inputs

| Parameter | Type | Description |
|---|---|---|
| coroutine | Coroutine | An awaitable coroutine to be executed in the background |
| timeout | float | Max seconds per task (default: 20.0) |
| max_queue_size | int | Maximum queue depth (default: 50,000) |
| concurrency | int | Maximum concurrent tasks (default: 100) |

Outputs

The worker does not return values from enqueued tasks. Errors in tasks are logged but do not propagate to callers.

Usage Examples

from litellm.litellm_core_utils.logging_worker import GLOBAL_LOGGING_WORKER

# Enqueue a logging callback (non-blocking).
# `langfuse_client` and `response_data` are assumed to be defined elsewhere.
async def log_to_langfuse(response_data):
    await langfuse_client.log(response_data)

GLOBAL_LOGGING_WORKER.ensure_initialized_and_enqueue(
    log_to_langfuse(response_data)
)

# Custom worker with different settings
from litellm.litellm_core_utils.logging_worker import LoggingWorker

worker = LoggingWorker(
    timeout=10.0,
    max_queue_size=10000,
    concurrency=50,
)
worker.start()
worker.enqueue(some_async_callback())

# Graceful shutdown
await worker.stop()
