
Implementation:Arize AI Phoenix Evaluate Dataframe

From Leeroopedia
Knowledge Sources
Domains LLM Evaluation, Pipeline Orchestration, Batch Processing
Last Updated 2026-02-14 00:00 GMT

Overview

Concrete tools provided by arize-phoenix-evals for executing evaluation pipelines across entire DataFrames, covering both synchronous and asynchronous execution paths.

Description

evaluate_dataframe() and async_evaluate_dataframe() are the top-level entry points for running a list of evaluators against every row of a pandas.DataFrame. They handle task scheduling, retry logic, error collection, progress reporting via tqdm, and result aggregation into an augmented DataFrame. The synchronous variant uses SyncExecutor for sequential processing, while the asynchronous variant uses AsyncExecutor with configurable concurrency for parallel LLM calls.

Usage

Use these functions when you need to:

  • Evaluate an entire dataset in a single call rather than looping row by row.
  • Leverage async concurrency for throughput-sensitive LLM evaluation workloads.
  • Automatically retry transient failures (rate limits, timeouts) without manual intervention.
  • Collect execution metadata (status, exceptions, timing) alongside scores for every evaluation.

Code Reference

Source Location

  • Repository: Phoenix
  • File: packages/phoenix-evals/src/phoenix/evals/evaluators.py
    • evaluate_dataframe: lines 1377-1524
    • async_evaluate_dataframe: lines 1527-1702

Signature: evaluate_dataframe

def evaluate_dataframe(
    dataframe: pd.DataFrame,
    evaluators: List[Evaluator],
    tqdm_bar_format: Optional[str] = None,
    hide_tqdm_bar: bool = False,
    exit_on_error: Optional[bool] = None,
    max_retries: Optional[int] = None,
) -> pd.DataFrame

Signature: async_evaluate_dataframe

async def async_evaluate_dataframe(
    dataframe: pd.DataFrame,
    evaluators: List[Evaluator],
    concurrency: Optional[int] = None,
    tqdm_bar_format: Optional[str] = None,
    hide_tqdm_bar: Optional[bool] = False,
    exit_on_error: Optional[bool] = None,
    max_retries: Optional[int] = None,
) -> pd.DataFrame

Import

from phoenix.evals import evaluate_dataframe, async_evaluate_dataframe

I/O Contract

Inputs: evaluate_dataframe

Name Type Required Description
dataframe pd.DataFrame Yes The input DataFrame to evaluate. Each row is converted to a dictionary and passed to each evaluator.
evaluators List[Evaluator] Yes List of evaluators to apply. Input mappings should already be bound via bind_evaluator() or column names should match evaluator input fields.
tqdm_bar_format Optional[str] No Custom format string for the tqdm progress bar. If None and hide_tqdm_bar is False, a default formatter is used.
hide_tqdm_bar bool No (default False) Whether to suppress the progress bar entirely.
exit_on_error Optional[bool] No (default True for SyncExecutor) Whether to abort the entire pipeline on the first unrecoverable error. Set to False for production workloads.
max_retries Optional[int] No (default 10) Maximum number of retry attempts for each failed evaluation task.
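To make the interaction between max_retries and exit_on_error concrete, the following is a minimal, illustrative sketch of a retry loop (not Phoenix's actual executor code): a task is attempted up to max_retries + 1 times, exceptions are collected along the way, and the final failure either raises (exit_on_error=True) or is returned as an error record (exit_on_error=False). The function and task names here are hypothetical.

```python
def run_with_retries(task, max_retries=10, exit_on_error=True):
    """Illustrative retry loop: attempt `task` up to max_retries + 1 times,
    collecting exceptions; on exhaustion either raise (exit_on_error=True)
    or return an error record (exit_on_error=False)."""
    exceptions = []
    for _attempt in range(max_retries + 1):
        try:
            return {"status": "success", "result": task(), "exceptions": exceptions}
        except Exception as exc:
            exceptions.append(repr(exc))
    if exit_on_error:
        raise RuntimeError(f"task failed after {max_retries + 1} attempts")
    return {"status": "error", "result": None, "exceptions": exceptions}

# A task that fails twice with a transient error before succeeding:
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("transient")
    return "ok"

record = run_with_retries(flaky, max_retries=5, exit_on_error=False)
# record["status"] == "success" after two collected exceptions
```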

Inputs: async_evaluate_dataframe (additional)

Name Type Required Description
concurrency Optional[int] No (default 3) Maximum number of evaluation tasks running concurrently. Increase for higher throughput; decrease to respect provider rate limits.
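The effect of the concurrency parameter can be approximated with an asyncio.Semaphore: at most N coroutines are awaited at once, while the rest queue. This is an illustrative sketch of the scheduling behavior, not the AsyncExecutor implementation; bounded_map and fake_llm_call are hypothetical names.

```python
import asyncio

async def bounded_map(factories, concurrency=3):
    """Run coroutine factories with at most `concurrency` in flight,
    mimicking the effect of the `concurrency` parameter."""
    sem = asyncio.Semaphore(concurrency)
    active = 0
    peak = 0

    async def guarded(factory):
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            try:
                return await factory()
            finally:
                active -= 1

    results = await asyncio.gather(*(guarded(f) for f in factories))
    return results, peak

async def fake_llm_call(i):
    await asyncio.sleep(0.01)  # stand-in for an LLM request
    return i * 2

results, peak = asyncio.run(
    bounded_map([lambda i=i: fake_llm_call(i) for i in range(8)], concurrency=2)
)
# `peak` never exceeds the concurrency limit of 2
```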

Outputs

Name Type Description
Augmented DataFrame pd.DataFrame A copy of the input DataFrame with additional columns: {evaluator.name}_execution_details (JSON string with status, exceptions, timing) and {score.name}_score (JSON-serialized Score dict) for each evaluator and score produced.

Output Column Schema

Column Pattern Type Content
{evaluator.name}_execution_details JSON string {"status": "success" | "error", "exceptions": [...], "execution_time_sec": float}
{score.name}_score JSON string Score.to_dict() output containing name, score, label, explanation, metadata, kind, direction
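Since both output columns hold JSON strings, each cell can be decoded with the standard json module. The cell contents below are hypothetical values constructed to match the schema in the table above; the specific field values (e.g. "llm", "maximize") are illustrative, not guaranteed by the library.

```python
import json

# Hypothetical cell values shaped like the documented column schema.
details_cell = json.dumps({
    "status": "success",
    "exceptions": [],
    "execution_time_sec": 0.42,
})
score_cell = json.dumps({
    "name": "relevance",
    "score": 1.0,
    "label": "relevant",
    "explanation": "The answer addresses the question.",
    "metadata": {},
    "kind": "llm",         # illustrative value
    "direction": "maximize",  # illustrative value
})

details = json.loads(details_cell)
score = json.loads(score_cell)
# details["status"] -> "success"; score["label"] -> "relevant"
```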

Usage Examples

Basic Synchronous Evaluation

import pandas as pd
from phoenix.evals import create_evaluator, evaluate_dataframe

@create_evaluator(name="word_count")
def word_count(text: str) -> int:
    return len(text.split())

@create_evaluator(name="has_question")
def has_question(text: str) -> bool:
    return "?" in text

df = pd.DataFrame({
    "text": [
        "Hello world",
        "How are you today?",
        "This is a longer sentence with multiple words",
    ],
})

results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[word_count, has_question],
    hide_tqdm_bar=True,
)

print(results_df.columns.tolist())
# ['text', 'word_count_execution_details', 'has_question_execution_details',
#  'word_count_score', 'has_question_score']

Asynchronous Evaluation with LLM Evaluators

import asyncio
import pandas as pd
from phoenix.evals import (
    create_classifier,
    async_evaluate_dataframe,
    LLM,
)

llm = LLM(provider="openai", model="gpt-4o")

sentiment = create_classifier(
    name="sentiment",
    prompt_template="Classify the sentiment of this text: {text}",
    llm=llm,
    choices=["positive", "negative", "neutral"],
)

df = pd.DataFrame({
    "text": [
        "I love this product!",
        "This is terrible quality.",
        "It is okay, nothing special.",
    ],
})

async def main():
    results_df = await async_evaluate_dataframe(
        dataframe=df,
        evaluators=[sentiment],
        concurrency=2,
        exit_on_error=False,
        max_retries=5,
    )
    return results_df

results_df = asyncio.run(main())

With Input Mapping and Multiple Evaluators

import pandas as pd
from phoenix.evals import (
    create_evaluator,
    create_classifier,
    bind_evaluator,
    evaluate_dataframe,
    LLM,
)

llm = LLM(provider="openai", model="gpt-4o")

# LLM evaluator expects "question" and "answer"
relevance = create_classifier(
    name="relevance",
    prompt_template="Question: {question}\nAnswer: {answer}\nIs the answer relevant?",
    llm=llm,
    choices={"relevant": 1.0, "not_relevant": 0.0},
)

# Code evaluator expects "answer"
@create_evaluator(name="answer_length")
def answer_length(answer: str) -> int:
    return len(answer.split())

# DataFrame uses "query" and "response" column names
df = pd.DataFrame({
    "query": ["What is AI?", "Explain ML."],
    "response": [
        "AI is artificial intelligence.",
        "ML is a subset of AI.",
    ],
})

# Bind both evaluators with column name mappings
bound_relevance = bind_evaluator(
    evaluator=relevance,
    input_mapping={"question": "query", "answer": "response"},
)
bound_length = bind_evaluator(
    evaluator=answer_length,
    input_mapping={"answer": "response"},
)

results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[bound_relevance, bound_length],
)

Error Handling and Inspection

import json
import pandas as pd

# After running evaluate_dataframe with exit_on_error=False
for idx, row in results_df.iterrows():
    details = json.loads(row["relevance_execution_details"])
    if details["status"] != "success":
        print(f"Row {idx} failed: {details['exceptions']}")
    else:
        score_data = json.loads(row["relevance_score"])
        print(f"Row {idx}: label={score_data['label']}, score={score_data.get('score')}")
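Beyond per-row inspection, the score columns can be aggregated into summary metrics. The sketch below, assuming a results DataFrame with a relevance_score column shaped like the output schema, decodes each cell and computes a mean score; the sample data is fabricated for illustration.

```python
import json
import pandas as pd

# Hypothetical results with a `relevance_score` column holding
# JSON-serialized Score dicts, per the output schema above.
results_df = pd.DataFrame({
    "relevance_score": [
        json.dumps({"name": "relevance", "score": 1.0, "label": "relevant"}),
        json.dumps({"name": "relevance", "score": 0.0, "label": "not_relevant"}),
        json.dumps({"name": "relevance", "score": 1.0, "label": "relevant"}),
    ],
})

# Decode each cell and extract the numeric score.
scores = results_df["relevance_score"].map(lambda s: json.loads(s)["score"])
mean_relevance = scores.mean()
```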

Related Pages

Implements Principle

Requires Environment

Uses Heuristic
