Implementation: BerriAI LiteLLM Router Completion
| Knowledge Sources | Domains | Last Updated |
|---|---|---|
| litellm repository | LLM Load Balancing, Request Distribution | 2026-02-15 |
Overview
A concrete tool provided by LiteLLM for routing LLM completion requests to optimal deployments, implemented as the Router.completion and Router.acompletion methods.
Description
The Router provides both synchronous and asynchronous completion methods that transparently handle deployment selection, fallbacks, and retries:
Router.completion(model, messages, **kwargs) -- Synchronous entry point. Sets up the fallback wrapper by assigning _completion as the original function, then delegates to function_with_fallbacks, which handles retry and fallback execution.
Router._completion(model, messages, **kwargs) -- Internal method that performs the actual routing: calls get_available_deployment to select from healthy endpoints, extracts the provider-specific litellm_params, obtains or creates an HTTP client, runs pre-call strategy checks (e.g., RPM validation), and then calls litellm.completion(). It also supports silent experiments where traffic is mirrored to a secondary model in a background thread.
Router.acompletion(model, messages, stream, **kwargs) -- Asynchronous entry point with typing.overload signatures for stream/non-stream return types. Supports priority-based scheduling via the Scheduler and prompt management models. Delegates to async_function_with_fallbacks for retry/fallback handling.
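The retry-then-fallback delegation described above can be sketched in plain Python. This is a simplified illustration of the pattern, not LiteLLM's actual implementation; the function and parameter names here are hypothetical:

```python
from typing import Callable, Dict, List, Optional

def function_with_fallbacks_sketch(
    original_fn: Callable[..., str],
    fallbacks: Dict[str, List[str]],
    num_retries: int,
    model: str,
    **kwargs,
) -> str:
    """Try the primary model with retries, then each fallback model in order."""
    candidates = [model] + fallbacks.get(model, [])
    last_exc: Optional[Exception] = None
    for candidate in candidates:
        for _ in range(num_retries + 1):  # initial attempt plus retries
            try:
                return original_fn(model=candidate, **kwargs)
            except Exception as exc:
                last_exc = exc
    # All retries and fallbacks exhausted: surface the last error
    raise last_exc
```

The real Router additionally applies cooldowns, per-deployment health tracking, and async variants, but the control flow follows this shape: exhaust retries on the requested model group before moving to the next fallback group.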
Usage
Use Router.completion or Router.acompletion as drop-in replacements for litellm.completion when you need load-balanced, fault-tolerant routing:
response = router.completion(model="gpt-4", messages=[{"role": "user", "content": "Hello"}])
# or
response = await router.acompletion(model="gpt-4", messages=[{"role": "user", "content": "Hello"}])
Code Reference
Source Location: litellm/router.py, lines 1233-1465
completion Signature:
def completion(
self, model: str, messages: List[Dict[str, str]], **kwargs
) -> Union[ModelResponse, CustomStreamWrapper]:
acompletion Signature:
async def acompletion(
self,
model: str,
messages: List[AllMessageValues],
stream: bool = False,
**kwargs,
) -> Union[ModelResponse, CustomStreamWrapper]:
Import:
from litellm import Router
router = Router(model_list=model_list)
# Methods are called on the router instance:
# router.completion(...)
# await router.acompletion(...)
I/O Contract
Inputs
| Input Parameter | Type | Required | Description |
|---|---|---|---|
| model | str | Yes | Logical model name (e.g., "gpt-4") or specific deployment model ID |
| messages | List[Dict[str, str]] | Yes | Chat messages in OpenAI format, e.g., [{"role": "user", "content": "..."}] |
| stream | bool | No | Whether to stream the response (async only); defaults to False |
| priority | Optional[int] | No | Request priority for scheduling (async only); lower values = higher priority |
| specific_deployment | Optional[str] | No | Bypass routing and target a specific deployment by ID |
| **kwargs | various | No | Additional parameters passed through to litellm.completion() (e.g., temperature, max_tokens, api_key) |
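The priority parameter's semantics (lower value = served first) can be illustrated with a minimal priority queue. This sketch is illustrative only and does not use LiteLLM's actual Scheduler API; the class name is hypothetical:

```python
import heapq
import itertools
from typing import Any, List, Tuple

class PrioritySketch:
    """Dispatch queued requests lowest-priority-value first, FIFO within a tier."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, Any]] = []
        self._counter = itertools.count()  # tie-breaker preserves arrival order

    def submit(self, priority: int, request: Any) -> None:
        heapq.heappush(self._heap, (priority, next(self._counter), request))

    def next_request(self) -> Any:
        _, _, request = heapq.heappop(self._heap)
        return request

queue = PrioritySketch()
queue.submit(priority=5, request="batch-job")
queue.submit(priority=0, request="interactive-chat")
queue.submit(priority=5, request="batch-job-2")
# priority=0 dequeues first; the two priority=5 requests keep arrival order
```

Under contention, a request submitted with priority=0 is dispatched to a deployment before queued requests with higher priority values.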
Outputs
| Output | Type | Description |
|---|---|---|
| response | ModelResponse | Standard completion response (non-streaming) |
| response | CustomStreamWrapper | Streaming response iterator (when stream=True) |
| (raises) | Exception | Original or fallback exception if all retries and fallbacks are exhausted |
Usage Examples
Basic synchronous completion:
from litellm import Router
router = Router(model_list=[
{"model_name": "gpt-4", "litellm_params": {"model": "gpt-4", "api_key": "sk-xxx"}},
{"model_name": "gpt-4", "litellm_params": {"model": "azure/gpt-4-east", "api_key": "sk-azure"}},
])
response = router.completion(
model="gpt-4",
messages=[{"role": "user", "content": "What is the capital of France?"}],
temperature=0.7,
)
print(response.choices[0].message.content)
Async streaming completion:
import asyncio
from litellm import Router
router = Router(model_list=model_list)
async def main():
response = await router.acompletion(
model="gpt-4",
messages=[{"role": "user", "content": "Write a poem."}],
stream=True,
)
async for chunk in response:
print(chunk.choices[0].delta.content or "", end="")
asyncio.run(main())
Completion with fallbacks:
router = Router(
model_list=model_list,
fallbacks=[{"gpt-4": ["gpt-3.5-turbo"]}],
num_retries=3,
)
# If gpt-4 deployments all fail after 3 retries, automatically falls back to gpt-3.5-turbo
response = router.completion(
model="gpt-4",
messages=[{"role": "user", "content": "Summarize this document."}],
)
Targeting a specific deployment by ID:
response = router.completion(
model="gpt-4",
messages=[{"role": "user", "content": "Hello"}],
specific_deployment="deployment-uuid-123",
)