Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Arize ai Phoenix Client Executors

From Leeroopedia
Knowledge Sources
Domains AI_Observability, Client_SDK, Concurrency
Last Updated 2026-02-14 05:30 GMT

Overview

Async and sync task executors with dynamic concurrency control, retry logic, and AIMD-based rate limiting for running batch operations within the Phoenix client SDK.

Description

The Client Executors module provides a framework for executing tasks in batch with automatic retry, progress tracking, and adaptive concurrency control. It contains two primary executor implementations:

  • AsyncExecutor -- An asynchronous executor that uses a producer-consumer pattern built on asyncio.PriorityQueue. Workers consume items from the queue, execute a coroutine function, and requeue failed tasks with decremented priority (increasing retry count). It supports configurable concurrency, task timeouts, graceful termination via signal handling, and an optional ConcurrencyController for dynamic scaling.
  • SyncExecutor -- A synchronous executor that iterates over inputs sequentially, retrying on failure up to a configurable maximum. It supports signal-based termination and progress tracking via tqdm.

The module also provides:

  • ExecutionStatus -- An enum tracking task outcome: DID_NOT_RUN, COMPLETED, COMPLETED_WITH_RETRIES, or FAILED.
  • ExecutionDetails -- A per-task record that accumulates exceptions, execution time, and final status.
  • ConcurrencyController -- An AIMD (Additive Increase / Multiplicative Decrease) controller that adjusts the target concurrency dynamically. On success windows it increases concurrency by a configurable step; on error windows it multiplicatively decreases. A collapse mechanism detects rapid consecutive errors and drops concurrency to 1 immediately. The controller also tracks exponentially smoothed latency.
  • get_executor_on_sync_context() -- A factory function that selects the appropriate executor based on runtime context. It checks whether an event loop is running, whether nest_asyncio has been patched, and whether the caller is on the main thread. It falls back to SyncExecutor when async execution is not feasible.

Rate limit errors from both phoenix.client and phoenix.evals receive special handling: they trigger requeue with error recording in the concurrency controller, rather than consuming a standard retry attempt.

Usage

Use these executors when running batch operations such as experiment tasks or evaluations. The AsyncExecutor is preferred for high-throughput workloads with concurrent API calls. The SyncExecutor is suitable for environments where async is unavailable (e.g., non-main threads without nest_asyncio). The get_executor_on_sync_context() function automatically selects the best executor for the current runtime environment.

Code Reference

Source Location

Signature

class ExecutionStatus(Enum):
    DID_NOT_RUN = "DID NOT RUN"
    COMPLETED = "COMPLETED"
    COMPLETED_WITH_RETRIES = "COMPLETED WITH RETRIES"
    FAILED = "FAILED"

class ExecutionDetails:
    def __init__(self) -> None: ...
    def fail(self) -> None: ...
    def complete(self) -> None: ...
    def log_exception(self, exc: Exception) -> None: ...
    def log_runtime(self, start_time: float) -> None: ...

class ConcurrencyController:
    def __init__(
        self,
        *,
        max_concurrency: int,
        initial_target: float,
        window_seconds: float = 5,
        increase_step: float = 0.5,
        decrease_ratio: float = 0.5,
        inactive_check_interval: float = 1.0,
        smoothing_factor: float = 0.2,
        collapse_window_seconds: float = 15.0,
        collapse_error_threshold: int = 2,
    ) -> None: ...
    def record_success(self, latency_seconds: float) -> None: ...
    def record_timeout(self) -> None: ...
    def record_error(self) -> None: ...

class AsyncExecutor:
    def __init__(
        self,
        generation_fn: Callable[[Any], Coroutine[Any, Any, Any]],
        concurrency: int = 3,
        max_retries: int = 10,
        exit_on_error: bool = True,
        timeout: Optional[int] = None,
        *,
        enable_dynamic_concurrency: bool = True,
    ): ...
    async def execute(self, inputs: Sequence[Any]) -> Tuple[List[Any], List[ExecutionDetails]]: ...
    def run(self, inputs: Sequence[Any]) -> Tuple[List[Any], List[ExecutionDetails]]: ...

class SyncExecutor:
    def __init__(
        self,
        generation_fn: Callable[[Any], Any],
        max_retries: int = 10,
        exit_on_error: bool = True,
    ): ...
    def run(self, inputs: Sequence[Any]) -> Tuple[List[Any], List[Any]]: ...

def get_executor_on_sync_context(
    sync_fn: Callable[[Any], Any],
    async_fn: Callable[[Any], Coroutine[Any, Any, Any]],
    run_sync: bool = False,
    concurrency: int = 3,
    max_retries: int = 10,
    exit_on_error: bool = True,
    fallback_return_value: Union[Unset, Any] = _unset,
    timeout: Optional[int] = None,
) -> Executor: ...

Import

from phoenix.client.utils.executors import (
    AsyncExecutor,
    SyncExecutor,
    ExecutionStatus,
    ExecutionDetails,
    ConcurrencyController,
    get_executor_on_sync_context,
)

I/O Contract

AsyncExecutor / SyncExecutor

Parameter Type Description
generation_fn Callable[[Any], Coroutine[Any, Any, Any]] (async) or Callable[[Any], Any] (sync) Function applied to each input item
concurrency int Maximum number of concurrent workers (async only, default 3)
max_retries int Maximum retry attempts per item (default 10)
exit_on_error bool Whether to abort all remaining tasks on first failure (default True)
fallback_return_value Any Default output for items that fail
timeout Optional[int] Per-task timeout in seconds (async only, default 60)
Return Type Description
outputs List[Any] Results from generation_fn, indexed by input position
execution_details List[ExecutionDetails] Per-item status, exceptions, and timing

ConcurrencyController

Parameter Type Description
max_concurrency int Upper bound on concurrency
initial_target float Starting concurrency target
window_seconds float Length of the AIMD feedback window (default 5s)
increase_step float Additive increase on success windows (default 0.5)
decrease_ratio float Multiplicative decrease on error windows (default 0.5)
collapse_error_threshold int Number of errors in collapse_window_seconds that triggers collapse to 1 (default 2)

Usage Examples

import asyncio
from phoenix.client.utils.executors import AsyncExecutor, get_executor_on_sync_context

# Direct usage of AsyncExecutor
async def call_llm(input_data):
    # ... call an LLM API and return result
    return {"output": "response"}

executor = AsyncExecutor(
    generation_fn=call_llm,
    concurrency=5,
    max_retries=3,
    exit_on_error=False,
    timeout=120,
)
outputs, details = asyncio.run(executor.execute(my_inputs))

# Using the factory to auto-select the best executor
executor = get_executor_on_sync_context(
    sync_fn=call_llm_sync,
    async_fn=call_llm,
    concurrency=5,
    max_retries=3,
)
outputs, details = executor.run(my_inputs)

# Inspect execution details
for i, detail in enumerate(details):
    print(f"Item {i}: {detail.status.value}, took {detail.execution_seconds:.2f}s")
    if detail.exceptions:
        print(f"  Encountered {len(detail.exceptions)} exceptions before final status")

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment