# Implementation: BerriAI Litellm Logging Worker
| Attribute | Value |
|---|---|
| Sources | litellm/litellm_core_utils/logging_worker.py |
| Domains | Logging, Performance, Async Processing, Background Tasks |
| last_updated | 2026-02-15 16:00 GMT |
## Overview
The Logging Worker implements an asynchronous background task processor that queues and executes logging coroutines without blocking the main request path, providing a +200 RPS performance improvement.
## Description
LoggingWorker is a bounded-queue, semaphore-limited async worker designed for best-effort execution of non-critical coroutines such as success/error callbacks, observability logging, and spend tracking.
Architecture:
- Bounded queue -- an `asyncio.Queue` with a configurable max size (default: 50,000, from `LOGGING_WORKER_MAX_QUEUE_SIZE`) prevents unbounded memory growth.
- Concurrency control -- an `asyncio.Semaphore` (default: 100, from `LOGGING_WORKER_CONCURRENCY`) limits the number of simultaneously executing logging tasks.
- Context preservation -- each enqueued task captures `contextvars.copy_context()` at enqueue time, ensuring context variables (e.g., request IDs) are available during execution.
- Timeout protection -- each task is wrapped in `asyncio.wait_for()` with a configurable timeout (default: 20s, from `LOGGING_WORKER_MAX_TIME_PER_COROUTINE`).
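The four mechanisms above can be combined into a minimal worker loop. The sketch below is illustrative only (class and method names such as `MiniLoggingWorker` and `_run_one` are hypothetical, not LiteLLM's actual implementation): a bounded queue, a semaphore gate, the captured context applied at task-creation time, and a per-task timeout.

```python
import asyncio
import contextvars
from typing import Coroutine, TypedDict


class LoggingTask(TypedDict):
    coroutine: Coroutine
    context: contextvars.Context


class MiniLoggingWorker:
    """Illustrative sketch of a bounded, semaphore-limited logging worker."""

    def __init__(self, timeout: float = 20.0, max_queue_size: int = 50_000,
                 concurrency: int = 100) -> None:
        self.timeout = timeout
        self.queue: "asyncio.Queue[LoggingTask]" = asyncio.Queue(maxsize=max_queue_size)
        self.semaphore = asyncio.Semaphore(concurrency)
        self._pending: set = set()  # strong refs so spawned tasks aren't GC'd

    def enqueue(self, coroutine: Coroutine) -> None:
        # Capture the caller's context so request-scoped vars survive into the task.
        task: LoggingTask = {
            "coroutine": coroutine,
            "context": contextvars.copy_context(),
        }
        try:
            self.queue.put_nowait(task)
        except asyncio.QueueFull:
            coroutine.close()  # best-effort: dropping here is acceptable

    async def _run_one(self, task: LoggingTask) -> None:
        async with self.semaphore:  # cap concurrent logging tasks
            try:
                # Timeout protection: never let one slow callback run unbounded.
                await asyncio.wait_for(task["coroutine"], timeout=self.timeout)
            except Exception:
                pass  # errors are logged in the real worker, never propagated
            finally:
                self.queue.task_done()

    async def worker_loop(self) -> None:
        while True:
            task = await self.queue.get()
            # Create the task inside the captured context so contextvars
            # (e.g., request IDs) are visible during execution.
            t = task["context"].run(asyncio.create_task, self._run_one(task))
            self._pending.add(t)
            t.add_done_callback(self._pending.discard)
```

Creating the task via `Context.run()` works on any modern Python, since an `asyncio.Task` copies the current context at creation time.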
Queue full handling: When the queue is full, the worker employs an aggressive clearing strategy:
- Extracts a configurable percentage (default 50%) of queue items.
- Processes them concurrently in a background task.
- Implements cooldown periods (default 0.5s) between aggressive clears.
- Tasks arriving during cooldown are scheduled for delayed retry.
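The drain-a-fraction-with-cooldown strategy can be sketched as follows. This is a simplified stand-in, not LiteLLM's code: the function name `aggressively_clear` and the module-level cooldown state are hypothetical, and the constants mirror the defaults described above.

```python
import asyncio
import time

# Illustrative defaults; the real values come from LiteLLM's constants module.
CLEAR_PERCENTAGE = 0.5   # drain 50% of the queue on an aggressive clear
COOLDOWN_SECONDS = 0.5   # minimum gap between aggressive clears

_last_clear = 0.0  # monotonic timestamp of the last aggressive clear


def aggressively_clear(queue: asyncio.Queue) -> list:
    """Pull a fixed fraction of items off a full queue, honoring a cooldown.

    Returns the drained items. An empty list means we are inside the
    cooldown window, and the caller should schedule a delayed retry instead.
    """
    global _last_clear
    now = time.monotonic()
    if now - _last_clear < COOLDOWN_SECONDS:
        return []  # in cooldown: caller retries later
    _last_clear = now

    target = int(queue.qsize() * CLEAR_PERCENTAGE)
    drained = []
    for _ in range(target):
        try:
            drained.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            break
    return drained
```

In the real worker the drained items are then awaited concurrently in a background task rather than returned.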
Graceful shutdown:
- `stop()` cancels all running tasks and the worker loop.
- `_flush_on_exit()` is registered via `atexit` to process remaining queue items synchronously before process exit, creating a new event loop if the original is closed.
- `clear_queue()` processes remaining items with time limits (`MAX_TIME_TO_CLEAR_QUEUE`).
Event loop resilience: The worker detects event loop changes (e.g., in test environments) and reinitializes its queue and semaphore when the running loop differs from the one it was bound to.
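The loop-rebinding check amounts to comparing the running loop against the one the worker was bound to, and rebuilding loop-bound primitives on mismatch. A minimal sketch, with hypothetical names (`LoopBoundResource`, `ensure_bound_to_current_loop`):

```python
import asyncio


class LoopBoundResource:
    """Sketch: reinitialize loop-bound objects when the running loop changes.

    asyncio queues and semaphores become bound to the loop that uses them;
    test suites often run each test under a fresh loop, so the worker
    rebuilds these primitives when it notices a different running loop.
    """

    def __init__(self) -> None:
        self._loop = None
        self.queue = None
        self.semaphore = None

    def ensure_bound_to_current_loop(self) -> None:
        running = asyncio.get_running_loop()
        if self._loop is not running:
            # Loop changed (or first use): rebuild loop-bound primitives.
            self._loop = running
            self.queue = asyncio.Queue(maxsize=50_000)
            self.semaphore = asyncio.Semaphore(100)
```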
A global singleton `GLOBAL_LOGGING_WORKER` is provided for backward compatibility.
## Usage
The worker is typically started automatically and tasks are enqueued via the global instance:
```python
from litellm.litellm_core_utils.logging_worker import GLOBAL_LOGGING_WORKER
```
## Code Reference

### Source Location

`/litellm/litellm_core_utils/logging_worker.py` (523 lines)
### Class: LoggingWorker
| Method | Signature | Purpose |
|---|---|---|
| `__init__` | `def __init__(self, timeout=20.0, max_queue_size=50000, concurrency=100)` | Configures queue bounds, concurrency, and timeout |
| `start` | `def start(self) -> None` | Starts the worker loop (idempotent) |
| `enqueue` | `def enqueue(self, coroutine: Coroutine) -> None` | Adds a coroutine to the logging queue (non-blocking) |
| `ensure_initialized_and_enqueue` | `def ensure_initialized_and_enqueue(self, async_coroutine: Coroutine)` | Start + enqueue in one call |
| `stop` | `async def stop(self) -> None` | Cancels all tasks and stops the worker |
| `flush` | `async def flush(self) -> None` | Waits for all queued tasks to complete |
| `clear_queue` | `async def clear_queue(self) -> None` | Processes and clears remaining queue items with a time limit |
### Type: LoggingTask

```python
class LoggingTask(TypedDict):
    coroutine: Coroutine
    context: contextvars.Context
```
### Global Instance

```python
GLOBAL_LOGGING_WORKER = LoggingWorker()
```
### Import

```python
from litellm.litellm_core_utils.logging_worker import LoggingWorker, GLOBAL_LOGGING_WORKER
```
## I/O Contract

### Inputs

| Parameter | Type | Description |
|---|---|---|
| `coroutine` | `Coroutine` | An awaitable coroutine to be executed in the background |
| `timeout` | `float` | Max seconds per task (default: 20.0) |
| `max_queue_size` | `int` | Maximum queue depth (default: 50,000) |
| `concurrency` | `int` | Maximum concurrent tasks (default: 100) |
### Outputs
The worker does not return values from enqueued tasks. Errors in tasks are logged but do not propagate to callers.
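This fire-and-forget contract can be demonstrated with a small sketch. The names here (`run_best_effort`, `failing_callback`) are hypothetical, and returning the exception is purely for demonstration; the real worker reports errors via `verbose_logger` and discards them.

```python
import asyncio


async def failing_callback() -> None:
    # Stand-in for an observability callback whose backend is down.
    raise RuntimeError("logging backend unavailable")


async def run_best_effort(coroutine, timeout: float = 20.0):
    """Sketch of the worker's error contract: exceptions from a logging
    callback are captured and never re-raised to the caller."""
    try:
        await asyncio.wait_for(coroutine, timeout=timeout)
        return None
    except Exception as err:
        return err  # the real worker logs this instead of returning it
```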
## Usage Examples
```python
from litellm.litellm_core_utils.logging_worker import GLOBAL_LOGGING_WORKER

# Enqueue a logging callback (non-blocking)
async def log_to_langfuse(response_data):
    await langfuse_client.log(response_data)

GLOBAL_LOGGING_WORKER.ensure_initialized_and_enqueue(
    log_to_langfuse(response_data)
)

# Custom worker with different settings
from litellm.litellm_core_utils.logging_worker import LoggingWorker

worker = LoggingWorker(
    timeout=10.0,
    max_queue_size=10000,
    concurrency=50,
)
worker.start()
worker.enqueue(some_async_callback())

# Graceful shutdown
await worker.stop()
```
## Related Pages
- BerriAI_Litellm_Constants - defines `LOGGING_WORKER_CONCURRENCY`, `LOGGING_WORKER_MAX_QUEUE_SIZE`, and related constants
- BerriAI_Litellm_Logging_Setup - provides `verbose_logger`, used for error reporting
- BerriAI_Litellm_Logging_Callback_Manager - manages the callback functions that are enqueued into this worker