Implementation:Arize ai Phoenix Client Executors
| Knowledge Sources | |
|---|---|
| Domains | AI_Observability, Client_SDK, Concurrency |
| Last Updated | 2026-02-14 05:30 GMT |
Overview
Async and sync task executors with dynamic concurrency control, retry logic, and AIMD-based rate limiting for running batch operations within the Phoenix client SDK.
Description
The Client Executors module provides a framework for executing tasks in batch with automatic retry, progress tracking, and adaptive concurrency control. It contains two primary executor implementations:
- AsyncExecutor -- An asynchronous executor that uses a producer-consumer pattern built on
asyncio.PriorityQueue. Workers consume items from the queue, execute a coroutine function, and requeue failed tasks with decremented priority (increasing retry count). It supports configurable concurrency, task timeouts, graceful termination via signal handling, and an optional ConcurrencyController for dynamic scaling.
- SyncExecutor -- A synchronous executor that iterates over inputs sequentially, retrying on failure up to a configurable maximum. It supports signal-based termination and progress tracking via
tqdm.
The module also provides:
- ExecutionStatus -- An enum tracking task outcome:
DID_NOT_RUN,COMPLETED,COMPLETED_WITH_RETRIES, orFAILED.
- ExecutionDetails -- A per-task record that accumulates exceptions, execution time, and final status.
- ConcurrencyController -- An AIMD (Additive Increase / Multiplicative Decrease) controller that adjusts the target concurrency dynamically. On success windows it increases concurrency by a configurable step; on error windows it multiplicatively decreases. A collapse mechanism detects rapid consecutive errors and drops concurrency to 1 immediately. The controller also tracks exponentially smoothed latency.
- get_executor_on_sync_context() -- A factory function that selects the appropriate executor based on runtime context. It checks whether an event loop is running, whether
nest_asynciohas been patched, and whether the caller is on the main thread. It falls back toSyncExecutorwhen async execution is not feasible.
Rate limit errors from both phoenix.client and phoenix.evals receive special handling: they trigger requeue with error recording in the concurrency controller, rather than consuming a standard retry attempt.
Usage
Use these executors when running batch operations such as experiment tasks or evaluations. The AsyncExecutor is preferred for high-throughput workloads with concurrent API calls. The SyncExecutor is suitable for environments where async is unavailable (e.g., non-main threads without nest_asyncio). The get_executor_on_sync_context() function automatically selects the best executor for the current runtime environment.
Code Reference
Source Location
Signature
class ExecutionStatus(Enum):
DID_NOT_RUN = "DID NOT RUN"
COMPLETED = "COMPLETED"
COMPLETED_WITH_RETRIES = "COMPLETED WITH RETRIES"
FAILED = "FAILED"
class ExecutionDetails:
def __init__(self) -> None: ...
def fail(self) -> None: ...
def complete(self) -> None: ...
def log_exception(self, exc: Exception) -> None: ...
def log_runtime(self, start_time: float) -> None: ...
class ConcurrencyController:
def __init__(
self,
*,
max_concurrency: int,
initial_target: float,
window_seconds: float = 5,
increase_step: float = 0.5,
decrease_ratio: float = 0.5,
inactive_check_interval: float = 1.0,
smoothing_factor: float = 0.2,
collapse_window_seconds: float = 15.0,
collapse_error_threshold: int = 2,
) -> None: ...
def record_success(self, latency_seconds: float) -> None: ...
def record_timeout(self) -> None: ...
def record_error(self) -> None: ...
class AsyncExecutor:
def __init__(
self,
generation_fn: Callable[[Any], Coroutine[Any, Any, Any]],
concurrency: int = 3,
max_retries: int = 10,
exit_on_error: bool = True,
timeout: Optional[int] = None,
*,
enable_dynamic_concurrency: bool = True,
): ...
async def execute(self, inputs: Sequence[Any]) -> Tuple[List[Any], List[ExecutionDetails]]: ...
def run(self, inputs: Sequence[Any]) -> Tuple[List[Any], List[ExecutionDetails]]: ...
class SyncExecutor:
def __init__(
self,
generation_fn: Callable[[Any], Any],
max_retries: int = 10,
exit_on_error: bool = True,
): ...
def run(self, inputs: Sequence[Any]) -> Tuple[List[Any], List[Any]]: ...
def get_executor_on_sync_context(
sync_fn: Callable[[Any], Any],
async_fn: Callable[[Any], Coroutine[Any, Any, Any]],
run_sync: bool = False,
concurrency: int = 3,
max_retries: int = 10,
exit_on_error: bool = True,
fallback_return_value: Union[Unset, Any] = _unset,
timeout: Optional[int] = None,
) -> Executor: ...
Import
from phoenix.client.utils.executors import (
AsyncExecutor,
SyncExecutor,
ExecutionStatus,
ExecutionDetails,
ConcurrencyController,
get_executor_on_sync_context,
)
I/O Contract
AsyncExecutor / SyncExecutor
| Parameter | Type | Description |
|---|---|---|
| generation_fn | Callable[[Any], Coroutine[Any, Any, Any]] (async) or Callable[[Any], Any] (sync) |
Function applied to each input item |
| concurrency | int |
Maximum number of concurrent workers (async only, default 3) |
| max_retries | int |
Maximum retry attempts per item (default 10) |
| exit_on_error | bool |
Whether to abort all remaining tasks on first failure (default True) |
| fallback_return_value | Any |
Default output for items that fail |
| timeout | Optional[int] |
Per-task timeout in seconds (async only, default 60) |
| Return | Type | Description |
|---|---|---|
| outputs | List[Any] |
Results from generation_fn, indexed by input position |
| execution_details | List[ExecutionDetails] |
Per-item status, exceptions, and timing |
ConcurrencyController
| Parameter | Type | Description |
|---|---|---|
| max_concurrency | int |
Upper bound on concurrency |
| initial_target | float |
Starting concurrency target |
| window_seconds | float |
Length of the AIMD feedback window (default 5s) |
| increase_step | float |
Additive increase on success windows (default 0.5) |
| decrease_ratio | float |
Multiplicative decrease on error windows (default 0.5) |
| collapse_error_threshold | int |
Number of errors in collapse_window_seconds that triggers collapse to 1 (default 2) |
Usage Examples
import asyncio
from phoenix.client.utils.executors import AsyncExecutor, get_executor_on_sync_context
# Direct usage of AsyncExecutor
async def call_llm(input_data):
# ... call an LLM API and return result
return {"output": "response"}
executor = AsyncExecutor(
generation_fn=call_llm,
concurrency=5,
max_retries=3,
exit_on_error=False,
timeout=120,
)
outputs, details = asyncio.run(executor.execute(my_inputs))
# Using the factory to auto-select the best executor
executor = get_executor_on_sync_context(
sync_fn=call_llm_sync,
async_fn=call_llm,
concurrency=5,
max_retries=3,
)
outputs, details = executor.run(my_inputs)
# Inspect execution details
for i, detail in enumerate(details):
print(f"Item {i}: {detail.status.value}, took {detail.execution_seconds:.2f}s")
if detail.exceptions:
print(f" Encountered {len(detail.exceptions)} exceptions before final status")
Related Pages
- Principle:Arize_ai_Phoenix_Evaluation_Execution
- Arize_ai_Phoenix_Client_Rate_Limiters -- Adaptive token bucket used alongside executors
- Arize_ai_Phoenix_Experiment_Types -- Experiment tasks executed by these executors