Implementation:Explodinggradients Ragas RAG Query Function Pattern
| Knowledge Sources | Type | Domains | Last Updated |
|---|---|---|---|
| examples/ragas_examples/rag_eval/rag.py, examples/ragas_examples/improve_rag/rag.py, examples/ragas_examples/rag_eval/evals.py, examples/ragas_examples/improve_rag/evals.py | Pattern Doc (user-defined interface) | RAG Evaluation, Retrieval-Augmented Generation | 2026-02-10 |
Overview
Interface specification for user-defined RAG systems evaluated with the Ragas experiment framework. The pattern describes the contract the system's query method must follow: it accepts a question string and returns a dictionary containing the generated answer and, optionally, the retrieved documents and other metadata.
Description
The RAG Query Function Pattern is the concrete interface that any RAG system must expose in order to be plugged into a Ragas evaluation experiment. The Ragas repository provides two reference implementations of this pattern:
- SimpleRAG (`ExampleRAG` class) -- A synchronous RAG system using keyword-based retrieval and OpenAI for generation. Its `query()` method returns `{"answer": str, "run_id": str, "logs": str}`.
- Dual-mode RAG (`RAG` class) -- An asynchronous RAG system supporting both naive (single-pass retrieval) and agentic (agent-controlled retrieval) modes. Its `query()` method returns `{"answer": str, "retrieved_documents": List[dict], "num_retrieved": int, "mlflow_trace_id": Optional[str]}`.
Both implementations follow the same structural pattern: accept a question, perform retrieval, generate a response, and return a dictionary whose `"answer"` key holds the response, alongside implementation-specific metadata keys.
Usage
To use this pattern, a developer:
- Implements a RAG class with a `query(question: str, ...) -> dict` method (or an `async` equivalent).
- Instantiates the RAG system with the desired retriever, LLM client, and configuration.
- Makes the instance available to a Ragas `@experiment()`-decorated function, either as a module-level object or as an extra function argument, and calls `query()` for each dataset row.
- Extracts the answer from the returned dictionary and feeds it to evaluation metrics.
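The four steps above can be sketched without Ragas itself. In this minimal sketch, `TinyRAG` and `run_rows` are hypothetical stand-ins: retrieval and generation are placeholders, the plain loop takes the place of an `@experiment()`-decorated function, and the keyword check takes the place of a Ragas metric.

```python
from typing import Any, Dict, List


class TinyRAG:
    """Hypothetical stand-in that honours the query() contract."""

    def __init__(self, documents: List[str]):
        self.documents = documents

    def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        try:
            if not question.strip():
                raise ValueError("empty question")
            docs = self.documents[:top_k]  # placeholder retrieval: first top_k docs
            answer = " ".join(docs)        # placeholder generation: concatenate them
            return {"answer": answer, "retrieved_documents": [{"content": d} for d in docs]}
        except Exception as e:
            # Mirror the pattern: report failures in "answer" instead of raising.
            return {"answer": f"Error: {e}", "retrieved_documents": []}


def run_rows(rag: TinyRAG, rows: List[Dict[str, str]]) -> List[Dict[str, Any]]:
    """Call query() once per row and attach a naive pass/fail score."""
    results = []
    for row in rows:
        response = rag.query(row["question"])
        answer = response.get("answer", "")
        passed = row["expected_keyword"].lower() in answer.lower()
        results.append({**row, "response": answer, "score": "pass" if passed else "fail"})
    return results


rag = TinyRAG(["BM25 is a ranking function.", "Ragas scores RAG outputs."])
rows = [
    {"question": "What is BM25?", "expected_keyword": "ranking"},
    {"question": "   ", "expected_keyword": "ranking"},
]
for r in run_rows(rag, rows):
    print(r["score"], "|", r["response"])
```

The second row has a blank question; it comes back as an `"Error: ..."` answer and is scored `fail` rather than stopping the loop.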
Interface Specification
The minimal interface contract for a RAG system under evaluation:
```python
from typing import Any, Dict


class RAGSystem:
    """Interface that any RAG system must implement for Ragas evaluation."""

    def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        """
        Execute the full RAG pipeline: retrieve documents and generate a response.

        Args:
            question: The user's natural language question.
            top_k: Number of documents to retrieve (optional, default 3).

        Returns:
            Dictionary with one required key:
                "answer": str -- The generated response text.
            Optional keys:
                "retrieved_documents": List[Dict[str, Any]] -- Retrieved context dicts.
                "run_id": str -- Unique identifier for this query execution.
                "logs": str -- Path to the trace log file.
                "num_retrieved": int -- Count of retrieved documents.
                "mlflow_trace_id": Optional[str] -- MLflow trace ID for observability.
        """
        ...
```
For asynchronous RAG systems, the equivalent async interface is:
```python
from typing import Any, Dict, Optional


class AsyncRAGSystem:
    """Async interface variant for RAG systems."""

    async def query(self, question: str, top_k: Optional[int] = None) -> Dict[str, Any]:
        """Same contract as the synchronous version, but awaitable.

        If top_k is None, the implementation falls back to its own default.
        """
        ...
```
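The docstring contract can also be written down with `typing.Protocol` and `TypedDict`, so that a type checker, or a quick runtime check, catches a malformed response before it reaches a metric. This is a sketch: `RAGResponse`, `SupportsQuery`, `check_response`, and `EchoRAG` are hypothetical names, not part of Ragas.

```python
from typing import Any, Dict, List, Optional, Protocol, TypedDict, runtime_checkable


class _RequiredKeys(TypedDict):
    answer: str


class RAGResponse(_RequiredKeys, total=False):
    """Response shape: "answer" is required, every other key is optional."""

    retrieved_documents: List[Dict[str, Any]]
    run_id: str
    logs: str
    num_retrieved: int
    mlflow_trace_id: Optional[str]


@runtime_checkable
class SupportsQuery(Protocol):
    """Structural type for anything with a query(question, top_k) method."""

    def query(self, question: str, top_k: int = 3) -> Dict[str, Any]: ...


def check_response(response: Dict[str, Any]) -> List[str]:
    """Return a list of contract violations; an empty list means the response is usable."""
    problems: List[str] = []
    if not isinstance(response.get("answer"), str):
        problems.append('"answer" must be present and be a str')
    docs = response.get("retrieved_documents", [])
    if not isinstance(docs, list) or not all(isinstance(d, dict) for d in docs):
        problems.append('"retrieved_documents" must be a list of dicts')
    return problems


class EchoRAG:
    def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        return {"answer": f"You asked: {question}"}


rag = EchoRAG()
print(isinstance(rag, SupportsQuery), check_response(rag.query("hi")))  # True []
```

Note that `isinstance` against a runtime-checkable protocol only confirms that a `query` attribute exists; it does not check the signature or whether the method is `async`.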
Example Implementations
Example 1: SimpleRAG (Synchronous, Keyword Retrieval)
Source: examples/ragas_examples/rag_eval/rag.py (lines 75-411)
This implementation uses a simple keyword-matching retriever and synchronous OpenAI API calls. The query() method orchestrates the full pipeline including trace logging.
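The internals of `SimpleKeywordRetriever` are not reproduced in this section. As an illustration of keyword-matching retrieval in general, here is a term-overlap ranker; `tokenize` and `keyword_retrieve` are hypothetical helpers, and the returned keys (`document_id`, `content`, `score`) are assumptions rather than the repository's format.

```python
import re
from typing import Any, Dict, List, Set


def tokenize(text: str) -> Set[str]:
    """Lowercase alphanumeric tokens; punctuation is dropped."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def keyword_retrieve(query: str, documents: List[str], top_k: int = 3) -> List[Dict[str, Any]]:
    """Rank documents by how many distinct query terms they contain."""
    query_terms = tokenize(query)
    scored = []
    for idx, doc in enumerate(documents):
        overlap = len(query_terms & tokenize(doc))
        if overlap > 0:  # drop documents that share no terms with the query
            scored.append({"document_id": idx, "content": doc, "score": overlap})
    # Highest overlap first; sort() is stable, so ties keep document order.
    scored.sort(key=lambda d: d["score"], reverse=True)
    return scored[:top_k]


docs = [
    "Ragas evaluates RAG pipelines.",
    "BM25 ranks documents for retrieval.",
    "Keyword retrieval matches query words.",
]
for hit in keyword_retrieve("How does keyword retrieval work?", docs, top_k=2):
    print(hit["document_id"], hit["score"])
```

Exact token overlap is cheap but brittle: "retrieves" does not match "retrieval", which is one reason ranked schemes such as BM25 are often preferred.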
```python
import os
from datetime import datetime
from typing import Any, Dict, Optional

# BaseRetriever and SimpleKeywordRetriever are defined outside this excerpt.


class ExampleRAG:
    """
    Simple RAG system that:
    1. accepts a llm client
    2. uses simple keyword matching to retrieve relevant documents
    3. uses the llm client to generate a response based on the retrieved
       documents when a query is made
    """

    def __init__(
        self,
        llm_client,
        retriever: Optional[BaseRetriever] = None,
        system_prompt: Optional[str] = None,
        logdir: str = "logs",
    ):
        self.llm_client = llm_client
        self.retriever = retriever or SimpleKeywordRetriever()
        self.system_prompt = (
            system_prompt
            or """Answer the following question based on the provided documents:
Question: {query}
Documents:
{context}
Answer:
"""
        )
        self.documents = []
        self.is_fitted = False
        self.traces = []
        self.logdir = logdir
        os.makedirs(self.logdir, exist_ok=True)

    # retrieve_documents, generate_response and export_traces_to_log are omitted.

    def query(
        self, question: str, top_k: int = 3, run_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Complete RAG pipeline: retrieve documents and generate response.

        Returns:
            Dictionary containing the response and trace metadata:
            {"answer": str, "run_id": str, "logs": str}
        """
        # Default run ID: timestamp plus a 4-digit suffix derived from hash(question).
        if run_id is None:
            run_id = (
                f"{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                f"_{hash(question) % 10000:04d}"
            )

        # Start a fresh trace list for this query.
        self.traces = []
        try:
            retrieved_docs = self.retrieve_documents(question, top_k)
            # Note: retrieved_docs is not passed on; generate_response gets question and top_k.
            response = self.generate_response(question, top_k)
            result = {"answer": response, "run_id": run_id}
            logs_path = self.export_traces_to_log(run_id, question, result)
            return {"answer": response, "run_id": run_id, "logs": logs_path}
        except Exception as e:
            # Errors are logged and reported in "answer" instead of being raised.
            logs_path = self.export_traces_to_log(run_id, question, None)
            return {
                "answer": f"Error processing query: {str(e)}",
                "run_id": run_id,
                "logs": logs_path,
            }
```
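One detail of the default `run_id` scheme in `query()`: Python salts `hash()` for `str` per process (unless `PYTHONHASHSEED` is fixed), so the same question gets a different 4-digit suffix on every run. If stable IDs matter, for example to line up log files across repeated evaluations, a `hashlib` digest is one option. `make_run_id` is a hypothetical helper, not part of the example repository.

```python
import hashlib
from datetime import datetime
from typing import Optional


def make_run_id(question: str, now: Optional[datetime] = None) -> str:
    """Timestamp plus a 4-digit suffix that is stable across Python processes."""
    if now is None:
        now = datetime.now()
    # sha256 is deterministic, unlike the per-process salted hash() for str.
    digest = hashlib.sha256(question.encode("utf-8")).hexdigest()
    suffix = int(digest, 16) % 10000
    return f"{now.strftime('%Y%m%d_%H%M%S')}_{suffix:04d}"


fixed = datetime(2026, 2, 10, 9, 30, 0)
print(make_run_id("What is Ragas?", fixed))
```

The same question and timestamp always yield the same ID. Two different questions in the same second can still collide, because the suffix has only 10,000 possible values.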
Evaluation harness using SimpleRAG (examples/ragas_examples/rag_eval/evals.py):
```python
from ragas import Dataset, experiment
from ragas.metrics import DiscreteMetric

# openai_client, llm and default_rag_client come from code not shown here.
rag_client = default_rag_client(llm_client=openai_client, logdir="evals/logs")

# LLM-judged pass/fail metric driven by per-row grading notes.
my_metric = DiscreteMetric(
    name="correctness",
    prompt="Check if the response contains points mentioned from the grading "
    "notes and return 'pass' or 'fail'.\n"
    "Response: {response} Grading Notes: {grading_notes}",
    allowed_values=["pass", "fail"],
)


@experiment()
async def run_experiment(row):
    # Only "answer" is scored; "logs" is carried along for debugging.
    response = rag_client.query(row["question"])
    score = my_metric.score(
        llm=llm,
        response=response.get("answer", " "),
        grading_notes=row["grading_notes"],
    )
    return {
        **row,
        "response": response.get("answer", ""),
        "score": score.value,
        "log_file": response.get("logs", " "),
    }
```
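`run_experiment` is declared `async`, but `rag_client.query` is a blocking call, so while one row waits on the LLM no other coroutine on the event loop can make progress. If the experiment runner schedules rows concurrently (not shown in this section), one option is to move the blocking call onto a worker thread with `asyncio.to_thread` (Python 3.9+). A sketch: `BlockingRAG` and `query_row` are hypothetical, and `time.sleep` stands in for the network call.

```python
import asyncio
import time
from typing import Any, Dict, List


class BlockingRAG:
    """Hypothetical synchronous RAG whose query() blocks, like a sync HTTP call."""

    def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        time.sleep(0.2)  # simulate a blocking LLM request
        return {"answer": f"answer to {question}", "run_id": "demo", "logs": ""}


async def query_row(rag: BlockingRAG, row: Dict[str, str]) -> Dict[str, Any]:
    # Run the blocking call in a worker thread so the event loop stays free.
    response = await asyncio.to_thread(rag.query, row["question"])
    return {**row, "response": response.get("answer", "")}


async def main() -> List[Dict[str, Any]]:
    rag = BlockingRAG()
    rows = [{"question": f"q{i}"} for i in range(5)]
    start = time.perf_counter()
    results = await asyncio.gather(*(query_row(rag, r) for r in rows))
    print(f"{len(results)} rows in {time.perf_counter() - start:.2f}s")
    return results


results = asyncio.run(main())
```

With the threads, the five simulated calls overlap instead of taking roughly five times as long back to back; the same applies to the synchronous `my_metric.score` call.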
Example 2: Dual-Mode RAG (Async, BM25, Naive/Agentic)
Source: examples/ragas_examples/improve_rag/rag.py (lines 85-214)
This implementation supports two operating modes behind the same query() interface: naive (single-pass BM25 retrieval) and agentic (agent-controlled multi-step retrieval).
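Naive mode ranks chunks with BM25 in a single pass. For reference, a minimal Okapi BM25 scorer is sketched below. `TinyBM25` is an illustrative stand-in, not the `BM25Retriever` used by the example (whose internals are not shown); the regex tokenizer, `k1=1.5`, `b=0.75`, and the non-negative IDF variant are all assumptions.

```python
import math
import re
from collections import Counter
from typing import List, Tuple


def tokenize(text: str) -> List[str]:
    return re.findall(r"[a-z0-9]+", text.lower())


class TinyBM25:
    """Minimal Okapi BM25 ranker (illustrative only)."""

    def __init__(self, documents: List[str], k1: float = 1.5, b: float = 0.75):
        self.k1, self.b = k1, b
        self.doc_tokens = [tokenize(d) for d in documents]
        self.doc_freqs = [Counter(toks) for toks in self.doc_tokens]
        self.n_docs = len(documents)
        self.avgdl = sum(len(t) for t in self.doc_tokens) / max(self.n_docs, 1)
        # Document frequency: number of documents containing each term.
        self.df = Counter(term for toks in self.doc_tokens for term in set(toks))

    def idf(self, term: str) -> float:
        n = self.df.get(term, 0)
        # The "+ 1" inside the log keeps IDF non-negative.
        return math.log((self.n_docs - n + 0.5) / (n + 0.5) + 1)

    def score(self, query: str, index: int) -> float:
        freqs = self.doc_freqs[index]
        dl = len(self.doc_tokens[index])
        total = 0.0
        for term in tokenize(query):
            f = freqs.get(term, 0)
            if f == 0:
                continue
            # Term-frequency saturation (k1) and length normalisation (b).
            denom = f + self.k1 * (1 - self.b + self.b * dl / self.avgdl)
            total += self.idf(term) * f * (self.k1 + 1) / denom
        return total

    def retrieve(self, query: str, top_k: int = 3) -> List[Tuple[int, float]]:
        scores = [(i, self.score(query, i)) for i in range(self.n_docs)]
        scores.sort(key=lambda pair: pair[1], reverse=True)
        return scores[:top_k]


docs = [
    "BM25 ranks documents by term frequency.",
    "Agents can call a retrieval tool several times.",
    "Naive RAG retrieves once per question.",
]
bm25 = TinyBM25(docs)
for idx, score in bm25.retrieve("how does naive retrieval work", top_k=2):
    print(idx, round(score, 3))
```

Documents 1 and 2 each match one query term with the same IDF, so the shorter document 2 ranks first because of the length normalisation controlled by `b`.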
```python
from typing import Any, Dict, Optional

import mlflow
from openai import AsyncOpenAI

# BM25Retriever, _setup_agent, _naive_query, _agentic_query and the
# _mlflow_enabled attribute are defined outside this excerpt.


class RAG:
    """RAG system that can operate in naive or agentic mode."""

    def __init__(
        self,
        llm_client: AsyncOpenAI,
        retriever: BM25Retriever,
        mode="naive",
        system_prompt=None,
        model="gpt-4o-mini",
        default_k=3,
    ):
        self.llm_client = llm_client
        self.retriever = retriever
        self.mode = mode.lower()
        self.model = model
        self.default_k = default_k
        self.system_prompt = (
            system_prompt
            or "Answer only based on documents. Be concise.\n\n"
            "Question: {query}\nDocuments:\n{context}\nAnswer:"
        )
        # Only agentic mode needs an agent; the mode itself is not validated here.
        if self.mode == "agentic":
            self._setup_agent()

    async def query(
        self, question: str, top_k: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Query the RAG system.

        Returns:
            {
                "answer": str,
                "retrieved_documents": List[dict],
                "num_retrieved": int,
                "mlflow_trace_id": str or None
            }
        """
        if top_k is None:
            top_k = self.default_k

        try:
            if self.mode == "naive":
                return await self._naive_query(question, top_k)
            elif self.mode == "agentic":
                return await self._agentic_query(question, top_k)
            else:
                raise ValueError(f"Unknown mode: {self.mode}")
        except Exception as e:
            # Same keys as a successful result; keep the trace ID if MLflow is on.
            trace_id = (
                mlflow.get_last_active_trace_id()
                if self._mlflow_enabled
                else None
            )
            return {
                "answer": f"Error: {str(e)}",
                "retrieved_documents": [],
                "num_retrieved": 0,
                "mlflow_trace_id": trace_id,
            }
```
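As excerpted, an unrecognized `mode` is only detected inside `query()`, where the `ValueError` is caught like any other exception, so a typo such as `mode="agentc"` would turn every evaluated row into an `"Error: Unknown mode: ..."` answer instead of stopping the run. A sketch of one alternative that validates the mode up front while keeping a uniform error result; `ModeRAG`, `error_result`, and both retrieval placeholders are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional


def error_result(message: str) -> Dict[str, Any]:
    """Error response with the same keys as a successful one."""
    return {
        "answer": f"Error: {message}",
        "retrieved_documents": [],
        "num_retrieved": 0,
        "mlflow_trace_id": None,
    }


class ModeRAG:
    """Hypothetical two-mode RAG that rejects unknown modes when constructed."""

    def __init__(self, documents: List[str], mode: str = "naive", default_k: int = 3):
        self.documents = documents
        self.default_k = default_k
        handlers: Dict[str, Callable[[str, int], Awaitable[Dict[str, Any]]]] = {
            "naive": self._naive_query,
            "agentic": self._agentic_query,
        }
        mode = mode.lower()
        if mode not in handlers:
            # Fail once, up front, instead of returning an error answer for every row.
            raise ValueError(f"Unknown mode: {mode}")
        self.mode = mode
        self._handler = handlers[mode]

    def _result(self, docs: List[str]) -> Dict[str, Any]:
        retrieved = [{"content": d} for d in docs]
        return {
            "answer": docs[0] if docs else "",  # placeholder for LLM generation
            "retrieved_documents": retrieved,
            "num_retrieved": len(retrieved),
            "mlflow_trace_id": None,
        }

    async def _naive_query(self, question: str, top_k: int) -> Dict[str, Any]:
        # Placeholder for a single retrieval pass.
        return self._result(self.documents[:top_k])

    async def _agentic_query(self, question: str, top_k: int) -> Dict[str, Any]:
        # Placeholder for agent-controlled retrieval: one document per "tool call".
        docs: List[str] = []
        for doc in self.documents:
            if len(docs) == top_k:
                break
            await asyncio.sleep(0)  # stands in for an awaited tool call
            docs.append(doc)
        return self._result(docs)

    async def query(self, question: str, top_k: Optional[int] = None) -> Dict[str, Any]:
        if top_k is None:
            top_k = self.default_k
        try:
            if not question.strip():
                raise ValueError("empty question")
            return await self._handler(question, top_k)
        except Exception as e:
            # Runtime failures are still reported in "answer", as in the example.
            return error_result(str(e))


rag = ModeRAG(["doc a", "doc b", "doc c"], mode="Agentic")
print(asyncio.run(rag.query("What is in doc a?", top_k=2))["num_retrieved"])
print(asyncio.run(rag.query("   "))["answer"])
```

Configuration mistakes now fail fast at construction, while per-query failures keep the non-raising behaviour the evaluation loop relies on.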
Evaluation harness using dual-mode RAG (examples/ragas_examples/improve_rag/evals.py):
```python
from typing import Any, Dict

from ragas import experiment

# RAG is the class above; correctness_metric is defined outside this excerpt.


@experiment()
async def evaluate_rag(row: Dict[str, Any], rag: RAG, llm) -> Dict[str, Any]:
    question = row["question"]

    # Query the RAG system (same interface regardless of mode)
    rag_response = await rag.query(question, top_k=4)
    model_response = rag_response.get("answer", "")

    # Evaluate correctness
    score = await correctness_metric.ascore(
        question=question,
        expected_answer=row["expected_answer"],
        response=model_response,
        llm=llm,
    )

    return {
        **row,
        "model_response": model_response,
        "correctness_score": score.value,
        "correctness_reason": score.reason,
        # .get() returns None, not "N/A", when the key is present with a None value.
        "mlflow_trace_id": rag_response.get("mlflow_trace_id", "N/A"),
        # Keep only a 200-character preview of each retrieved document.
        "retrieved_documents": [
            doc.get("content", "")[:200] + "..."
            if len(doc.get("content", "")) > 200
            else doc.get("content", "")
            for doc in rag_response.get("retrieved_documents", [])
        ],
    }
```
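The list comprehension at the end of `evaluate_rag` keeps result rows compact by storing only a 200-character preview of each retrieved document. Pulled out into a helper (the name `preview_documents` is hypothetical), the same logic is easier to reuse and test.

```python
from typing import Any, Dict, List


def preview_documents(docs: List[Dict[str, Any]], limit: int = 200) -> List[str]:
    """Shorten each document's "content" to `limit` characters, adding "..." when cut."""
    previews = []
    for doc in docs:
        content = doc.get("content", "")
        previews.append(content[:limit] + "..." if len(content) > limit else content)
    return previews


print(preview_documents([{"content": "abcdef"}, {"content": "abc"}, {}], limit=3))
```

Documents without a `"content"` key become empty strings, matching the `doc.get("content", "")` default in the harness.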
Key Observations
- Consistent return structure: Both implementations return a dictionary with an `"answer"` key as the primary output. The evaluation harnesses score only this key; every other key is read with `.get()` and a default, so its absence does not break a run.
- Graceful error handling: Both implementations catch exceptions internally and return the error message as the `"answer"` value instead of raising, so the evaluation pipeline continues even if individual queries fail. The error text is then scored like any other answer and will usually be graded as incorrect.
- Trace/observability support: SimpleRAG logs traces to JSON files (`"logs"` key), while the dual-mode RAG integrates with MLflow (`"mlflow_trace_id"` key). Both are optional metadata that enrich evaluation results without affecting the core contract.
- Retriever interchangeability: SimpleRAG accepts any `BaseRetriever` subclass; the dual-mode RAG uses `BM25Retriever`. The `query()` interface is the same regardless of retriever choice.
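Because evaluation code only needs `"answer"`, the two implementations' differing metadata can be folded into one row shape when comparing them side by side. A sketch: `normalize_response`, its output column names, and the `"Error"` prefix check are assumptions. The prefix matches both examples' error strings (`"Error processing query: ..."` and `"Error: ..."`) but would also flag a genuine answer that happens to start with that word.

```python
from typing import Any, Dict, Optional


def normalize_response(response: Dict[str, Any]) -> Dict[str, Any]:
    """Map either example's output onto one row shape for metrics and reports."""
    docs = response.get("retrieved_documents") or []
    # Trace reference: MLflow trace ID if present, else the log-file path, else None.
    trace_ref: Optional[str] = response.get("mlflow_trace_id") or response.get("logs") or None
    answer = response.get("answer", "")
    return {
        "response": answer,
        "contexts": [d.get("content", "") for d in docs],
        "trace_ref": trace_ref,
        # Both examples report failures as answers starting with "Error".
        "is_error": answer.startswith("Error"),
    }


print(normalize_response({"answer": "Error: timeout", "retrieved_documents": [],
                          "num_retrieved": 0, "mlflow_trace_id": None}))
```

SimpleRAG rows end up with an empty `contexts` list, since its `query()` does not return the retrieved documents.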