Implementation: BerriAI LiteLLM Lowest TPM RPM Strategy
| Attribute | Value |
|---|---|
| Sources | litellm/router_strategy/lowest_tpm_rpm.py |
| Domains | Router, Strategy, Rate Limiting |
| Last Updated | 2026-02-15 16:00 GMT |
Overview
The Lowest TPM/RPM Strategy (V1) is the original router deployment selection strategy that routes requests to the deployment with the lowest tokens-per-minute (TPM) usage while respecting RPM limits.
Description
This module provides the LowestTPMLoggingHandler class, which extends CustomLogger to track per-deployment TPM and RPM usage within a model group. Unlike the V2 variant, this implementation stores aggregated dictionaries of deployment usage keyed by model group and the current minute (e.g., {model_group}:tpm:{HH-MM}). On each successful call, it updates both TPM and RPM counters in the cache. During deployment selection, it estimates input tokens, filters deployments exceeding their TPM/RPM limits, and returns the one with the lowest current TPM. This is a simpler, single-instance-oriented design compared to V2.
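The per-minute key scheme described above can be sketched as follows. This is a simplified illustration, not the library's actual code: a plain dict stands in for `DualCache`, and `update_usage` is a hypothetical helper mirroring what `log_success_event` does with the `{model_group}:tpm:{HH-MM}` and `{model_group}:rpm:{HH-MM}` keys.

```python
from datetime import datetime

def update_usage(cache: dict, model_group: str, deployment_id: str, total_tokens: int) -> None:
    # Build per-minute keys in the {model_group}:{tpm|rpm}:{HH-MM} layout;
    # counters naturally roll over when the minute changes and expire via TTL.
    minute = datetime.now().strftime("%H-%M")
    tpm_key = f"{model_group}:tpm:{minute}"
    rpm_key = f"{model_group}:rpm:{minute}"
    # Each key holds an aggregated {deployment_id: count} dict for the group.
    tpm_map = cache.setdefault(tpm_key, {})
    tpm_map[deployment_id] = tpm_map.get(deployment_id, 0) + total_tokens
    rpm_map = cache.setdefault(rpm_key, {})
    rpm_map[deployment_id] = rpm_map.get(rpm_id := deployment_id, 0) + 1
```

Because both counters live under a single minute-stamped key per model group, a cache read during selection returns the whole group's usage in one lookup.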
Usage
Import this class when configuring the LiteLLM Router with the original usage-based routing. It is generally superseded by the V2 strategy for multi-instance deployments with Redis.
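When running the LiteLLM proxy, the same strategy can be selected declaratively via `router_settings.routing_strategy`. A hedged config sketch follows; the model entry, key, and limits are placeholders, not taken from the source.

```yaml
# Placeholder model entry; "usage-based-routing" selects the V1 strategy.
model_list:
  - model_name: gpt-4
    litellm_params:
      model: openai/gpt-4
    tpm: 100000
    rpm: 500
router_settings:
  routing_strategy: usage-based-routing
```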
Code Reference
Source Location
litellm/router_strategy/lowest_tpm_rpm.py
Classes
```python
class RoutingArgs(LiteLLMPydanticObjectBase):
    ttl: int = 1 * 60  # 1min (RPM/TPM expire key)

class LowestTPMLoggingHandler(CustomLogger):
    test_flag: bool = False
    logged_success: int = 0
    logged_failure: int = 0
    default_cache_time_seconds: int = 1 * 60 * 60  # 1 hour

    def __init__(self, router_cache: DualCache, routing_args: dict = {}): ...
```
Key Methods
| Method | Signature | Description |
|---|---|---|
| `log_success_event` | `def log_success_event(self, kwargs, response_obj, start_time, end_time)` | Sync callback that updates TPM and RPM counters in the cache |
| `async_log_success_event` | `async def async_log_success_event(self, kwargs, response_obj, start_time, end_time)` | Async callback that updates TPM and RPM counters in the cache |
| `get_available_deployments` | `def get_available_deployments(self, model_group: str, healthy_deployments: list, messages: Optional[List[Dict[str, str]]] = None, input: Optional[Union[str, List]] = None)` | Returns the deployment with the lowest TPM usage within limits |
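The filter-then-pick logic of `get_available_deployments` can be sketched as below. This is a self-contained simplification: `pick_lowest_tpm` is a hypothetical helper, `tpm_usage`/`rpm_usage` stand in for the per-minute dicts read from the cache, and the real method estimates `input_tokens` from the messages/input via litellm's token counting rather than taking it as a parameter.

```python
from typing import Optional

def pick_lowest_tpm(
    healthy_deployments: list,
    tpm_usage: dict,
    rpm_usage: dict,
    input_tokens: int,
) -> Optional[dict]:
    best, best_tpm = None, float("inf")
    for deployment in healthy_deployments:
        dep_id = deployment["model_info"]["id"]
        current_tpm = tpm_usage.get(dep_id, 0)
        current_rpm = rpm_usage.get(dep_id, 0)
        # Skip deployments whose projected usage would exceed their limits;
        # a missing limit is treated as unlimited.
        if current_tpm + input_tokens > deployment.get("tpm", float("inf")):
            continue
        if current_rpm + 1 > deployment.get("rpm", float("inf")):
            continue
        if current_tpm < best_tpm:
            best, best_tpm = deployment, current_tpm
    return best  # None when every deployment is over its limit
```

Note the projection: a deployment near its TPM ceiling is excluded even if its current usage is below the limit, because the estimated input tokens would push it over.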
Import
```python
from litellm.router_strategy.lowest_tpm_rpm import LowestTPMLoggingHandler
```
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| `router_cache` | `DualCache` | Shared cache instance for TPM/RPM counters |
| `routing_args` | `dict` | Configuration with optional `ttl` (default 60s) |
| `model_group` | `str` | The model group to select a deployment for |
| `healthy_deployments` | `list` | List of healthy deployment dictionaries |
| `messages` | `Optional[List[Dict[str, str]]]` | Messages for estimating input tokens |
| `input` | `Optional[Union[str, List]]` | Text input for estimating input tokens |
Outputs
| Return Type | Description |
|---|---|
| `Optional[dict]` | The deployment dictionary with the lowest TPM, or `None` if no deployments are within limits |
Usage Examples
```python
from litellm.caching.caching import DualCache
from litellm.router_strategy.lowest_tpm_rpm import LowestTPMLoggingHandler

cache = DualCache()
handler = LowestTPMLoggingHandler(router_cache=cache, routing_args={"ttl": 60})

deployment = handler.get_available_deployments(
    model_group="gpt-4",
    healthy_deployments=[
        {"model_info": {"id": "deploy-1"}, "litellm_params": {"model": "gpt-4"}, "tpm": 100000, "rpm": 500},
        {"model_info": {"id": "deploy-2"}, "litellm_params": {"model": "gpt-4"}, "tpm": 200000, "rpm": 1000},
    ],
    messages=[{"role": "user", "content": "Hello, world!"}],
)
```
Related Pages
- BerriAI_Litellm_Lowest_TPM_RPM_V2_Strategy - Updated V2 strategy with cross-instance Redis support
- BerriAI_Litellm_Lowest_Cost_Strategy - Cost-based deployment selection
- BerriAI_Litellm_Least_Busy_Strategy - Least busy deployment selection
- BerriAI_Litellm_Simple_Shuffle_Strategy - Random/weighted shuffle deployment selection