Implementation:Arize ai Phoenix Evals Executors
Overview
The Evals Executors module provides the task execution framework for the Phoenix evaluator system. It resides at phoenix.evals.executors and implements both synchronous and asynchronous executors with dynamic concurrency control, retry mechanisms, progress reporting, and graceful termination handling. The module is responsible for orchestrating the parallel execution of evaluation tasks (typically LLM calls) while respecting rate limits and handling errors.
Description
The module contains the following key components:
ExecutionStatus (Enum)
Tracks the state of individual task execution:
| Value | Description |
|---|---|
| DID_NOT_RUN | Task has not been executed |
| COMPLETED | Task completed successfully on first attempt |
| COMPLETED_WITH_RETRIES | Task completed but required one or more retries |
| FAILED | Task failed after exhausting all retries |
ExecutionDetails
A companion class that tracks per-task execution metadata: exceptions encountered, execution status, and cumulative execution time. Provides complete(), fail(), log_exception(), and log_runtime() methods.
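The bookkeeping described above can be sketched as follows. This is a minimal illustration, not the module's source: the class name ExecutionDetailsSketch, the enum string values, the execution_seconds attribute name, and the log_runtime(start_time) signature are assumptions made for the example.

```python
import time
from enum import Enum
from typing import List


class ExecutionStatus(Enum):
    # String values are placeholders chosen for this sketch.
    DID_NOT_RUN = "DID NOT RUN"
    COMPLETED = "COMPLETED"
    COMPLETED_WITH_RETRIES = "COMPLETED WITH RETRIES"
    FAILED = "FAILED"


class ExecutionDetailsSketch:
    """Hypothetical stand-in for ExecutionDetails (per-task metadata)."""

    def __init__(self) -> None:
        self.exceptions: List[Exception] = []
        self.status = ExecutionStatus.DID_NOT_RUN
        self.execution_seconds = 0.0

    def fail(self) -> None:
        self.status = ExecutionStatus.FAILED

    def complete(self) -> None:
        # A task that logged exceptions before succeeding needed retries.
        if self.exceptions:
            self.status = ExecutionStatus.COMPLETED_WITH_RETRIES
        else:
            self.status = ExecutionStatus.COMPLETED

    def log_exception(self, exc: Exception) -> None:
        self.exceptions.append(exc)

    def log_runtime(self, start_time: float) -> None:
        # Accumulates across attempts, so retries add to the total.
        self.execution_seconds += time.time() - start_time
```

The status is derived from the exception log at completion time, which is one simple way to distinguish COMPLETED from COMPLETED_WITH_RETRIES without a separate retry counter.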
ConcurrencyController
An AIMD (Additive Increase/Multiplicative Decrease) controller for dynamic concurrency management:
- Feedback windows: Evaluates performance over configurable time windows (default 5 seconds).
- Additive increase: On a clean window (no errors or timeouts), increases the target concurrency by increase_step (default 0.5).
- Multiplicative decrease: On a window with any errors/timeouts, multiplies the target by decrease_ratio (default 0.5).
- Collapse detection: If collapse_error_threshold errors occur within collapse_window_seconds, the target concurrency collapses to 1.
- Smoothed latency tracking: Maintains an exponentially weighted moving average of task latency for monitoring.
- Bounded concurrency: The target is always clamped to [1, max_concurrency].
The steady-state concurrency formula is documented as: concurrency ~= a * (1 - r_e) / ((1 - beta) * r_e), where r_e is the error fraction and a, beta are the increase step and decrease ratio.
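To make the AIMD rules and the steady-state formula concrete, here is a minimal sketch. It is not the ConcurrencyController source: the class name AIMDSketch, the record_success/record_error method names, and the explicit now argument (used instead of a real clock so the behavior is deterministic) are assumptions for illustration, and latency smoothing is omitted.

```python
from collections import deque
from typing import Deque, Optional


class AIMDSketch:
    """Hypothetical AIMD controller; timestamps are passed in explicitly."""

    def __init__(self, max_concurrency: int, initial_target: float,
                 window_seconds: float = 5.0, increase_step: float = 0.5,
                 decrease_ratio: float = 0.5,
                 collapse_window_seconds: float = 15.0,
                 collapse_error_threshold: int = 2) -> None:
        self.max_concurrency = max_concurrency
        self.window_seconds = window_seconds
        self.increase_step = increase_step
        self.decrease_ratio = decrease_ratio
        self.collapse_window_seconds = collapse_window_seconds
        self.collapse_error_threshold = collapse_error_threshold
        self.target = self._clamp(float(initial_target))
        self._window_start: Optional[float] = None
        self._window_errors = 0
        self._recent_errors: Deque[float] = deque()

    def _clamp(self, value: float) -> float:
        # Bounded concurrency: keep the target inside [1, max_concurrency].
        return min(max(1.0, value), float(self.max_concurrency))

    def record_success(self, now: float) -> None:
        self._maybe_close_window(now)

    def record_error(self, now: float) -> None:
        self._window_errors += 1
        self._recent_errors.append(now)
        # Forget errors that fell out of the collapse window.
        while now - self._recent_errors[0] > self.collapse_window_seconds:
            self._recent_errors.popleft()
        if len(self._recent_errors) >= self.collapse_error_threshold:
            self.target = 1.0  # collapse
        self._maybe_close_window(now)

    def _maybe_close_window(self, now: float) -> None:
        if self._window_start is None:
            self._window_start = now
            return
        if now - self._window_start < self.window_seconds:
            return
        if self._window_errors:
            self.target *= self.decrease_ratio   # multiplicative decrease
        else:
            self.target += self.increase_step    # additive increase
        self.target = self._clamp(self.target)
        self._window_start = now
        self._window_errors = 0


def steady_state_concurrency(a: float, beta: float, r_e: float) -> float:
    """Evaluate a * (1 - r_e) / ((1 - beta) * r_e) for an error fraction r_e."""
    if not 0.0 < r_e <= 1.0:
        raise ValueError("r_e must be in (0, 1]")
    return a * (1.0 - r_e) / ((1.0 - beta) * r_e)
```

With the default a = 0.5 and beta = 0.5, an error fraction of 0.1 gives 0.45 / 0.05, so the formula predicts a steady-state concurrency of about 9.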
AsyncExecutor
An asynchronous executor using a producer-consumer pattern:
- Producer: Enqueues tasks from the input sequence into a bounded asyncio.PriorityQueue, respecting a max_fill limit to ensure room for requeued tasks.
- Consumers: Multiple worker coroutines dequeue tasks, execute them with a timeout, and handle results. Workers implement:
  - Dynamic gating: Workers with index >= target_concurrency sleep rather than dequeue, effectively reducing active workers.
  - Timeout handling: Timed-out tasks are requeued at the same priority.
  - Retry with backoff: Failed tasks are requeued at a decremented priority (lower priority = more retries). RateLimitError tasks are explicitly logged and requeued.
  - PhoenixException bypass: Non-rate-limit PhoenixException instances bypass the retry mechanism.
  - Exit on error: If exit_on_error=True, a termination event is set on the first unrecoverable failure.
- Signal handling: Captures SIGINT (by default) on the main thread to allow graceful shutdown.
- Sync interface: The run() method calls asyncio.run(self.execute(inputs)).
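The producer-consumer pattern above can be sketched with plain asyncio. This is a simplified illustration under stated assumptions, not the AsyncExecutor implementation: run_with_retries is a hypothetical name, priorities are assumed to start at 0 with the retry count read as abs(priority), workers poll the queue on a short fixed interval, and dynamic gating, signal handling, RateLimitError handling, and exit_on_error are all left out.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence


async def run_with_retries(
    fn: Callable[[Any], Awaitable[Any]],
    inputs: Sequence[Any],
    concurrency: int = 3,
    max_retries: int = 2,
    timeout: float = 5.0,
    fallback: Any = None,
) -> List[Any]:
    outputs: List[Any] = [fallback] * len(inputs)
    # The queue holds up to 2 * concurrency items, but the producer only fills
    # it to max_fill, leaving headroom so that requeues never block.
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue(maxsize=2 * concurrency)
    max_fill = concurrency
    remaining = len(inputs)  # tasks not yet resolved (succeeded or given up)

    async def producer() -> None:
        for index, item in enumerate(inputs):
            while queue.qsize() >= max_fill:
                await asyncio.sleep(0.01)
            # The unique index breaks priority ties, so items are never compared.
            await queue.put((0, index, item))

    async def consumer() -> None:
        nonlocal remaining
        while remaining > 0:
            try:
                priority, index, item = queue.get_nowait()
            except asyncio.QueueEmpty:
                await asyncio.sleep(0.01)
                continue
            try:
                outputs[index] = await asyncio.wait_for(fn(item), timeout=timeout)
                remaining -= 1
            except asyncio.TimeoutError:
                # Timeout: requeue at the same priority (not counted as a retry,
                # so a task that always times out would loop in this sketch).
                await queue.put((priority, index, item))
            except Exception:
                if abs(priority) < max_retries:
                    # Lower value is dequeued first, so retries are served sooner.
                    await queue.put((priority - 1, index, item))
                else:
                    remaining -= 1  # retries exhausted; fallback value stays

    await asyncio.gather(producer(), *(consumer() for _ in range(concurrency)))
    return outputs
```

Because asyncio.PriorityQueue returns the smallest entry first, decrementing the priority on failure moves a retried task ahead of fresh work, which matches the "lower priority = more retries" convention described above.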
SyncExecutor
A sequential executor for synchronous evaluation:
- Iterates over inputs sequentially with retry logic.
- Uses signal.SIGINT handling for interruptibility.
- PhoenixException instances bypass retries immediately.
- Provides a tqdm progress bar.
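A sequential retry loop of the kind listed above might look like the sketch below. The names run_sequentially and NonRetryableError are hypothetical (the latter stands in for PhoenixException), statuses are plain strings rather than ExecutionStatus members, and SIGINT handling and the tqdm progress bar are omitted to keep the example self-contained.

```python
from typing import Any, Callable, List, Sequence, Tuple


class NonRetryableError(Exception):
    """Hypothetical stand-in for an exception type that bypasses retries."""


def run_sequentially(
    fn: Callable[[Any], Any],
    inputs: Sequence[Any],
    max_retries: int = 3,
    exit_on_error: bool = True,
    fallback: Any = None,
) -> Tuple[List[Any], List[str]]:
    outputs: List[Any] = [fallback] * len(inputs)
    statuses: List[str] = ["DID_NOT_RUN"] * len(inputs)
    for index, item in enumerate(inputs):
        for attempt in range(max_retries + 1):
            try:
                outputs[index] = fn(item)
                statuses[index] = (
                    "COMPLETED" if attempt == 0 else "COMPLETED_WITH_RETRIES"
                )
                break
            except NonRetryableError:
                statuses[index] = "FAILED"  # fail immediately, no retries
                break
            except Exception:
                if attempt == max_retries:
                    statuses[index] = "FAILED"  # retries exhausted
        if statuses[index] == "FAILED" and exit_on_error:
            break  # later inputs stay DID_NOT_RUN with the fallback value
    return outputs, statuses
```

Inputs that are never reached keep the DID_NOT_RUN status, which lets a caller tell "failed" apart from "skipped because an earlier task failed".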
get_executor_on_sync_context()
A factory function that selects the appropriate executor based on the runtime context:
| Condition | Executor Chosen |
|---|---|
| Non-main thread | SyncExecutor (with warning if async was requested) |
| run_sync=True | SyncExecutor |
| Running event loop + nest_asyncio patched | AsyncExecutor |
| Running event loop + no nest_asyncio | SyncExecutor (with warning to patch) |
| No running event loop | AsyncExecutor |
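The decision table can be read as a short chain of checks, evaluated top to bottom. The sketch below encodes it as a pure function plus a detection helper; the function names are hypothetical, warnings are omitted, and detecting nest_asyncio through an asyncio._nest_patched attribute is an assumption about how the patch marks itself, not something stated on this page.

```python
import asyncio
import threading


def running_event_loop_exists() -> bool:
    """Return True when called from inside a running asyncio event loop."""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False


def choose_executor_kind(run_sync: bool, in_main_thread: bool,
                         loop_running: bool, nest_patched: bool) -> str:
    """Return "sync" or "async" following the table row by row."""
    if not in_main_thread:
        return "sync"
    if run_sync:
        return "sync"
    if loop_running:
        # Async execution inside a running loop needs a re-entrant loop.
        return "async" if nest_patched else "sync"
    return "async"


def detect_executor_kind(run_sync: bool = False) -> str:
    """Gather the runtime facts and apply the decision function."""
    return choose_executor_kind(
        run_sync=run_sync,
        in_main_thread=threading.current_thread() is threading.main_thread(),
        loop_running=running_event_loop_exists(),
        # Assumed marker attribute; treated as "not patched" when absent.
        nest_patched=bool(getattr(asyncio, "_nest_patched", False)),
    )
```

Keeping the decision itself free of side effects makes each row of the table easy to check in isolation.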
Usage
from phoenix.evals.executors import AsyncExecutor

# llm_adapter and inputs are assumed to be defined elsewhere.
async def generate(input_data):
    """An async generation function for each evaluation task."""
    return await llm_adapter.async_generate_text(input_data["prompt"])

executor = AsyncExecutor(
    generation_fn=generate,
    concurrency=5,
    max_retries=3,
    exit_on_error=False,
    tqdm_bar_format="Evaluating |{bar}| {n_fmt}/{total_fmt}",
    timeout=60,
)
outputs, details = executor.run(inputs)

from phoenix.evals.executors import SyncExecutor

def generate_sync(input_data):
    """A synchronous generation function for each evaluation task."""
    return llm_adapter.generate_text(input_data["prompt"])

executor = SyncExecutor(
    generation_fn=generate_sync,
    max_retries=3,
    exit_on_error=True,
)
outputs, details = executor.run(inputs)

from phoenix.evals.executors import get_executor_on_sync_context

# Automatically selects async or sync executor based on context
executor = get_executor_on_sync_context(
    sync_fn=generate_sync,
    async_fn=generate,  # the async function defined above
    concurrency=10,
    max_retries=5,
)
outputs, details = executor.run(inputs)
Code Reference
| Symbol | Kind | Location | Lines |
|---|---|---|---|
| ExecutionStatus | Enum | packages/phoenix-evals/src/phoenix/evals/executors.py | 40-44 |
| ExecutionDetails | Class | packages/phoenix-evals/src/phoenix/evals/executors.py | 47-66 |
| ConcurrencyController | Class | packages/phoenix-evals/src/phoenix/evals/executors.py | 73-168 |
| AsyncExecutor | Class | packages/phoenix-evals/src/phoenix/evals/executors.py | 170-453 |
| AsyncExecutor.producer() | Method | packages/phoenix-evals/src/phoenix/evals/executors.py | 239-256 |
| AsyncExecutor.consumer() | Method | packages/phoenix-evals/src/phoenix/evals/executors.py | 258-375 |
| AsyncExecutor.execute() | Method | packages/phoenix-evals/src/phoenix/evals/executors.py | 377-450 |
| SyncExecutor | Class | packages/phoenix-evals/src/phoenix/evals/executors.py | 456-562 |
| SyncExecutor.run() | Method | packages/phoenix-evals/src/phoenix/evals/executors.py | 511-562 |
| get_executor_on_sync_context() | Function | packages/phoenix-evals/src/phoenix/evals/executors.py | 565-634 |
I/O Contract
Executor.run() (both AsyncExecutor and SyncExecutor)
| Direction | Type | Description |
|---|---|---|
| Input | Sequence[Any] | A sequence of input items to process. Each item is passed individually to the generation function. |
| Output | Tuple[List[Any], List[ExecutionDetails]] | A tuple of (outputs, details). outputs[i] is the result for inputs[i] (or the fallback value if failed). details[i] contains execution metadata. |
get_executor_on_sync_context()
| Direction | Type | Description |
|---|---|---|
| Input | Callable[[Any], Any] | sync_fn: synchronous generation function |
| Input | Callable[[Any], Coroutine] | async_fn: asynchronous generation function |
| Input | bool | run_sync: force synchronous execution |
| Input | int | concurrency: max concurrent workers (default 3) |
| Input | int | max_retries: max retry attempts (default 10) |
| Input | bool | exit_on_error: stop on first error (default True) |
| Output | Executor | An AsyncExecutor or SyncExecutor instance |
ConcurrencyController Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| max_concurrency | int | (required) | Upper bound on concurrent workers |
| initial_target | float | (required) | Starting concurrency target |
| window_seconds | float | 5.0 | Duration of feedback windows |
| increase_step | float | 0.5 | Additive increase per clean window |
| decrease_ratio | float | 0.5 | Multiplicative decrease on error windows |
| collapse_window_seconds | float | 15.0 | Window for collapse detection |
| collapse_error_threshold | int | 2 | Error count triggering collapse to concurrency=1 |
Usage Examples
Dynamic Concurrency in Action
# The AsyncExecutor with dynamic concurrency enabled (default):
# - Starts with a target of 5 active workers (dynamic_initial_target=5),
#   capped at concurrency=10
# - If rate limit errors occur, controller reduces active workers
# - On recovery, workers are gradually reactivated
from phoenix.evals.executors import AsyncExecutor

executor = AsyncExecutor(
    generation_fn=generate,
    concurrency=10,
    enable_dynamic_concurrency=True,
    dynamic_initial_target=5,  # Start with 5 active workers
    dynamic_window_seconds=5.0,
    dynamic_increase_step=1,
    dynamic_decrease_ratio=0.5,
)
Error Handling and Retries
# AsyncExecutor retry behavior:
# - Tasks are requeued with decremented priority (priority - 1)
# - After max_retries exhausted: task is marked FAILED
# - RateLimitError: requeued with error recorded to controller
# - PhoenixException (non-rate-limit): fails immediately, no retries
# - Timeout: requeued at same priority (not counted as retry)
from phoenix.evals.executors import AsyncExecutor, ExecutionStatus

executor = AsyncExecutor(
    generation_fn=generate,
    max_retries=5,
    exit_on_error=False,  # Continue processing other inputs on failure
    timeout=30,
    fallback_return_value=None,  # Return None for failed tasks
)
outputs, details = executor.run(inputs)

for i, detail in enumerate(details):
    if detail.status == ExecutionStatus.FAILED:
        print(f"Task {i} failed after {len(detail.exceptions)} exceptions")
Notebook-Aware Executor Selection
# In Jupyter notebooks, get_executor_on_sync_context() detects the running event loop
# and returns an AsyncExecutor if nest_asyncio has been applied.
# Without nest_asyncio, it falls back to sync with a helpful warning.
import nest_asyncio

from phoenix.evals.executors import get_executor_on_sync_context

nest_asyncio.apply()

executor = get_executor_on_sync_context(
    sync_fn=sync_generate,
    async_fn=async_generate,
    concurrency=10,
)
# Returns AsyncExecutor since nest_asyncio is patched
Related Pages
- Arize_ai_Phoenix_Evals_Rate_Limiters - Rate limiting subsystem whose RateLimitError triggers executor retry logic
- Arize_ai_Phoenix_LLM_Base_Types - Adapter methods that serve as generation functions for executors
- Arize_ai_Phoenix_Anthropic_Adapter - Adapter whose generate_text() / async_generate_text() are wrapped by executors
- Arize_ai_Phoenix_OpenAI_Adapter - Adapter whose methods are wrapped by executors
- Arize_ai_Phoenix_Evals_Tracing - Tracing decorators applied to evaluation functions before executor wrapping
- Arize_ai_Phoenix_Evals_Utils - Utility functions for processing executor outputs into annotation dataframes