Implementation:Arize ai Phoenix Client Rate Limiters

From Leeroopedia
Revision as of 12:03, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Arize_ai_Phoenix_Client_Rate_Limiters.md)
Knowledge Sources
Domains: AI_Observability, Client_SDK, Rate_Limiting
Last Updated: 2026-02-14 05:30 GMT

Overview

An adaptive token bucket rate limiter with sync and async decorators. It adjusts the request rate at runtime based on observed rate limit errors, combining multiplicative rate reduction, exponential rate recovery, and a cooldown period.

Description

The Client Rate Limiters module implements a self-tuning rate limiter that does not require advance knowledge of the target API's rate limit. It starts at an initial rate and adapts based on runtime feedback.

AdaptiveTokenBucket is the core rate limiting engine. It maintains a token bucket whose refill rate adjusts dynamically:

  • Increase: When no rate limit errors occur, the rate gradually increases using an exponential growth factor (rate * exp(rate_increase_factor * elapsed_time)). If the enforcement window expires without errors, the rate resets to the initial value.
  • Decrease: When a rate limit error is observed, the rate is multiplied by rate_reduction_factor (default 0.5), tokens are reset to zero, and the caller blocks for a configurable cooldown period. A cooldown mechanism prevents concurrent requests from triggering multiple reductions.
  • Bounds: The rate is clamped between minimum_per_second_request_rate (default 0.1) and maximum_per_second_request_rate (auto-computed as the initial rate multiplied by (1/rate_reduction_factor)^3 unless overridden).
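
The arithmetic behind the three rules above can be sketched as plain functions. This is a simplified illustration, not the module's code: the function names `default_maximum_rate`, `grow_rate`, and `reduce_rate` are hypothetical, and the token reset, cooldown sleep, and enforcement-window reset are left out.

```python
import math


def default_maximum_rate(initial_rate: float, rate_reduction_factor: float = 0.5) -> float:
    """Auto-computed upper bound: initial * (1 / reduction_factor) ** 3."""
    return initial_rate * (1.0 / rate_reduction_factor) ** 3


def grow_rate(
    rate: float,
    elapsed_seconds: float,
    maximum_rate: float,
    rate_increase_factor: float = 0.01,
) -> float:
    """Exponential growth over the elapsed time, clamped to the maximum."""
    grown = rate * math.exp(rate_increase_factor * elapsed_seconds)
    return min(grown, maximum_rate)


def reduce_rate(
    rate: float,
    minimum_rate: float = 0.1,
    rate_reduction_factor: float = 0.5,
) -> float:
    """Multiplicative decrease on a rate limit error, clamped to the minimum."""
    return max(rate * rate_reduction_factor, minimum_rate)


maximum = default_maximum_rate(10.0)          # 10 * (1 / 0.5) ** 3 == 80.0
halved = reduce_rate(10.0)                    # 5.0
recovered = grow_rate(halved, 60.0, maximum)  # 5 * e ** 0.6, roughly 9.11
```

With the default factors, a halved rate needs about ln(2) / 0.01 ≈ 69 seconds of uninterrupted exponential growth to return to its previous value, so recovery is deliberately much slower than reduction.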

RateLimiter provides decorator-based integration:

  • limit(fn) -- A synchronous decorator that wraps a callable. Before each call it waits for a token via wait_until_ready(). On rate limit error, it reduces the rate and retries up to max_rate_limit_retries times.
  • alimit(fn) -- An async decorator with the same semantics. It uses an asyncio.Event and asyncio.Lock to coordinate rate limit handling across concurrent async tasks: when one task hits a rate limit, all other tasks pause until the handler finishes.
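
The retry behavior described for limit(fn) can be sketched as a standalone decorator. This is a minimal sketch under stated assumptions, not the module's implementation: `RetriesExhausted` is a hypothetical stand-in for RateLimitError, `on_error` stands in for the rate reduction step, and the token wait before each call is omitted. It assumes one initial attempt followed by up to `max_retries` retries.

```python
import functools
from typing import Callable, Type, TypeVar

T = TypeVar("T")


class RetriesExhausted(Exception):
    """Hypothetical stand-in for the module's RateLimitError."""


def limit(
    rate_limit_error: Type[BaseException],
    max_retries: int,
    on_error: Callable[[], None],
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Retry the wrapped callable when `rate_limit_error` is raised."""

    def decorator(fn: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs) -> T:
            # One initial attempt plus up to `max_retries` retries.
            for _ in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except rate_limit_error:
                    on_error()  # e.g. reduce the rate and cool down
            raise RetriesExhausted("Exceeded rate limit retries")

        return wrapper

    return decorator


class FakeRateLimit(Exception):
    """Simulated rate limit response for the demo below."""


events = []
attempts = {"count": 0}


@limit(FakeRateLimit, max_retries=3, on_error=lambda: events.append("reduced"))
def flaky() -> str:
    # Fails twice, then succeeds on the third attempt.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise FakeRateLimit()
    return "ok"


result = flaky()
```

Here the call succeeds on the third attempt, and `on_error` has run once for each of the two failures.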

RateLimitError is a custom exception raised when all rate limit retries are exhausted. It carries diagnostic information including current rate, initial rate, and enforcement window.

UnavailableTokensError is an internal exception raised by make_request_if_ready() when insufficient tokens are available, driving the wait loop.
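
The exception-driven wait loop can be illustrated with a fixed-rate bucket. This is a sketch under stated assumptions rather than the module's code: `SimpleBucket` and `NoTokens` are hypothetical names, the adaptive rate adjustment is omitted, and the sleep interval between polls is an illustrative choice.

```python
import time


class NoTokens(Exception):
    """Hypothetical stand-in for UnavailableTokensError."""


class SimpleBucket:
    """Fixed-rate token bucket; the adaptive rate logic is left out."""

    def __init__(self, rate: float, capacity: float) -> None:
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum tokens the bucket can hold
        self.tokens = 0.0
        self.last_checked = time.monotonic()

    def _refill(self) -> None:
        # Add tokens for the time elapsed since the last check, up to capacity.
        now = time.monotonic()
        earned = (now - self.last_checked) * self.rate
        self.tokens = min(self.capacity, self.tokens + earned)
        self.last_checked = now

    def make_request_if_ready(self) -> None:
        # Consume one token, or signal that none is available yet.
        self._refill()
        if self.tokens < 1:
            raise NoTokens()
        self.tokens -= 1

    def wait_until_ready(self, max_wait_time: float = 300) -> None:
        # Poll until a token is consumed or max_wait_time has elapsed.
        start = time.monotonic()
        while time.monotonic() - start < max_wait_time:
            try:
                self.make_request_if_ready()
                return
            except NoTokens:
                time.sleep(0.1 / self.rate)  # a fraction of the token interval


bucket = SimpleBucket(rate=50.0, capacity=5.0)
bucket.wait_until_ready()  # blocks until the first token has accumulated
```

Raising an exception from the non-blocking check keeps the sync and async wait loops identical apart from how they sleep.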

Usage

Use this module to wrap API calls that may encounter rate limiting. The RateLimiter class is the primary entry point: instantiate it with the expected error type and apply limit() or alimit() to your callable. The adaptive bucket automatically discovers and respects the target API's rate limits without manual configuration.

Code Reference

Source Location

Signature

class AdaptiveTokenBucket:
    def __init__(
        self,
        initial_per_second_request_rate: float,
        maximum_per_second_request_rate: Optional[float] = None,
        minimum_per_second_request_rate: float = 0.1,
        enforcement_window_minutes: float = 1,
        rate_reduction_factor: float = 0.5,
        rate_increase_factor: float = 0.01,
        cooldown_seconds: float = 5,
    ): ...
    def increase_rate(self) -> None: ...
    def on_rate_limit_error(self, request_start_time: float, verbose: bool = False) -> None: ...
    def wait_until_ready(self, max_wait_time: float = 300) -> None: ...
    async def async_wait_until_ready(self, max_wait_time: float = 10) -> None: ...

class RateLimitError(PhoenixException):
    def __init__(
        self,
        message: str = "Exceeded rate limit retries",
        *,
        current_rate_tokens_per_sec: Optional[float] = None,
        initial_rate_tokens_per_sec: Optional[float] = None,
        enforcement_window_seconds: Optional[float] = None,
    ) -> None: ...

class UnavailableTokensError(PhoenixException): ...

class RateLimiter:
    def __init__(
        self,
        rate_limit_error: Optional[Type[BaseException]] = None,
        max_rate_limit_retries: int = 0,
        initial_per_second_request_rate: float = 1.0,
        maximum_per_second_request_rate: Optional[float] = None,
        enforcement_window_minutes: float = 1,
        rate_reduction_factor: float = 0.5,
        rate_increase_factor: float = 0.01,
        cooldown_seconds: float = 5,
        verbose: bool = False,
    ) -> None: ...
    def limit(self, fn: Callable[P, T]) -> Callable[P, T]: ...
    def alimit(self, fn: AsyncCallable[P, T]) -> AsyncCallable[P, T]: ...

Import

from phoenix.client.utils.rate_limiters import (
    AdaptiveTokenBucket,
    RateLimiter,
    RateLimitError,
    UnavailableTokensError,
)

I/O Contract

AdaptiveTokenBucket

Parameter | Type | Description
initial_per_second_request_rate | float | Starting token refill rate in requests per second
maximum_per_second_request_rate | Optional[float] | Upper bound on rate; auto-computed if not set
minimum_per_second_request_rate | float | Floor rate (default 0.1 RPS)
enforcement_window_minutes | float | Window over which token capacity is computed (default 1 minute)
rate_reduction_factor | float | Multiplicative factor applied on error (default 0.5)
rate_increase_factor | float | Exponential growth constant for rate recovery (default 0.01)
cooldown_seconds | float | Minimum time between consecutive rate reductions (default 5s)

RateLimiter

Parameter | Type | Description
rate_limit_error | Optional[Type[BaseException]] | The exception type that indicates a rate limit response
max_rate_limit_retries | int | Number of times to retry after rate limit errors (default 0)
verbose | bool | Print throttle messages to stdout (default False)

Return | Type | Description
Decorated function | Same as input | Wrapped function with rate limiting applied
On exhaustion | Raises RateLimitError | Contains current rate, initial rate, and enforcement window

Usage Examples

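import openai
from phoenix.client.utils.rate_limiters import RateLimiter

# Assumes the openai>=1.0 SDK, with OPENAI_API_KEY set in the environment
client = openai.OpenAI()
async_client = openai.AsyncOpenAI()

# Synchronous usage
limiter = RateLimiter(
    rate_limit_error=openai.RateLimitError,
    max_rate_limit_retries=5,
    initial_per_second_request_rate=10.0,
)

@limiter.limit
def call_openai(prompt: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4", messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content or ""

# Async usage: the async client exposes the same create() method as a coroutine
@limiter.alimit
async def async_call_openai(prompt: str) -> str:
    response = await async_client.chat.completions.create(
        model="gpt-4", messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content or ""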

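# Direct token bucket usage
# make_api_call and SomeRateLimitError are placeholders for your own call and
# the exception type your API client raises when it is rate limited.
import time

from phoenix.client.utils.rate_limiters import AdaptiveTokenBucket

bucket = AdaptiveTokenBucket(initial_per_second_request_rate=5.0)
bucket.wait_until_ready()  # block until a token is available
start = time.time()        # request start time, passed to the error handler
try:
    result = make_api_call()
except SomeRateLimitError:
    bucket.on_rate_limit_error(start)  # reduce the rate and wait out the cooldown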
