Implementation: Arize AI Phoenix Evals Rate Limiters
Overview
The Evals Rate Limiters module provides an adaptive token-bucket rate limiting system with both synchronous and asynchronous support for the Phoenix evaluator framework. It resides at `phoenix.evals.rate_limiters` and implements automatic error-driven throttling that dynamically adjusts request rates in response to rate limit errors from LLM providers. The module does not require knowledge of the provider's actual rate limit: it starts with an initial rate and adapts based on observed errors.
Description
The module contains four classes:
AdaptiveTokenBucket
The core rate limiting algorithm implementing an adaptive token bucket with the following characteristics:
- Initial rate estimation: Starts with a configurable `initial_per_second_request_rate` and adjusts dynamically.
- Rate reduction on error: When a rate limit error occurs, the rate is multiplied by `rate_reduction_factor` (default 0.5), effectively halving the allowed request rate.
- Exponential rate recovery: When no errors occur, the rate gradually increases via exponential growth: `rate *= exp(rate_increase_factor * time_since_last_update)`.
- Rate bounds: The rate is clamped between `minimum_per_second_request_rate` (default 0.1) and `maximum_per_second_request_rate` (by default, three rate reductions' worth above the initial rate, i.e. `initial * (1 / rate_reduction_factor) ** 3`).
- Enforcement window: Tokens accumulate over a configurable window (default 1 minute), with `max_tokens = rate * window`.
- Cooldown protection: After a rate limit error, concurrent requests that started before the error do not trigger additional rate reductions (guarded by `cooldown_seconds`).
- Blocking on error: On a rate limit error, the bucket blocks for `cooldown_seconds` (default 5) via `time.sleep()` to allow the provider's rate limit to reset.
- Token-based gating: `make_request_if_ready()` deducts a token or raises `UnavailableTokensError`; `wait_until_ready()` and `async_wait_until_ready()` spin until a token is available.
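The adaptation rules above can be condensed into a small sketch. This is a simplified illustration under the defaults described here; the class `SimpleAdaptiveBucket` is hypothetical, not the actual Phoenix implementation:

```python
import math

class SimpleAdaptiveBucket:
    """Simplified sketch of adaptive token-bucket rate control (illustrative only)."""

    def __init__(self, initial_rate, reduction_factor=0.5, increase_factor=0.01,
                 minimum_rate=0.1, window_seconds=60.0):
        self.rate = initial_rate
        self.reduction_factor = reduction_factor
        self.increase_factor = increase_factor
        self.minimum_rate = minimum_rate
        # Default ceiling: three consecutive reductions "undone" above the initial rate
        self.maximum_rate = initial_rate * (1 / reduction_factor) ** 3
        self.window_seconds = window_seconds

    def on_rate_limit_error(self):
        # Multiply by the reduction factor (halving by default), never below the floor
        self.rate = max(self.rate * self.reduction_factor, self.minimum_rate)

    def increase_rate(self, seconds_since_last_update):
        # Exponential recovery toward the ceiling while no errors occur
        self.rate = min(
            self.rate * math.exp(self.increase_factor * seconds_since_last_update),
            self.maximum_rate,
        )

    def max_tokens(self):
        # Tokens that can accumulate over the enforcement window
        return self.rate * self.window_seconds

bucket = SimpleAdaptiveBucket(initial_rate=10.0)
bucket.on_rate_limit_error()                            # 10.0 -> 5.0
bucket.increase_rate(seconds_since_last_update=10.0)    # 5.0 * e^0.1 (about 5.53)
```

Note how error reduction is multiplicative and instantaneous, while recovery is gradual and time-weighted: a single error cuts throughput sharply, and the bucket then earns its rate back only while requests keep succeeding.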
RateLimitError
A custom Phoenix exception raised when rate limit retries are exhausted. Carries metadata about the current and initial rates and the enforcement window.
RateLimiter
A higher-level wrapper that combines the AdaptiveTokenBucket with retry logic:
- `limit(fn)`: Synchronous decorator. Waits for a token, then calls the function. On a rate limit error, triggers `on_rate_limit_error()` and retries up to `max_rate_limit_retries` times. Raises `RateLimitError` when retries are exhausted.
- `alimit(fn)`: Asynchronous decorator. Uses an `asyncio.Event` and an `asyncio.Lock` to coordinate rate limit handling across concurrent coroutines. When a rate limit error occurs, the event is cleared to prevent new requests from starting, the error is handled under the lock, retries proceed, and the event is set again to resume other requests. Includes a 120-second timeout failsafe on the event wait.
- Lazy async initialization: The `_initialize_async_primitives()` method ensures asyncio primitives are created in the correct event loop, supporting re-entry across different loops.
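The retry loop behind a `limit()`-style decorator can be sketched as follows. This is an illustrative reconstruction, not the Phoenix source: `make_limit_decorator`, `StubBucket`, and `FakeProviderError` are stand-ins invented for the example.

```python
import functools

def make_limit_decorator(bucket, provider_error, max_rate_limit_retries):
    """Illustrative sketch of a limit()-style decorator (not the Phoenix source)."""
    def limit(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # One initial attempt plus max_rate_limit_retries retries
            for _ in range(max_rate_limit_retries + 1):
                bucket.wait_until_ready()          # block until a token is available
                try:
                    return fn(*args, **kwargs)
                except provider_error:
                    bucket.on_rate_limit_error()   # reduce the rate and cool down
            raise RuntimeError("rate limit retries exhausted")  # Phoenix raises RateLimitError here
        return wrapper
    return limit

# Minimal stand-ins to exercise the decorator
class StubBucket:
    def __init__(self):
        self.error_count = 0
    def wait_until_ready(self):
        pass
    def on_rate_limit_error(self):
        self.error_count += 1

class FakeProviderError(Exception):
    pass

bucket = StubBucket()
attempts = {"n": 0}

@make_limit_decorator(bucket, FakeProviderError, max_rate_limit_retries=3)
def flaky_call():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise FakeProviderError()
    return "ok"
```

Here `flaky_call()` fails twice, the bucket's error handler fires twice, and the third attempt succeeds: each provider error both consumes a retry and shrinks the rate for subsequent callers.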
UnavailableTokensError
A lightweight exception (subclass of `PhoenixException`) used internally by `AdaptiveTokenBucket.make_request_if_ready()` to signal that no tokens are available.
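The token-gating contract this exception supports can be illustrated with a toy bucket. `TinyBucket` and the re-declared exception here are simplified stand-ins for illustration, not the Phoenix classes:

```python
import time

class UnavailableTokensError(Exception):
    """Signals that the bucket has no tokens to spend (mirrors the Phoenix exception)."""

class TinyBucket:
    def __init__(self, tokens):
        self.tokens = tokens

    def make_request_if_ready(self):
        # Deduct a token, or signal the caller to wait
        if self.tokens < 1:
            raise UnavailableTokensError()
        self.tokens -= 1

    def wait_until_ready(self, poll_seconds=0.01):
        # Spin until a token can be deducted
        while True:
            try:
                self.make_request_if_ready()
                return
            except UnavailableTokensError:
                time.sleep(poll_seconds)
```

Splitting the API this way lets non-blocking callers (`make_request_if_ready()`) and blocking callers (`wait_until_ready()`) share one token-accounting path.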
Usage
```python
from openai import RateLimitError as OpenAIRateLimitError

from phoenix.evals.rate_limiters import RateLimiter

# Create a rate limiter for an LLM provider
rate_limiter = RateLimiter(
    rate_limit_error=OpenAIRateLimitError,
    max_rate_limit_retries=5,
    initial_per_second_request_rate=10.0,
    verbose=True,
)

# Wrap a sync function (openai_client is assumed to be a configured client)
@rate_limiter.limit
def call_openai(prompt: str) -> str:
    return openai_client.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": prompt}]
    ).choices[0].message.content

# Wrap an async function (awaiting the call assumes an AsyncOpenAI client)
@rate_limiter.alimit
async def async_call_openai(prompt: str) -> str:
    response = await openai_client.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
```
Code Reference
| Symbol | Kind | Location | Lines |
|---|---|---|---|
| `UnavailableTokensError` | Exception | packages/phoenix-evals/src/phoenix/evals/rate_limiters.py | 17-18 |
| `AdaptiveTokenBucket` | Class | packages/phoenix-evals/src/phoenix/evals/rate_limiters.py | 33-161 |
| `AdaptiveTokenBucket.on_rate_limit_error()` | Method | packages/phoenix-evals/src/phoenix/evals/rate_limiters.py | 97-118 |
| `AdaptiveTokenBucket.wait_until_ready()` | Method | packages/phoenix-evals/src/phoenix/evals/rate_limiters.py | 135-148 |
| `AdaptiveTokenBucket.async_wait_until_ready()` | Method | packages/phoenix-evals/src/phoenix/evals/rate_limiters.py | 149-161 |
| `RateLimitError` | Exception | packages/phoenix-evals/src/phoenix/evals/rate_limiters.py | 164-177 |
| `RateLimiter` | Class | packages/phoenix-evals/src/phoenix/evals/rate_limiters.py | 179-297 |
| `RateLimiter.limit()` | Method | packages/phoenix-evals/src/phoenix/evals/rate_limiters.py | 209-238 |
| `RateLimiter.alimit()` | Method | packages/phoenix-evals/src/phoenix/evals/rate_limiters.py | 252-297 |
I/O Contract
RateLimiter.limit()
| Direction | Type | Description |
|---|---|---|
| Input | `Callable[P, T]` | A synchronous function to wrap with rate limiting |
| Output | `Callable[P, T]` | A wrapped function that throttles calls and retries on rate limit errors |
| Raises | `RateLimitError` | When `max_rate_limit_retries` are exhausted |
RateLimiter.alimit()
| Direction | Type | Description |
|---|---|---|
| Input | `AsyncCallable[P, T]` | An asynchronous coroutine function to wrap |
| Output | `AsyncCallable[P, T]` | A wrapped async function that throttles calls and retries on rate limit errors |
| Raises | `RateLimitError` | When `max_rate_limit_retries` are exhausted |
AdaptiveTokenBucket Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| `initial_per_second_request_rate` | `float` | (required) | Starting request rate in requests per second |
| `maximum_per_second_request_rate` | `Optional[float]` | `None` | Maximum rate ceiling; defaults to `initial * (1 / rate_reduction_factor) ** 3` |
| `minimum_per_second_request_rate` | `float` | `0.1` | Minimum rate floor |
| `enforcement_window_minutes` | `float` | `1` | Token accumulation window in minutes |
| `rate_reduction_factor` | `float` | `0.5` | Multiplier applied on rate limit errors |
| `rate_increase_factor` | `float` | `0.01` | Exponential growth factor for recovery |
| `cooldown_seconds` | `float` | `5` | Minimum time between rate reductions; also the blocking duration after an error |
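As a quick sanity check of the default ceiling: with the default `rate_reduction_factor` of 0.5, the maximum rate works out to eight times the initial rate.

```python
initial_per_second_request_rate = 10.0
rate_reduction_factor = 0.5

# Default maximum: initial * (1 / rate_reduction_factor) ** 3
default_maximum = initial_per_second_request_rate * (1 / rate_reduction_factor) ** 3
print(default_maximum)  # 80.0
```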
Usage Examples
Rate Adaptation Behavior
```python
import time

from phoenix.evals.rate_limiters import AdaptiveTokenBucket

# Start at 10 requests/second
bucket = AdaptiveTokenBucket(
    initial_per_second_request_rate=10.0,
    rate_reduction_factor=0.5,
)

# After a rate limit error:
# rate: 10.0 -> 5.0 (halved)
bucket.on_rate_limit_error(request_start_time=time.time())

# After another error (outside the cooldown window):
# rate: 5.0 -> 2.5 (halved again)
bucket.on_rate_limit_error(request_start_time=time.time())

# Over time without errors, the rate recovers exponentially toward the maximum
bucket.increase_rate()
```
Async Coordination
The `alimit` decorator coordinates across concurrent coroutines:

1. All coroutines wait for the rate-limit-handling `asyncio.Event`
2. On error, the event is cleared (blocking new requests)
3. The lock holder retries under the lock
4. Once resolved, the event is set (unblocking waiting coroutines)

```python
import asyncio

rate_limiter = RateLimiter(
    rate_limit_error=SomeRateLimitError,
    max_rate_limit_retries=3,
)

@rate_limiter.alimit
async def my_api_call(data):
    return await api.call(data)

# Safe to call concurrently - rate limiting is coordinated
results = await asyncio.gather(*[my_api_call(d) for d in data_items])
```
Related Pages
- Arize_ai_Phoenix_Evals_Executors - The execution framework that wraps generation functions with rate limiters
- Arize_ai_Phoenix_Anthropic_Adapter - Provides `get_anthropic_rate_limit_errors()` for Anthropic-specific errors
- Arize_ai_Phoenix_OpenAI_Adapter - Provides `get_openai_rate_limit_errors()` for OpenAI-specific errors
- Arize_ai_Phoenix_Google_Adapter - Provides `GoogleGenAIRateLimitError` for Google-specific errors
- Arize_ai_Phoenix_LiteLLM_Adapter - Provides `get_litellm_rate_limit_errors()` for LiteLLM-specific errors
- Arize_ai_Phoenix_LangChain_Adapter - Provides provider-specific rate limit errors via LangChain wrappers
- Arize_ai_Phoenix_LLM_Base_Types - The `ProviderRegistration.get_rate_limit_errors` field feeds this subsystem