Implementation:Arize ai Phoenix Evals Executors

From Leeroopedia

Overview

The Evals Executors module provides the task execution framework for the Phoenix evaluator system. It resides at phoenix.evals.executors and implements both synchronous and asynchronous executors with dynamic concurrency control, retry mechanisms, progress reporting, and graceful termination handling. The module is responsible for orchestrating the parallel execution of evaluation tasks (typically LLM calls) while respecting rate limits and handling errors.

Description

The module contains the following key components:

ExecutionStatus (Enum)

Tracks the state of individual task execution:

| Value | Description |
| --- | --- |
| DID_NOT_RUN | Task has not been executed |
| COMPLETED | Task completed successfully on first attempt |
| COMPLETED_WITH_RETRIES | Task completed but required one or more retries |
| FAILED | Task failed after exhausting all retries |

ExecutionDetails

A companion class that tracks per-task execution metadata: exceptions encountered, execution status, and cumulative execution time. Provides complete(), fail(), log_exception(), and log_runtime() methods.
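
The relationship between the status enum and the details record can be sketched as follows. This is a minimal illustration, not the module's actual code: the enum string values, the attribute names (exceptions, status, execution_seconds), and the log_runtime(seconds) signature are assumptions made for the example.

```python
from enum import Enum
from typing import List


class ExecutionStatus(Enum):
    # Member names follow the table above; the string values are assumed.
    DID_NOT_RUN = "DID NOT RUN"
    COMPLETED = "COMPLETED"
    COMPLETED_WITH_RETRIES = "COMPLETED WITH RETRIES"
    FAILED = "FAILED"


class ExecutionDetails:
    """Illustrative per-task record (attribute names are assumptions)."""

    def __init__(self) -> None:
        self.exceptions: List[Exception] = []
        self.status = ExecutionStatus.DID_NOT_RUN
        self.execution_seconds = 0.0

    def log_exception(self, exc: Exception) -> None:
        # Remember every exception seen across attempts.
        self.exceptions.append(exc)

    def log_runtime(self, seconds: float) -> None:
        # Accumulate runtime over all attempts (hypothetical signature).
        self.execution_seconds += seconds

    def complete(self) -> None:
        # Any logged exception means at least one retry was needed.
        if self.exceptions:
            self.status = ExecutionStatus.COMPLETED_WITH_RETRIES
        else:
            self.status = ExecutionStatus.COMPLETED

    def fail(self) -> None:
        self.status = ExecutionStatus.FAILED


# One failed attempt followed by a successful one.
details = ExecutionDetails()
details.log_exception(TimeoutError("first attempt timed out"))
details.log_runtime(1.5)
details.log_runtime(0.5)
details.complete()
```

After this sequence, the record reports that the task completed with retries and carries the cumulative runtime of both attempts.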

ConcurrencyController

An AIMD (Additive Increase/Multiplicative Decrease) controller for dynamic concurrency management:

  • Feedback windows: Evaluates performance over configurable time windows (default 5 seconds).
  • Additive increase: On a clean window (no errors or timeouts), increases the target concurrency by increase_step (default 0.5).
  • Multiplicative decrease: On a window with any errors/timeouts, multiplies the target by decrease_ratio (default 0.5).
  • Collapse detection: If collapse_error_threshold errors (default 2) occur within collapse_window_seconds (default 15), the target concurrency collapses to 1.
  • Smoothed latency tracking: Maintains an exponentially weighted moving average of task latency for monitoring.
  • Bounded concurrency: The target is always clamped to [1, max_concurrency].
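
The additive-increase, multiplicative-decrease, and clamping rules listed above can be sketched in a few lines. This is a simplified illustration under stated assumptions: the class name AIMDController and the method close_window() are hypothetical, and collapse detection, time-based windows, and latency smoothing are left out.

```python
class AIMDController:
    """Hypothetical, simplified AIMD controller (not the Phoenix class)."""

    def __init__(
        self,
        max_concurrency: int,
        initial_target: float,
        increase_step: float = 0.5,
        decrease_ratio: float = 0.5,
    ) -> None:
        self.max_concurrency = max_concurrency
        self.increase_step = increase_step
        self.decrease_ratio = decrease_ratio
        self.target = self._clamp(initial_target)

    def _clamp(self, value: float) -> float:
        # The target always stays within [1, max_concurrency].
        return max(1.0, min(float(self.max_concurrency), value))

    def close_window(self, errors: int) -> float:
        """Update the target once a feedback window has ended."""
        if errors == 0:
            new_target = self.target + self.increase_step  # additive increase
        else:
            new_target = self.target * self.decrease_ratio  # multiplicative decrease
        self.target = self._clamp(new_target)
        return self.target


# Two clean windows, one window with an error, then one clean window.
controller = AIMDController(max_concurrency=10, initial_target=5)
history = [controller.close_window(errors) for errors in (0, 0, 1, 0)]
# history is [5.5, 6.0, 3.0, 3.5]
```

The asymmetry is the point of the design: the target creeps up slowly while windows are clean and is cut sharply as soon as a window contains an error.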

The steady-state concurrency formula is documented as: concurrency ~= a * (1 - r_e) / ((1 - beta) * r_e), where r_e is the error fraction, a is the increase step (increase_step), and beta is the decrease ratio (decrease_ratio).
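
As a worked example of the formula, the helper below evaluates it for the default step and ratio. The function name steady_state_concurrency is hypothetical, and the 10% error fraction is chosen only for illustration.

```python
def steady_state_concurrency(
    increase_step: float, decrease_ratio: float, error_fraction: float
) -> float:
    """Evaluate a * (1 - r_e) / ((1 - beta) * r_e) for the given settings."""
    if not 0.0 < error_fraction <= 1.0:
        raise ValueError("error_fraction must be in (0, 1]")
    if not 0.0 <= decrease_ratio < 1.0:
        raise ValueError("decrease_ratio must be in [0, 1)")
    return (
        increase_step
        * (1.0 - error_fraction)
        / ((1.0 - decrease_ratio) * error_fraction)
    )


# Defaults a = 0.5 and beta = 0.5 with an assumed error fraction of 0.1:
# 0.5 * 0.9 / (0.5 * 0.1) = 0.45 / 0.05, which is approximately 9.
estimate = steady_state_concurrency(0.5, 0.5, 0.1)
```

With these settings the estimate is about 9 workers. Doubling the error fraction to 0.2 drops it to about 4, which shows how strongly the equilibrium depends on the error rate.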

AsyncExecutor

An asynchronous executor using a producer-consumer pattern:

  • Producer: Enqueues tasks from the input sequence into a bounded asyncio.PriorityQueue, respecting a max_fill limit to ensure room for requeued tasks.
  • Consumers: Multiple worker coroutines dequeue tasks, execute them with a timeout, and handle results. Workers implement:
    • Dynamic gating: Workers with index >= target_concurrency sleep rather than dequeue, effectively reducing active workers.
    • Timeout handling: Timed-out tasks are requeued at the same priority.
    • Retry with backoff: Failed tasks are requeued at a decremented priority (a lower priority value means more retries have been used). Tasks that raise RateLimitError are explicitly logged and requeued.
    • PhoenixException bypass: Non-rate-limit PhoenixException instances bypass the retry mechanism.
    • Exit on error: If exit_on_error=True, a termination event is set on the first unrecoverable failure.
  • Signal handling: Captures SIGINT (by default) on the main thread to allow graceful shutdown.
  • Sync interface: The run() method calls asyncio.run(self.execute(inputs)).
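
The producer-consumer arrangement described above can be sketched with asyncio.PriorityQueue. This is a minimal illustration, not the module's implementation: the function name run_with_priority_queue and the (priority, index, item) tuple layout are assumptions, and timeouts, dynamic gating, signal handling, and progress reporting are omitted.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence


async def run_with_priority_queue(
    fn: Callable[[Any], Awaitable[Any]],
    inputs: Sequence[Any],
    concurrency: int = 3,
    max_retries: int = 2,
    fallback: Any = None,
) -> List[Any]:
    outputs: List[Any] = [fallback] * len(inputs)
    # The producer stops at max_fill so requeued tasks always find room.
    max_fill = concurrency
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue(maxsize=2 * concurrency)

    async def producer() -> None:
        for index, item in enumerate(inputs):
            while queue.qsize() >= max_fill:
                await asyncio.sleep(0.001)
            await queue.put((0, index, item))  # fresh tasks start at priority 0

    async def consumer() -> None:
        while True:
            priority, index, item = await queue.get()
            try:
                outputs[index] = await fn(item)
            except Exception:
                retries_used = -priority
                if retries_used < max_retries:
                    # A lower number is dequeued sooner, so retries jump ahead.
                    await queue.put((priority - 1, index, item))
                # Otherwise the fallback value stays in place.
            finally:
                queue.task_done()

    workers = [asyncio.create_task(consumer()) for _ in range(concurrency)]
    await producer()
    await queue.join()  # wait until every task, including requeues, is settled
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return outputs


attempts: dict = {}


async def flaky_double(x: int) -> int:
    attempts[x] = attempts.get(x, 0) + 1
    if x == 2 and attempts[x] == 1:
        raise RuntimeError("transient failure")
    if x == 3:
        raise RuntimeError("permanent failure")
    return 2 * x


results = asyncio.run(run_with_priority_queue(flaky_double, [1, 2, 3, 4]))
# results is [2, 4, None, 8]: input 2 succeeds on retry, input 3 exhausts its retries
```

Requeueing before calling task_done() keeps queue.join() from returning while a retry is still pending, and the gap between max_fill and the queue's maxsize is what prevents workers from blocking on a full queue when they requeue.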

SyncExecutor

A sequential executor for synchronous evaluation:

  • Iterates over inputs sequentially with retry logic.
  • Uses signal.SIGINT handling for interruptibility.
  • PhoenixException instances bypass retries immediately.
  • Provides a tqdm progress bar.
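
The sequential retry behavior can be sketched as a plain loop. This is an illustration under assumptions: run_sequentially is a hypothetical name, NonRetryableError stands in for PhoenixException, and the SIGINT handling and tqdm progress bar are left out.

```python
from typing import Any, Callable, List, Sequence, Tuple


class NonRetryableError(Exception):
    """Stand-in for errors that skip retries (PhoenixException in the source)."""


def run_sequentially(
    fn: Callable[[Any], Any],
    inputs: Sequence[Any],
    max_retries: int = 3,
    fallback: Any = None,
    exit_on_error: bool = False,
) -> Tuple[List[Any], List[int]]:
    outputs: List[Any] = [fallback] * len(inputs)
    attempts: List[int] = [0] * len(inputs)
    for index, item in enumerate(inputs):
        succeeded = False
        for _ in range(max_retries + 1):  # first attempt plus retries
            attempts[index] += 1
            try:
                outputs[index] = fn(item)
                succeeded = True
                break
            except NonRetryableError:
                break  # bypass the retry loop immediately
            except Exception:
                continue  # retry until attempts are exhausted
        if not succeeded and exit_on_error:
            break  # stop processing the remaining inputs
    return outputs, attempts


seen: dict = {}


def parse(text: str) -> int:
    seen[text] = seen.get(text, 0) + 1
    if text == "bad":
        raise NonRetryableError("malformed input")
    if text == "7" and seen[text] == 1:
        raise ConnectionError("transient failure")
    return int(text)


outputs, attempts = run_sequentially(parse, ["1", "7", "bad"], max_retries=3)
# outputs is [1, 7, None]; attempts is [1, 2, 1]
```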

get_executor_on_sync_context()

A factory function that selects the appropriate executor based on the runtime context:

| Condition | Executor Chosen |
| --- | --- |
| Non-main thread | SyncExecutor (with warning if async was requested) |
| run_sync=True | SyncExecutor |
| Running event loop + nest_asyncio patched | AsyncExecutor |
| Running event loop + no nest_asyncio | SyncExecutor (with warning to patch) |
| No running event loop | AsyncExecutor |
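
The decision table can be read as a short chain of checks. The sketch below is an illustration rather than the module's code: choose_executor_kind and detect_context are hypothetical names, and the check for the _nest_patched attribute on the asyncio module is an assumption about how nest_asyncio marks a patched interpreter.

```python
import asyncio
import threading


def choose_executor_kind(
    run_sync: bool,
    on_main_thread: bool,
    loop_running: bool,
    nest_asyncio_patched: bool,
) -> str:
    """Return "sync" or "async", following the table row by row."""
    if not on_main_thread:
        return "sync"  # the real factory also warns if async was requested
    if run_sync:
        return "sync"
    if loop_running:
        # A running loop (e.g. a notebook) needs nest_asyncio to re-enter it.
        return "async" if nest_asyncio_patched else "sync"
    return "async"


def detect_context() -> dict:
    """Gather the runtime facts that the decision depends on."""
    try:
        asyncio.get_running_loop()
        loop_running = True
    except RuntimeError:
        loop_running = False
    return {
        "on_main_thread": threading.current_thread() is threading.main_thread(),
        "loop_running": loop_running,
        # Assumption: nest_asyncio.apply() sets asyncio._nest_patched.
        "nest_asyncio_patched": bool(getattr(asyncio, "_nest_patched", False)),
    }


kind = choose_executor_kind(run_sync=False, **detect_context())
```

Keeping the decision in a pure function separate from the environment probing makes each row of the table easy to check in isolation.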

Usage

from phoenix.evals.executors import AsyncExecutor

async def generate(input_data):
    """An async generation function for each evaluation task."""
    return await llm_adapter.async_generate_text(input_data["prompt"])

executor = AsyncExecutor(
    generation_fn=generate,
    concurrency=5,
    max_retries=3,
    exit_on_error=False,
    tqdm_bar_format="Evaluating |{bar}| {n_fmt}/{total_fmt}",
    timeout=60,
)

outputs, details = executor.run(inputs)

from phoenix.evals.executors import SyncExecutor

def generate_sync(input_data):
    return llm_adapter.generate_text(input_data["prompt"])

executor = SyncExecutor(
    generation_fn=generate_sync,
    max_retries=3,
    exit_on_error=True,
)

outputs, details = executor.run(inputs)

from phoenix.evals.executors import get_executor_on_sync_context

# Automatically selects the async or sync executor based on the runtime context
executor = get_executor_on_sync_context(
    sync_fn=generate_sync,
    async_fn=generate,  # the async generation function defined above
    concurrency=10,
    max_retries=5,
)
outputs, details = executor.run(inputs)

Code Reference

| Symbol | Kind | Location | Lines |
| --- | --- | --- | --- |
| ExecutionStatus | Enum | packages/phoenix-evals/src/phoenix/evals/executors.py | 40-44 |
| ExecutionDetails | Class | packages/phoenix-evals/src/phoenix/evals/executors.py | 47-66 |
| ConcurrencyController | Class | packages/phoenix-evals/src/phoenix/evals/executors.py | 73-168 |
| AsyncExecutor | Class | packages/phoenix-evals/src/phoenix/evals/executors.py | 170-453 |
| AsyncExecutor.producer() | Method | packages/phoenix-evals/src/phoenix/evals/executors.py | 239-256 |
| AsyncExecutor.consumer() | Method | packages/phoenix-evals/src/phoenix/evals/executors.py | 258-375 |
| AsyncExecutor.execute() | Method | packages/phoenix-evals/src/phoenix/evals/executors.py | 377-450 |
| SyncExecutor | Class | packages/phoenix-evals/src/phoenix/evals/executors.py | 456-562 |
| SyncExecutor.run() | Method | packages/phoenix-evals/src/phoenix/evals/executors.py | 511-562 |
| get_executor_on_sync_context() | Function | packages/phoenix-evals/src/phoenix/evals/executors.py | 565-634 |

I/O Contract

Executor.run() (both AsyncExecutor and SyncExecutor)

| Direction | Type | Description |
| --- | --- | --- |
| Input | Sequence[Any] | A sequence of input items to process. Each item is passed individually to the generation function. |
| Output | Tuple[List[Any], List[ExecutionDetails]] | A tuple of (outputs, details). outputs[i] is the result for inputs[i] (or the fallback value if failed). details[i] contains execution metadata. |

get_executor_on_sync_context()

| Direction | Type | Description |
| --- | --- | --- |
| Input | Callable[[Any], Any] | sync_fn: synchronous generation function |
| Input | Callable[[Any], Coroutine] | async_fn: asynchronous generation function |
| Input | bool | run_sync: force synchronous execution |
| Input | int | concurrency: max concurrent workers (default 3) |
| Input | int | max_retries: max retry attempts (default 10) |
| Input | bool | exit_on_error: stop on first error (default True) |
| Output | Executor | An AsyncExecutor or SyncExecutor instance |

ConcurrencyController Constructor Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| max_concurrency | int | (required) | Upper bound on concurrent workers |
| initial_target | float | (required) | Starting concurrency target |
| window_seconds | float | 5.0 | Duration of feedback windows, in seconds |
| increase_step | float | 0.5 | Additive increase per clean window |
| decrease_ratio | float | 0.5 | Multiplicative decrease on error windows |
| collapse_window_seconds | float | 15.0 | Window for collapse detection, in seconds |
| collapse_error_threshold | int | 2 | Error count triggering collapse to concurrency=1 |

Usage Examples

Dynamic Concurrency in Action

# AsyncExecutor with dynamic concurrency enabled (the default):
# - Up to 10 workers are available (concurrency=10); 5 of them start active
#   (dynamic_initial_target=5)
# - If rate limit errors occur, the controller reduces the number of active workers
# - After clean windows, workers are gradually reactivated

executor = AsyncExecutor(
    generation_fn=generate,
    concurrency=10,
    enable_dynamic_concurrency=True,
    dynamic_initial_target=5,  # Start with 5 active workers
    dynamic_window_seconds=5.0,
    dynamic_increase_step=1,
    dynamic_decrease_ratio=0.5,
)

Error Handling and Retries

# AsyncExecutor retry behavior:
# - Tasks are requeued with decremented priority (priority - 1)
# - After max_retries exhausted: task is marked FAILED
# - RateLimitError: requeued with error recorded to controller
# - PhoenixException (non-rate-limit): fails immediately, no retries
# - Timeout: requeued at same priority (not counted as retry)

from phoenix.evals.executors import AsyncExecutor, ExecutionStatus

executor = AsyncExecutor(
    generation_fn=generate,
    max_retries=5,
    exit_on_error=False,  # Continue processing other inputs on failure
    timeout=30,
    fallback_return_value=None,  # Return None for failed tasks
)

outputs, details = executor.run(inputs)
for i, detail in enumerate(details):
    if detail.status == ExecutionStatus.FAILED:
        print(f"Task {i} failed after {len(detail.exceptions)} exceptions")

Notebook-Aware Executor Selection

# In Jupyter notebooks, get_executor_on_sync_context() detects the running event loop
# and uses nest_asyncio if available for async execution.
# Without nest_asyncio, it falls back to sync with a helpful warning.

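import nest_asyncio
from phoenix.evals.executors import get_executor_on_sync_context

nest_asyncio.apply()  # patch the running event loop so it can be re-entered

executor = get_executor_on_sync_context(
    sync_fn=generate_sync,
    async_fn=generate,
    concurrency=10,
)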
# Returns AsyncExecutor since nest_asyncio is patched
