
Implementation:Arize AI Phoenix Evals Rate Limiters

From Leeroopedia

Overview

The Evals Rate Limiters module provides an adaptive token-bucket rate limiting system with both synchronous and asynchronous support for the Phoenix evaluator framework. It resides at phoenix.evals.rate_limiters and implements automatic error-driven throttling that dynamically adjusts request rates in response to rate limit errors from LLM providers. The module does not require knowledge of the provider's actual rate limit; it starts with an initial rate and adapts based on observed errors.

Description

The module contains four classes:

AdaptiveTokenBucket

The core rate limiting algorithm implementing an adaptive token bucket with the following characteristics:

  • Initial rate estimation: Starts with a configurable initial_per_second_request_rate and adjusts dynamically.
  • Rate reduction on error: When a rate limit error occurs, the rate is multiplied by rate_reduction_factor (default 0.5), effectively halving the allowed request rate.
  • Exponential rate recovery: When no errors occur, the rate gradually increases using exponential growth: rate *= exp(rate_increase_factor * time_since_last_update).
  • Rate bounds: The rate is clamped between minimum_per_second_request_rate (default 0.1) and maximum_per_second_request_rate (default: initial_per_second_request_rate * (1/rate_reduction_factor)^3, i.e. three rate-reduction steps above the initial rate).
  • Enforcement window: Tokens accumulate over a configurable window (default 1 minute), with max_tokens = rate * window.
  • Cooldown protection: After a rate limit error, concurrent requests that started before the error do not trigger additional rate reductions (guarded by cooldown_seconds).
  • Blocking on error: On a rate limit error, the bucket blocks for cooldown_seconds (default 5) via time.sleep() to allow the provider's rate limit to reset.
  • Token-based gating: make_request_if_ready() deducts a token or raises UnavailableTokensError; wait_until_ready() and async_wait_until_ready() spin until a token is available.
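The token-bucket math in the list above can be sketched as a minimal, self-contained re-implementation (illustrative only, not Phoenix's code; the class and method names here are hypothetical):

```python
import math
import time

class MiniAdaptiveBucket:
    """Illustrative sketch of the adaptive token-bucket math (not Phoenix's code)."""

    def __init__(self, initial_rate, reduction_factor=0.5, increase_factor=0.01,
                 min_rate=0.1, max_rate=None, window_seconds=60.0):
        self.rate = initial_rate
        self.reduction_factor = reduction_factor
        self.increase_factor = increase_factor
        self.min_rate = min_rate
        # Default ceiling: the initial rate raised by three rate-reduction steps.
        self.max_rate = max_rate or initial_rate * (1 / reduction_factor) ** 3
        self.window = window_seconds
        self.tokens = 0.0
        self.last_checked = time.monotonic()

    def _accrue(self):
        now = time.monotonic()
        elapsed = now - self.last_checked
        self.last_checked = now
        # Tokens accrue at the current rate, capped at rate * window.
        self.tokens = min(self.tokens + elapsed * self.rate, self.rate * self.window)

    def on_rate_limit_error(self):
        # Reduce the rate (halving with the default factor) and clamp to the floor.
        self.rate = max(self.rate * self.reduction_factor, self.min_rate)
        self.tokens = 0.0

    def increase_rate(self, elapsed):
        # Exponential recovery toward the ceiling when no errors occur.
        self.rate = min(self.rate * math.exp(self.increase_factor * elapsed),
                        self.max_rate)

    def try_acquire(self):
        # Token-based gating: deduct one token if available, else refuse.
        self._accrue()
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

With the defaults shown, two consecutive errors take a 10 req/s bucket to 2.5 req/s, while the ceiling stays at 80 req/s.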

RateLimitError

A custom Phoenix exception raised when rate limit retries are exhausted. Carries metadata about the current and initial rates and the enforcement window.

RateLimiter

A higher-level wrapper that combines the AdaptiveTokenBucket with retry logic:

  • limit(fn): Synchronous decorator. Waits for a token, calls the function. On a rate limit error, triggers on_rate_limit_error() and retries up to max_rate_limit_retries times. Raises RateLimitError when retries are exhausted.
  • alimit(fn): Asynchronous decorator. Uses asyncio.Event and asyncio.Lock to coordinate rate limit handling across concurrent coroutines. When a rate limit error occurs, the event is cleared to prevent new requests from starting, the error is handled under a lock, retries proceed, and the event is set again to resume other requests. Includes a 120-second timeout failsafe on the event wait.
  • Lazy async initialization: The _initialize_async_primitives() method ensures asyncio primitives are created in the correct event loop, supporting re-entry across different loops.
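The retry flow behind limit() can be sketched as a generic decorator (a hypothetical helper, not the Phoenix implementation; bucket_wait and on_error stand in for the bucket's wait_until_ready() and on_rate_limit_error()):

```python
import functools
import time

def limit_with_retries(bucket_wait, on_error, rate_limit_error, max_retries):
    """Sketch of the sync wait-call-retry loop behind a limit()-style decorator."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for _attempt in range(max_retries + 1):
                bucket_wait()              # block until a token is available
                started = time.time()
                try:
                    return fn(*args, **kwargs)
                except rate_limit_error:
                    on_error(started)      # reduce rate; cooldown guards concurrency
            # Phoenix raises its RateLimitError here; a generic error stands in.
            raise RuntimeError("rate limit retries exhausted")
        return wrapper
    return decorator
```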

UnavailableTokensError

A lightweight exception (subclass of PhoenixException) used internally by AdaptiveTokenBucket.make_request_if_ready() to signal that no tokens are available.
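The internal gating pattern, in which wait_until_ready() spins on make_request_if_ready(), might look like this sketch (illustrative stand-ins, not the Phoenix source):

```python
import time

class UnavailableTokensError(Exception):
    """Stand-in for Phoenix's internal exception (illustrative)."""

def wait_until_ready(make_request_if_ready, poll_interval=0.01):
    # Spin until a token is successfully deducted.
    while True:
        try:
            make_request_if_ready()
            return
        except UnavailableTokensError:
            time.sleep(poll_interval)
```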

Usage

from phoenix.evals.rate_limiters import RateLimiter

# Create a rate limiter for an LLM provider
from openai import RateLimitError as OpenAIRateLimitError

rate_limiter = RateLimiter(
    rate_limit_error=OpenAIRateLimitError,
    max_rate_limit_retries=5,
    initial_per_second_request_rate=10.0,
    verbose=True,
)

# Wrap a sync function
@rate_limiter.limit
def call_openai(prompt: str) -> str:
    return openai_client.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": prompt}]
    ).choices[0].message.content

# Wrap an async function (requires an async client, e.g. openai.AsyncOpenAI)
@rate_limiter.alimit
async def async_call_openai(prompt: str) -> str:
    response = await async_openai_client.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

Code Reference

Symbol Kind Location Lines
UnavailableTokensError Exception packages/phoenix-evals/src/phoenix/evals/rate_limiters.py 17-18
AdaptiveTokenBucket Class packages/phoenix-evals/src/phoenix/evals/rate_limiters.py 33-161
AdaptiveTokenBucket.on_rate_limit_error() Method packages/phoenix-evals/src/phoenix/evals/rate_limiters.py 97-118
AdaptiveTokenBucket.wait_until_ready() Method packages/phoenix-evals/src/phoenix/evals/rate_limiters.py 135-148
AdaptiveTokenBucket.async_wait_until_ready() Method packages/phoenix-evals/src/phoenix/evals/rate_limiters.py 149-161
RateLimitError Exception packages/phoenix-evals/src/phoenix/evals/rate_limiters.py 164-177
RateLimiter Class packages/phoenix-evals/src/phoenix/evals/rate_limiters.py 179-297
RateLimiter.limit() Method packages/phoenix-evals/src/phoenix/evals/rate_limiters.py 209-238
RateLimiter.alimit() Method packages/phoenix-evals/src/phoenix/evals/rate_limiters.py 252-297

I/O Contract

RateLimiter.limit()

Direction Type Description
Input Callable[P, T] A synchronous function to wrap with rate limiting
Output Callable[P, T] A wrapped function that throttles calls and retries on rate limit errors
Raises RateLimitError When max_rate_limit_retries are exhausted

RateLimiter.alimit()

Direction Type Description
Input AsyncCallable[P, T] An asynchronous coroutine function to wrap
Output AsyncCallable[P, T] A wrapped async function that throttles calls and retries on rate limit errors
Raises RateLimitError When max_rate_limit_retries are exhausted

AdaptiveTokenBucket Constructor Parameters

Parameter Type Default Description
initial_per_second_request_rate float (required) Starting request rate in requests per second
maximum_per_second_request_rate Optional[float] None Maximum rate ceiling. Defaults to initial * (1/reduction_factor)^3
minimum_per_second_request_rate float 0.1 Minimum rate floor
enforcement_window_minutes float 1 Token accumulation window in minutes
rate_reduction_factor float 0.5 Multiplier applied on rate limit errors
rate_increase_factor float 0.01 Exponential growth factor for recovery
cooldown_seconds float 5 Minimum time between rate reductions; also blocking duration after error
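A quick worked example of the defaults in the table above (assuming the stated formulas):

```python
# Worked example of the constructor defaults described in the table above.
initial = 10.0
rate_reduction_factor = 0.5

# Default ceiling: initial * (1/reduction_factor)^3, i.e. three reduction
# steps above the initial rate.
default_max = initial * (1 / rate_reduction_factor) ** 3
print(default_max)  # 80.0

# A 1-minute enforcement window at the initial rate caps the token pool:
max_tokens = initial * 60  # rate * window_seconds
print(max_tokens)  # 600.0
```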

Usage Examples

Rate Adaptation Behavior

import time

from phoenix.evals.rate_limiters import AdaptiveTokenBucket

# Start at 10 requests/second
bucket = AdaptiveTokenBucket(
    initial_per_second_request_rate=10.0,
    rate_reduction_factor=0.5,
)

# After a rate limit error:
# rate: 10.0 -> 5.0 (halved)
bucket.on_rate_limit_error(request_start_time=time.time())

# After another error:
# rate: 5.0 -> 2.5 (halved again)
bucket.on_rate_limit_error(request_start_time=time.time())

# Over time without errors, rate recovers exponentially toward maximum
bucket.increase_rate()
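Continuing the numbers above, the recovery formula rate *= exp(rate_increase_factor * elapsed) gives (a worked sketch of the arithmetic, not a library call):

```python
import math

# Recovery from the example above: rate is 2.5 req/s after two errors.
rate = 2.5
rate_increase_factor = 0.01

# After 60 seconds without errors, the rate grows by exp(0.01 * 60) ~= 1.82x.
rate *= math.exp(rate_increase_factor * 60)
print(round(rate, 2))  # 4.56
```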

Async Coordination

# The alimit decorator coordinates across concurrent coroutines:
# 1. All coroutines wait for the rate_limit_handling Event
# 2. On error, the event is cleared (blocking new requests)
# 3. The lock holder retries under the lock
# 4. Once resolved, the event is set (unblocking waiting coroutines)

import asyncio

from phoenix.evals.rate_limiters import RateLimiter

rate_limiter = RateLimiter(
    rate_limit_error=SomeRateLimitError,
    max_rate_limit_retries=3,
)

@rate_limiter.alimit
async def my_api_call(data):
    return await api.call(data)

# Safe to call concurrently - rate limiting is coordinated
results = await asyncio.gather(*[my_api_call(d) for d in data_items])
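The four coordination steps listed above can be sketched with plain asyncio primitives (an illustrative toy, not the alimit implementation):

```python
import asyncio

# Toy demonstration of the Event/Lock coordination pattern described above.
async def demo():
    gate = asyncio.Event()
    gate.set()                      # open: requests may proceed
    lock = asyncio.Lock()
    handled = []

    async def request(i, fail=False):
        # Step 1: wait for the gate, with a timeout failsafe.
        await asyncio.wait_for(gate.wait(), timeout=120)
        if fail:
            gate.clear()            # step 2: block new requests during handling
            async with lock:        # step 3: handle/retry under the lock
                handled.append(i)
            gate.set()              # step 4: resume waiting coroutines

    await asyncio.gather(request(1), request(2, fail=True), request(3))
    return handled

print(asyncio.run(demo()))  # [2]
```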
