Implementation:Arize ai Phoenix Client Rate Limiters
| Knowledge Sources | |
|---|---|
| Domains | AI_Observability, Client_SDK, Rate_Limiting |
| Last Updated | 2026-02-14 05:30 GMT |
Overview
An adaptive token bucket rate limiter, exposed through sync and async decorators, that dynamically adjusts the request rate based on observed rate limit errors using exponential backoff and a cooldown mechanism.
Description
The Client Rate Limiters module implements a self-tuning rate limiter that does not require advance knowledge of the target API's rate limit. It starts at an initial rate and adapts based on runtime feedback.
AdaptiveTokenBucket is the core rate limiting engine. It maintains a token bucket whose refill rate adjusts dynamically:
- Increase: When no rate limit errors occur, the rate grows exponentially with elapsed time, rate * exp(rate_increase_factor * elapsed_time). If more than one enforcement window has passed since the last rate update, the rate resets to the initial value instead.
- Decrease: When a rate limit error is observed, the rate is multiplied by rate_reduction_factor (default 0.5), the token count is reset to zero, and the caller blocks for a configurable cooldown period. A request that started before the previous reduction's cooldown ended does not trigger another reduction, so a burst of concurrent failures lowers the rate only once.
- Bounds: The rate is clamped between minimum_per_second_request_rate (default 0.1) and maximum_per_second_request_rate. Unless overridden, the maximum is the initial rate multiplied by (1/rate_reduction_factor)^3, i.e. eight times the initial rate with the default factor of 0.5.
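The three rules above can be modeled as small pure functions, which makes the arithmetic easy to check. This is a simplified sketch derived from the formulas in this section, not the module's code: the function names default_max_rate, grow, should_reduce, and shrink are hypothetical, and times are passed in as plain numbers instead of being read from a clock.

```python
import math


def default_max_rate(initial_rate: float, rate_reduction_factor: float = 0.5) -> float:
    # Ceiling used when no maximum is given: initial * (1 / factor) ** 3,
    # i.e. the rate that three consecutive reductions would bring back to initial.
    return initial_rate * (1 / rate_reduction_factor) ** 3


def grow(
    rate: float,
    elapsed: float,
    initial_rate: float,
    max_rate: float,
    enforcement_window: float = 60.0,
    rate_increase_factor: float = 0.01,
) -> float:
    # More than one enforcement window since the last update: start over.
    if elapsed > enforcement_window:
        return initial_rate
    # Otherwise grow exponentially with elapsed time, capped at the maximum.
    return min(rate * math.exp(rate_increase_factor * elapsed), max_rate)


def should_reduce(request_start: float, last_error: float, cooldown: float = 5.0) -> bool:
    # A request that started before the last reduction's cooldown ended is
    # treated as concurrent with it and does not trigger another reduction.
    return request_start >= last_error + cooldown


def shrink(rate: float, min_rate: float = 0.1, rate_reduction_factor: float = 0.5) -> float:
    # Multiplicative decrease on a rate limit error, floored at the minimum.
    return max(rate * rate_reduction_factor, min_rate)


# Starting at 10 requests/second with the default factors:
print(default_max_rate(10.0))                  # 80.0
print(round(grow(10.0, 60.0, 10.0, 80.0), 2))  # 18.22
print(shrink(10.0))                            # 5.0
```

Because exp(a) * exp(b) = exp(a + b), growing over several short intervals gives the same result as one interval of the same total length (until the cap or a reset applies), so in this model the growth does not depend on how often the rate is updated.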
RateLimiter provides decorator-based integration:
- limit(fn): a synchronous decorator that wraps a callable. Before each call, it waits for a token via wait_until_ready(). On a rate limit error, it reduces the rate and retries up to max_rate_limit_retries times.
- alimit(fn): an async decorator with the same semantics. It uses an asyncio.Event and an asyncio.Lock to coordinate rate limit handling across concurrent async tasks: while one task handles a rate limit, other tasks are held back from starting new requests until the handler finishes.
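The pause-and-resume coordination used by alimit() can be illustrated with an asyncio.Event acting as a gate and an asyncio.Lock serializing the handler. This is a minimal sketch under stated assumptions, not the module's implementation: PauseGate and its run() method are hypothetical names, a plain asyncio.sleep stands in for the token bucket's backoff, and RuntimeError stands in for RateLimitError.

```python
import asyncio
from typing import Awaitable, Callable, Type, TypeVar

T = TypeVar("T")


class PauseGate:
    """Hypothetical helper: while one task handles a rate limit, new requests wait."""

    def __init__(self) -> None:
        # Create the gate inside the running event loop that will use it.
        self._open = asyncio.Event()
        self._open.set()  # the gate starts open
        self._lock = asyncio.Lock()

    @property
    def is_open(self) -> bool:
        return self._open.is_set()

    async def run(
        self,
        fn: Callable[[], Awaitable[T]],
        rate_limit_error: Type[BaseException],
        max_retries: int,
        cooldown: float,
    ) -> T:
        await self._open.wait()  # block while another task is handling a rate limit
        try:
            return await fn()
        except rate_limit_error:
            async with self._lock:  # only one handler at a time
                self._open.clear()  # keep new requests from starting
                try:
                    for _ in range(max_retries):
                        await asyncio.sleep(cooldown)  # back off before retrying
                        try:
                            return await fn()
                        except rate_limit_error:
                            continue
                finally:
                    self._open.set()  # reopen the gate for waiting tasks
            # Stand-in for RateLimitError once retries are exhausted.
            raise RuntimeError(f"Exceeded max ({max_retries}) retries")


class Throttled(Exception):
    pass


async def demo() -> None:
    gate = PauseGate()
    attempts = 0

    async def flaky() -> str:
        nonlocal attempts
        attempts += 1
        if attempts == 1:
            raise Throttled()  # the first call is rate limited
        return "ok"

    print(await gate.run(flaky, Throttled, max_retries=2, cooldown=0.01))  # ok
    print(gate.is_open)  # True


asyncio.run(demo())
```

Clearing the event only stops tasks that have not yet started a request; calls already in flight finish on their own, which is why the lock is still needed to keep two failing tasks from handling the same rate limit at once.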
RateLimitError is a custom exception raised when all rate limit retries are exhausted. It carries diagnostic information including current rate, initial rate, and enforcement window.
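The synchronous retry path, which ends in RateLimitError once retries are exhausted, can be sketched as a plain decorator. This is a simplified sketch, not the module's code: retrying_limit and RetriesExhausted are hypothetical names, and the bucket argument is assumed to expose wait_until_ready() and on_rate_limit_error(start) in the way AdaptiveTokenBucket does.

```python
import time
from functools import wraps
from typing import Any, Callable, Type, TypeVar

T = TypeVar("T")


class RetriesExhausted(Exception):
    """Stand-in for RateLimitError in this sketch."""


def retrying_limit(
    bucket: Any,
    rate_limit_error: Type[BaseException],
    max_retries: int = 0,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    def decorator(fn: Callable[..., T]) -> Callable[..., T]:
        @wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            # One initial attempt plus up to max_retries retries.
            for _ in range(max_retries + 1):
                bucket.wait_until_ready()  # block until a token is available
                start = time.time()
                try:
                    return fn(*args, **kwargs)
                except rate_limit_error:
                    # Let the bucket reduce its rate (and cool down) before retrying.
                    bucket.on_rate_limit_error(start)
            raise RetriesExhausted(f"Exceeded max ({max_retries}) retries")

        return wrapper

    return decorator
```

In this sketch, with max_retries=0 the first rate limit error is reported to the bucket and then surfaces immediately as the exhaustion error; recording start after the token wait means only time spent on the request itself counts toward the cooldown check.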
UnavailableTokensError is an internal exception raised by make_request_if_ready() when insufficient tokens are available, driving the wait loop.
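The refill-and-wait loop that UnavailableTokensError drives can be sketched with a fixed-rate bucket. This is a hypothetical illustration, not the module's code: FixedRateBucket and NoTokens are made-up names, the rate never adapts, and capacity is assumed to be the rate times the enforcement window, in line with the description of the enforcement window in the I/O contract.

```python
import time


class NoTokens(Exception):
    """Stand-in for UnavailableTokensError in this sketch."""


class FixedRateBucket:
    """Hypothetical fixed-rate bucket showing the refill-and-wait loop."""

    def __init__(self, rate: float, enforcement_window_seconds: float = 60.0) -> None:
        self.rate = rate
        self.window = enforcement_window_seconds
        self.tokens = 0.0  # start empty
        self.last_checked = time.monotonic()

    def max_tokens(self) -> float:
        # Capacity: the number of requests allowed in one enforcement window.
        return self.rate * self.window

    def available_requests(self) -> float:
        now = time.monotonic()
        # Refill in proportion to elapsed time, never beyond capacity.
        refill = self.rate * (now - self.last_checked)
        self.tokens = min(self.max_tokens(), self.tokens + refill)
        self.last_checked = now
        return self.tokens

    def make_request_if_ready(self) -> None:
        if self.available_requests() < 1:
            raise NoTokens()
        self.tokens -= 1  # spend one token on this request

    def wait_until_ready(self, max_wait_time: float = 300) -> None:
        start = time.monotonic()
        while time.monotonic() - start < max_wait_time:
            try:
                self.make_request_if_ready()
                return
            except NoTokens:
                # Poll at a tenth of the refill interval; in this sketch the
                # loop simply gives up once max_wait_time has passed.
                time.sleep(0.1 / self.rate)
```

In this sketch, a rate of 1 request per second with a 60-second window caps the bucket at 60 tokens, so up to 60 requests can go through back to back after a minute of inactivity. The adaptive bucket described above additionally changes its rate over time.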
Usage
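Use this module to wrap API calls that may be rate limited. The RateLimiter class is the primary entry point: instantiate it with the exception type your client raises on rate limiting, then apply limit() or alimit() to your callable. Because the adaptive bucket adjusts its rate from observed errors, you do not need to know or configure the target API's rate limit in advance. Note that max_rate_limit_retries defaults to 0, so set it explicitly if rate-limited calls should be retried.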
Code Reference
Source Location
- Repository: Arize_ai_Phoenix
- File: packages/phoenix-client/src/phoenix/client/utils/rate_limiters.py
Signature
```python
class AdaptiveTokenBucket:
    def __init__(
        self,
        initial_per_second_request_rate: float,
        maximum_per_second_request_rate: Optional[float] = None,
        minimum_per_second_request_rate: float = 0.1,
        enforcement_window_minutes: float = 1,
        rate_reduction_factor: float = 0.5,
        rate_increase_factor: float = 0.01,
        cooldown_seconds: float = 5,
    ): ...
    def increase_rate(self) -> None: ...
    def on_rate_limit_error(self, request_start_time: float, verbose: bool = False) -> None: ...
    def wait_until_ready(self, max_wait_time: float = 300) -> None: ...
    async def async_wait_until_ready(self, max_wait_time: float = 10) -> None: ...

class RateLimitError(PhoenixException):
    def __init__(
        self,
        message: str = "Exceeded rate limit retries",
        *,
        current_rate_tokens_per_sec: Optional[float] = None,
        initial_rate_tokens_per_sec: Optional[float] = None,
        enforcement_window_seconds: Optional[float] = None,
    ) -> None: ...

class UnavailableTokensError(PhoenixException): ...

class RateLimiter:
    def __init__(
        self,
        rate_limit_error: Optional[Type[BaseException]] = None,
        max_rate_limit_retries: int = 0,
        initial_per_second_request_rate: float = 1.0,
        maximum_per_second_request_rate: Optional[float] = None,
        enforcement_window_minutes: float = 1,
        rate_reduction_factor: float = 0.5,
        rate_increase_factor: float = 0.01,
        cooldown_seconds: float = 5,
        verbose: bool = False,
    ) -> None: ...
    def limit(self, fn: Callable[P, T]) -> Callable[P, T]: ...
    def alimit(self, fn: AsyncCallable[P, T]) -> AsyncCallable[P, T]: ...
```
Import
```python
from phoenix.client.utils.rate_limiters import (
    AdaptiveTokenBucket,
    RateLimiter,
    RateLimitError,
    UnavailableTokensError,
)
```
I/O Contract
AdaptiveTokenBucket
| Parameter | Type | Description |
|---|---|---|
| initial_per_second_request_rate | float | Starting token refill rate in requests per second |
| maximum_per_second_request_rate | Optional[float] | Upper bound on rate; auto-computed if not set |
| minimum_per_second_request_rate | float | Floor rate (default 0.1 RPS) |
| enforcement_window_minutes | float | Window over which token capacity is computed (default 1 minute) |
| rate_reduction_factor | float | Multiplicative factor applied on error (default 0.5) |
| rate_increase_factor | float | Exponential growth constant for rate recovery (default 0.01) |
| cooldown_seconds | float | Minimum time between consecutive rate reductions (default 5s) |
RateLimiter
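| Parameter | Type | Description |
|---|---|---|
| rate_limit_error | Optional[Type[BaseException]] | The exception type that indicates a rate limit response (default None) |
| max_rate_limit_retries | int | Number of times to retry after rate limit errors (default 0) |
| verbose | bool | Print throttle messages to stdout (default False) |
| initial_per_second_request_rate | float | Starting request rate (default 1.0) |
| maximum_per_second_request_rate, enforcement_window_minutes, rate_reduction_factor, rate_increase_factor, cooldown_seconds | as in AdaptiveTokenBucket | Same meanings and defaults as in AdaptiveTokenBucket; used to configure the underlying bucket |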
| Return | Type | Description |
|---|---|---|
| Decorated function | Same as input | Wrapped function with rate limiting applied |
| On exhaustion | Raises RateLimitError | Carries the current rate, initial rate, and enforcement window |
Usage Examples
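```python
import time

from openai import AsyncOpenAI, OpenAI
from openai import RateLimitError as OpenAIRateLimitError

from phoenix.client.utils.rate_limiters import AdaptiveTokenBucket, RateLimiter

openai_client = OpenAI()  # reads OPENAI_API_KEY from the environment
async_openai_client = AsyncOpenAI()

# Synchronous usage: retry up to 5 times on OpenAI rate limit errors
limiter = RateLimiter(
    rate_limit_error=OpenAIRateLimitError,
    max_rate_limit_retries=5,
    initial_per_second_request_rate=10.0,
)

@limiter.limit
def call_openai(prompt: str) -> str:
    response = openai_client.chat.completions.create(
        model="gpt-4", messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content or ""

# Async usage: the same limiter can also wrap coroutine functions
@limiter.alimit
async def async_call_openai(prompt: str) -> str:
    response = await async_openai_client.chat.completions.create(
        model="gpt-4", messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content or ""

# Direct token bucket usage
class SomeRateLimitError(Exception):  # placeholder for your client's rate limit error
    pass

def make_api_call() -> str:  # placeholder for your own API call
    return "ok"

bucket = AdaptiveTokenBucket(initial_per_second_request_rate=5.0)
bucket.wait_until_ready()  # blocks until a token is available
start = time.time()
try:
    result = make_api_call()
except SomeRateLimitError:
    bucket.on_rate_limit_error(start)  # reduces the rate and blocks for the cooldown
```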
Related Pages
- Principle:Arize_ai_Phoenix_Evaluation_Execution
- Arize_ai_Phoenix_Client_Executors -- Executors that integrate with this rate limiter