Overview
Concrete tools provided by arize-phoenix-evals for executing evaluation pipelines across entire DataFrames, covering both synchronous and asynchronous execution paths.
Description
evaluate_dataframe() and async_evaluate_dataframe() are the top-level entry points for running a list of evaluators against every row of a pandas.DataFrame. They handle task scheduling, retry logic, error collection, progress reporting via tqdm, and result aggregation into an augmented DataFrame. The synchronous variant uses SyncExecutor for sequential processing, while the asynchronous variant uses AsyncExecutor with configurable concurrency for parallel LLM calls.
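The sketch below contrasts the two entry points on a toy DataFrame; it assumes a trivial code evaluator defined with create_evaluator (the non_empty evaluator and the data are illustrative, not from the source), and the LLM-based examples later on show fuller setups.
import asyncio
import pandas as pd
from phoenix.evals import async_evaluate_dataframe, create_evaluator, evaluate_dataframe
@create_evaluator(name="non_empty")
def non_empty(text: str) -> bool:
    # Simple code evaluator: passes when the text has any non-whitespace content.
    return bool(text.strip())
df = pd.DataFrame({"text": ["hello", "", "world"]})
# Synchronous path: rows are processed sequentially via SyncExecutor.
sync_results = evaluate_dataframe(dataframe=df, evaluators=[non_empty])
# Asynchronous path: AsyncExecutor runs up to `concurrency` tasks in parallel.
async_results = asyncio.run(
    async_evaluate_dataframe(dataframe=df, evaluators=[non_empty], concurrency=2)
)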
Usage
Use these functions when you need to:
- Evaluate an entire dataset in a single call rather than looping row by row.
- Leverage async concurrency for throughput-sensitive LLM evaluation workloads.
- Automatically retry transient failures (rate limits, timeouts) without manual intervention.
- Collect execution metadata (status, exceptions, timing) alongside scores for every evaluation.
Code Reference
Source Location
- Repository: Phoenix
- File: packages/phoenix-evals/src/phoenix/evals/evaluators.py
- evaluate_dataframe: lines 1377-1524
- async_evaluate_dataframe: lines 1527-1702
Signature: evaluate_dataframe
def evaluate_dataframe(
dataframe: pd.DataFrame,
evaluators: List[Evaluator],
tqdm_bar_format: Optional[str] = None,
hide_tqdm_bar: bool = False,
exit_on_error: Optional[bool] = None,
max_retries: Optional[int] = None,
) -> pd.DataFrame
Signature: async_evaluate_dataframe
async def async_evaluate_dataframe(
dataframe: pd.DataFrame,
evaluators: List[Evaluator],
concurrency: Optional[int] = None,
tqdm_bar_format: Optional[str] = None,
hide_tqdm_bar: Optional[bool] = False,
exit_on_error: Optional[bool] = None,
max_retries: Optional[int] = None,
) -> pd.DataFrame
Import
from phoenix.evals import evaluate_dataframe, async_evaluate_dataframe
I/O Contract
Inputs: evaluate_dataframe
| Name | Type | Required | Description |
|------|------|----------|-------------|
| dataframe | pd.DataFrame | Yes | The input DataFrame to evaluate. Each row is converted to a dictionary and passed to each evaluator. |
| evaluators | List[Evaluator] | Yes | List of evaluators to apply. Input mappings should already be bound via bind_evaluator(), or column names should match evaluator input fields. |
| tqdm_bar_format | Optional[str] | No | Custom format string for the tqdm progress bar. If None and hide_tqdm_bar is False, a default formatter is used. |
| hide_tqdm_bar | bool | No (default False) | Whether to suppress the progress bar entirely. |
| exit_on_error | Optional[bool] | No (default True for SyncExecutor) | Whether to abort the entire pipeline on the first unrecoverable error. Set to False for production workloads. |
| max_retries | Optional[int] | No (default 10) | Maximum number of retry attempts for each failed evaluation task. |
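Taken together, the error-handling parameters above support runs that record failures instead of aborting; below is a brief sketch, reusing the word_count and has_question evaluators from the Basic Synchronous Evaluation example later in this page (the parameter values are illustrative).
# Sketch of a more fault-tolerant configuration (values are illustrative):
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[word_count, has_question],
    exit_on_error=False,  # record per-row failures in *_execution_details instead of aborting
    max_retries=5,        # lower than the default of 10
    hide_tqdm_bar=True,
)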
Inputs: async_evaluate_dataframe (additional)
| Name | Type | Required | Description |
|------|------|----------|-------------|
| concurrency | Optional[int] | No (default 3) | Maximum number of evaluation tasks running concurrently. Increase for higher throughput; decrease to respect provider rate limits. |
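A short sketch of tuning this knob, reusing the sentiment evaluator and DataFrame from the Asynchronous Evaluation example below; the value 8 is illustrative, and the safe ceiling depends on your provider's rate limits.
import asyncio
# Illustrative concurrency setting; check provider rate limits before raising it.
results_df = asyncio.run(
    async_evaluate_dataframe(
        dataframe=df,
        evaluators=[sentiment],
        concurrency=8,  # raise for throughput, lower to stay under rate limits (default 3)
    )
)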
Outputs
| Name | Type | Description |
|------|------|-------------|
| Augmented DataFrame | pd.DataFrame | A copy of the input DataFrame with additional columns: {evaluator.name}_execution_details (JSON string with status, exceptions, timing) and {score.name}_score (JSON-serialized Score dict) for each evaluator and score produced. |
Output Column Schema
| Column Pattern | Type | Content |
|----------------|------|---------|
| {evaluator.name}_execution_details | JSON string | {"status": "success" or "error", "exceptions": [...], "execution_time_sec": float} |
| {score.name}_score | JSON string | Score.to_dict() output containing name, score, label, explanation, metadata, kind, direction |
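These JSON columns are easiest to work with once expanded into plain columns. Below is a minimal sketch, assuming a results_df produced by the sentiment example in the next section (so a column named sentiment_score exists); the helper name flatten_score_column is hypothetical.
import json
def flatten_score_column(results_df, score_column):
    # Parse the JSON-serialized Score dict and surface its label and numeric score
    # as plain columns, using the Score.to_dict() keys described above.
    parsed = results_df[score_column].apply(json.loads)
    results_df[f"{score_column}_label"] = parsed.apply(lambda s: s.get("label"))
    results_df[f"{score_column}_value"] = parsed.apply(lambda s: s.get("score"))
    return results_df
results_df = flatten_score_column(results_df, "sentiment_score")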
Usage Examples
Basic Synchronous Evaluation
import pandas as pd
from phoenix.evals import create_evaluator, evaluate_dataframe
@create_evaluator(name="word_count")
def word_count(text: str) -> int:
return len(text.split())
@create_evaluator(name="has_question")
def has_question(text: str) -> bool:
return "?" in text
df = pd.DataFrame({
"text": [
"Hello world",
"How are you today?",
"This is a longer sentence with multiple words",
],
})
results_df = evaluate_dataframe(
dataframe=df,
evaluators=[word_count, has_question],
hide_tqdm_bar=True,
)
print(results_df.columns.tolist())
# ['text', 'word_count_execution_details', 'has_question_execution_details',
# 'word_count_score', 'has_question_score']
Asynchronous Evaluation with LLM Evaluators
import asyncio
import pandas as pd
from phoenix.evals import (
create_classifier,
async_evaluate_dataframe,
LLM,
)
llm = LLM(provider="openai", model="gpt-4o")
sentiment = create_classifier(
name="sentiment",
prompt_template="Classify the sentiment of this text: {text}",
llm=llm,
choices=["positive", "negative", "neutral"],
)
df = pd.DataFrame({
"text": [
"I love this product!",
"This is terrible quality.",
"It is okay, nothing special.",
],
})
async def main():
results_df = await async_evaluate_dataframe(
dataframe=df,
evaluators=[sentiment],
concurrency=2,
exit_on_error=False,
max_retries=5,
)
return results_df
results_df = asyncio.run(main())
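Note that asyncio.run() raises a RuntimeError when an event loop is already running, as in a Jupyter notebook; in that case, await the coroutine directly in a cell (top-level await is supported there).
# In a notebook cell, where an event loop is already running:
results_df = await async_evaluate_dataframe(
    dataframe=df,
    evaluators=[sentiment],
    concurrency=2,
    exit_on_error=False,
    max_retries=5,
)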
With Input Mapping and Multiple Evaluators
import pandas as pd
from phoenix.evals import (
create_evaluator,
create_classifier,
bind_evaluator,
evaluate_dataframe,
LLM,
)
llm = LLM(provider="openai", model="gpt-4o")
# LLM evaluator expects "question" and "answer"
relevance = create_classifier(
name="relevance",
prompt_template="Question: {question}\nAnswer: {answer}\nIs the answer relevant?",
llm=llm,
choices={"relevant": 1.0, "not_relevant": 0.0},
)
# Code evaluator expects "answer"
@create_evaluator(name="answer_length")
def answer_length(answer: str) -> int:
return len(answer.split())
# DataFrame uses "query" and "response" column names
df = pd.DataFrame({
"query": ["What is AI?", "Explain ML."],
"response": [
"AI is artificial intelligence.",
"ML is a subset of AI.",
],
})
# Bind both evaluators with column name mappings
bound_relevance = bind_evaluator(
evaluator=relevance,
input_mapping={"question": "query", "answer": "response"},
)
bound_length = bind_evaluator(
evaluator=answer_length,
input_mapping={"answer": "response"},
)
results_df = evaluate_dataframe(
dataframe=df,
evaluators=[bound_relevance, bound_length],
)
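The call returns the augmented DataFrame without printing anything; per the output column schema above, its columns should include the originals plus one execution-details and one score column per evaluator.
# Expected columns, following the {evaluator.name}_execution_details / {score.name}_score patterns:
#   query, response,
#   relevance_execution_details, relevance_score,
#   answer_length_execution_details, answer_length_score
print(results_df.columns.tolist())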
Error Handling and Inspection
import json
import pandas as pd
# After running evaluate_dataframe with exit_on_error=False
for idx, row in results_df.iterrows():
details = json.loads(row["relevance_execution_details"])
if details["status"] != "success":
print(f"Row {idx} failed: {details['exceptions']}")
else:
score_data = json.loads(row["relevance_score"])
print(f"Row {idx}: label={score_data['label']}, score={score_data.get('score')}")