

Implementation:Explodinggradients Ragas RAG Query Function Pattern

From Leeroopedia


Knowledge Sources: examples/ragas_examples/rag_eval/rag.py, examples/ragas_examples/improve_rag/rag.py, examples/ragas_examples/rag_eval/evals.py, examples/ragas_examples/improve_rag/evals.py
Type: Pattern Doc (user-defined interface)
Domains: RAG Evaluation, Retrieval-Augmented Generation
Last Updated: 2026-02-10

Overview

Interface specification for user-defined RAG systems that are evaluated with the Ragas experiment framework. The pattern describes the RAG system the user builds and hands to Ragas, specifically the query method contract: it accepts a question string and returns a dictionary containing the generated answer, plus optional retrieved documents and metadata.

Description

The RAG Query Function Pattern is the concrete interface that any RAG system must expose in order to be plugged into a Ragas evaluation experiment. The Ragas repository provides two reference implementations of this pattern:

  • SimpleRAG (ExampleRAG class) -- A synchronous RAG system using keyword-based retrieval and OpenAI for generation. Its query() method returns {"answer": str, "run_id": str, "logs": str}.
  • Dual-mode RAG (RAG class) -- An asynchronous RAG system supporting both naive (single-pass retrieval) and agentic (agent-controlled retrieval) modes. Its query() method returns {"answer": str, "retrieved_documents": List[dict], "num_retrieved": int, "mlflow_trace_id": Optional[str]}.

Both implementations follow the same structural pattern: accept a question, perform retrieval, generate a response, and return a dictionary with standardized keys.
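As a minimal sketch of that shared structure, the class below satisfies the contract without any external service. `ToyRAG`, its word-overlap retriever, and its template "generator" are hypothetical stand-ins for a real retriever and LLM call; only the shape of the returned dictionary follows the pattern.

```python
from typing import Any, Dict, List


class ToyRAG:
    """Hypothetical RAG system: word-overlap retrieval plus a template 'generator'."""

    def __init__(self, documents: List[str]):
        self.documents = documents

    def _retrieve(self, question: str, top_k: int) -> List[Dict[str, Any]]:
        # Score each document by how many lowercase question words it contains.
        words = set(question.lower().split())
        scored = [
            (len(words & set(doc.lower().split())), i, doc)
            for i, doc in enumerate(self.documents)
        ]
        # Highest overlap first; ties broken by original document order.
        scored.sort(key=lambda t: (-t[0], t[1]))
        return [
            {"content": doc, "score": score}
            for score, _, doc in scored[:top_k]
            if score > 0
        ]

    def _generate(self, question: str, docs: List[Dict[str, Any]]) -> str:
        # Stand-in for an LLM call: echo the best-matching document.
        return docs[0]["content"] if docs else "I don't know."

    def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        # 1. retrieve, 2. generate, 3. return the standardized dictionary.
        docs = self._retrieve(question, top_k)
        answer = self._generate(question, docs)
        return {
            "answer": answer,
            "retrieved_documents": docs,
            "num_retrieved": len(docs),
        }


rag = ToyRAG([
    "Ragas evaluates RAG pipelines",
    "BM25 is a ranking function",
    "MLflow records traces",
])
result = rag.query("what does ragas evaluate")
print(result["answer"])  # Ragas evaluates RAG pipelines
```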

Usage

To use this pattern, a developer:

  1. Implements a RAG class with a query(question: str, ...) -> dict method
  2. Instantiates the RAG system with the desired retriever, LLM client, and configuration
  3. Makes the instance available to a Ragas @experiment()-decorated function (as a module-level object or as an extra argument to the experiment) that calls query() once per dataset row
  4. Extracts the "answer" value from the returned dictionary and feeds it to evaluation metrics
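The four steps can be sketched end to end without Ragas. In this sketch a plain loop (`run_plain_experiment`, hypothetical) stands in for an `@experiment()` run, a substring check (`contains_expected`, hypothetical) stands in for an LLM-judged metric such as `DiscreteMetric`, and `EchoRAG` is a trivial placeholder for the configured system.

```python
from typing import Any, Callable, Dict, List


class EchoRAG:
    """Hypothetical placeholder RAG system (step 1): canned answer per question."""

    def __init__(self, answers: Dict[str, str]):
        self.answers = answers

    def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        return {"answer": self.answers.get(question, ""), "retrieved_documents": []}


def contains_expected(response: str, expected: str) -> str:
    # Stand-in metric: 'pass' if the expected phrase appears in the response.
    return "pass" if expected.lower() in response.lower() else "fail"


def run_plain_experiment(
    rag: Any, rows: List[Dict[str, str]], metric: Callable[[str, str], str]
) -> List[Dict[str, Any]]:
    results = []
    for row in rows:
        # Step 3: call query() once per dataset row.
        output = rag.query(row["question"])
        # Step 4: pull out the "answer" value and feed it to the metric.
        answer = output.get("answer", "")
        score = metric(answer, row["expected"])
        results.append({**row, "response": answer, "score": score})
    return results


# Step 2: instantiate the system under test.
rag = EchoRAG({"Capital of France?": "Paris is the capital.", "2+2?": "5"})
rows = [
    {"question": "Capital of France?", "expected": "Paris"},
    {"question": "2+2?", "expected": "4"},
]
for r in run_plain_experiment(rag, rows, contains_expected):
    print(r["question"], r["score"])
```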

Interface Specification

The minimal interface contract for a RAG system under evaluation:

from typing import Any, Dict, List, Optional

class RAGSystem:
    """Interface that any RAG system must implement for Ragas evaluation."""

    def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        """
        Execute the full RAG pipeline: retrieve documents and generate a response.

        Args:
            question: The user's natural language question.
            top_k: Number of documents to retrieve (optional, default 3).

        Returns:
            Dictionary with one required key:
                "answer": str                  -- The generated response text.
            Optional keys (present depending on the implementation):
                "retrieved_documents": List[Dict[str, Any]]
                                               -- Retrieved context dicts.
                "num_retrieved": int           -- Count of retrieved documents.
                "run_id": str                  -- Unique identifier for this query execution.
                "logs": str                    -- Path to the trace log file.
                "mlflow_trace_id": str | None  -- MLflow trace ID for observability.
        """
        ...
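The same contract can be written down as types, so that a type checker and a runtime check can both enforce it. This is a sketch, not part of Ragas: `RAGResponse`, `RAGQueryable`, and `check_rag_response` are hypothetical names. Only `"answer"` is required; every other key is optional.

```python
from typing import Any, Dict, List, Optional, Protocol, TypedDict


class _RequiredKeys(TypedDict):
    answer: str


class RAGResponse(_RequiredKeys, total=False):
    # Optional keys: any subset may be present.
    retrieved_documents: List[Dict[str, Any]]
    num_retrieved: int
    run_id: str
    logs: str
    mlflow_trace_id: Optional[str]


class RAGQueryable(Protocol):
    def query(self, question: str, top_k: int = 3) -> RAGResponse: ...


def check_rag_response(result: Dict[str, Any]) -> List[str]:
    """Return a list of contract violations (empty if the result is valid)."""
    problems = []
    if not isinstance(result.get("answer"), str):
        problems.append("'answer' must be present and be a str")
    docs = result.get("retrieved_documents", [])
    if not isinstance(docs, list) or not all(isinstance(d, dict) for d in docs):
        problems.append("'retrieved_documents' must be a list of dicts")
    if "num_retrieved" in result and not isinstance(result["num_retrieved"], int):
        problems.append("'num_retrieved' must be an int")
    return problems


print(check_rag_response({"answer": "42"}))             # []
print(check_rag_response({"retrieved_documents": []}))  # ["'answer' must be present and be a str"]
```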

For asynchronous RAG systems, the equivalent async interface:

class AsyncRAGSystem:
    """Async interface variant for RAG systems."""

    async def query(self, question: str, top_k: Optional[int] = None) -> Dict[str, Any]:
        """Same contract as the synchronous version, but awaitable.

        top_k=None means "use the system's default number of documents".
        """
        ...
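A harness that should accept either variant can call `query()` and await the result only when it is awaitable. The helper `call_query` and the two toy classes are hypothetical; `inspect.isawaitable` and `asyncio.run` are standard library.

```python
import asyncio
import inspect
from typing import Any, Dict, Optional


class SyncEcho:
    def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        return {"answer": f"sync: {question}"}


class AsyncEcho:
    async def query(self, question: str, top_k: Optional[int] = None) -> Dict[str, Any]:
        await asyncio.sleep(0)  # stand-in for network I/O
        return {"answer": f"async: {question}"}


async def call_query(rag: Any, question: str) -> Dict[str, Any]:
    # An async query() returns a coroutine; a sync query() returns the dict directly.
    result = rag.query(question)
    if inspect.isawaitable(result):
        result = await result
    return result


async def main() -> None:
    for rag in (SyncEcho(), AsyncEcho()):
        print((await call_query(rag, "hi"))["answer"])


asyncio.run(main())
```

Note that a synchronous `query()` called this way still runs on the event loop thread, so it blocks other tasks until it returns.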

Example Implementations

Example 1: SimpleRAG (Synchronous, Keyword Retrieval)

Source: examples/ragas_examples/rag_eval/rag.py (lines 75-411)

This implementation uses a simple keyword-matching retriever and synchronous OpenAI API calls. The query() method orchestrates the full pipeline including trace logging.
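The trace logging in SimpleRAG ends up in the "logs" key of the result as a path to a JSON file. A minimal sketch of that idea: collect events during a query, dump them to one JSON file per run, and return the path. `Trace` and `export_traces` are hypothetical names; the example's actual `export_traces_to_log` output format is not shown on this page.

```python
import json
import os
import tempfile
import time
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class Trace:
    # Hypothetical trace record; the source file's own event type is not shown here.
    event_type: str
    data: Dict[str, Any]
    timestamp: float = field(default_factory=time.time)


def export_traces(logdir: str, run_id: str, traces: List[Trace]) -> str:
    """Write one JSON file per run and return its path (for the "logs" key)."""
    os.makedirs(logdir, exist_ok=True)
    path = os.path.join(logdir, f"{run_id}.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"run_id": run_id, "events": [asdict(t) for t in traces]}, f, indent=2)
    return path


traces = [
    Trace("query_start", {"question": "What is BM25?"}),
    Trace("retrieval", {"num_docs": 2}),
    Trace("llm_response", {"answer_length": 57}),
]
logdir = tempfile.mkdtemp()
logs_path = export_traces(logdir, "20260210_120000_0042", traces)
with open(logs_path, encoding="utf-8") as f:
    print(len(json.load(f)["events"]))  # 3
```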

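import os
from datetime import datetime
from typing import Any, Dict, Optional

# BaseRetriever and SimpleKeywordRetriever are defined in rag.py (not shown here).

class ExampleRAG:
    """
    Simple RAG system that:
    1. accepts an LLM client
    2. uses simple keyword matching to retrieve relevant documents
    3. uses the LLM client to generate a response based on the retrieved
       documents when a query is made
    """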

    def __init__(
        self,
        llm_client,
        retriever: Optional[BaseRetriever] = None,
        system_prompt: Optional[str] = None,
        logdir: str = "logs",
    ):
        self.llm_client = llm_client
        self.retriever = retriever or SimpleKeywordRetriever()
        self.system_prompt = (
            system_prompt
            or """Answer the following question based on the provided documents:
                                Question: {query}
                                Documents:
                                {context}
                                Answer:
                            """
        )
        self.documents = []
        self.is_fitted = False
        self.traces = []
        self.logdir = logdir
        os.makedirs(self.logdir, exist_ok=True)

    def query(
        self, question: str, top_k: int = 3, run_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Complete RAG pipeline: retrieve documents and generate a response.

        Returns:
            Dictionary with the answer, the run ID, and the trace log path:
            {"answer": str, "run_id": str, "logs": str}
        """
        if run_id is None:
            # Timestamp plus a 4-digit suffix derived from the question.
            run_id = (
                f"{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                f"_{hash(question) % 10000:04d}"
            )

        self.traces = []
        try:
            # Trace-event calls from the full source are elided in this excerpt.
            retrieved_docs = self.retrieve_documents(question, top_k)
            # generate_response() receives the question and top_k, not retrieved_docs.
            response = self.generate_response(question, top_k)
            result = {"answer": response, "run_id": run_id}
            logs_path = self.export_traces_to_log(run_id, question, result)
            return {"answer": response, "run_id": run_id, "logs": logs_path}
        except Exception as e:
            logs_path = self.export_traces_to_log(run_id, question, None)
            return {
                "answer": f"Error processing query: {str(e)}",
                "run_id": run_id,
                "logs": logs_path,
            }
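The default `run_id` combines a timestamp with `hash(question) % 10000`. Python salts `str` hashes per process (unless `PYTHONHASHSEED` is fixed), so that suffix differs between interpreter runs for the same question. If a reproducible suffix is wanted, one option is a `hashlib` digest; `make_run_id` is a hypothetical helper, not part of the example.

```python
import hashlib
from datetime import datetime
from typing import Optional


def make_run_id(question: str, now: Optional[datetime] = None) -> str:
    # Timestamp prefix in the same format as ExampleRAG.query().
    now = now or datetime.now()
    # Stable 4-digit suffix: first 8 hex digits of SHA-256, reduced mod 10000.
    digest = int(hashlib.sha256(question.encode("utf-8")).hexdigest()[:8], 16)
    return f"{now.strftime('%Y%m%d_%H%M%S')}_{digest % 10000:04d}"


fixed = datetime(2026, 2, 10, 12, 0, 0)
print(make_run_id("What is BM25?", now=fixed))
```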

Evaluation harness using SimpleRAG (examples/ragas_examples/rag_eval/evals.py):

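import os

from openai import OpenAI
from ragas import Dataset, experiment
from ragas.llms import llm_factory
from ragas.metrics import DiscreteMetric

# default_rag_client() builds an ExampleRAG; the import path is assumed from the package layout.
from ragas_examples.rag_eval.rag import default_rag_client

openai_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
rag_client = default_rag_client(llm_client=openai_client, logdir="evals/logs")
# Judge LLM used by the metric (the model name here is an assumption).
llm = llm_factory("gpt-4o", client=openai_client)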

my_metric = DiscreteMetric(
    name="correctness",
    prompt="Check if the response contains points mentioned from the grading "
           "notes and return 'pass' or 'fail'.\n"
           "Response: {response} Grading Notes: {grading_notes}",
    allowed_values=["pass", "fail"],
)

@experiment()
async def run_experiment(row):
    response = rag_client.query(row["question"])
    answer = response.get("answer", "")
    score = my_metric.score(
        llm=llm,
        response=answer,
        grading_notes=row["grading_notes"],
    )
    return {
        **row,
        "response": answer,
        "score": score.value,
        "log_file": response.get("logs", ""),
    }
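`run_experiment` is declared `async`, but `rag_client.query()` is synchronous, so each call blocks the event loop while it waits on the LLM. If rows are evaluated concurrently, one option (not what the example does) is to offload the blocking call with `asyncio.to_thread` (Python 3.9+). `SlowSyncRAG` is a hypothetical stand-in that sleeps instead of calling an API.

```python
import asyncio
import time
from typing import Any, Dict, List, Tuple


class SlowSyncRAG:
    """Hypothetical synchronous RAG whose query() blocks for `delay` seconds."""

    def __init__(self, delay: float = 0.2):
        self.delay = delay

    def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        time.sleep(self.delay)  # stand-in for a blocking HTTP call
        return {"answer": f"answer to {question}"}


rag_client = SlowSyncRAG()


async def run_row(row: Dict[str, str]) -> Dict[str, Any]:
    # Run the blocking query() in a worker thread so other rows can proceed.
    response = await asyncio.to_thread(rag_client.query, row["question"])
    return {**row, "response": response.get("answer", "")}


async def main() -> Tuple[List[Dict[str, Any]], float]:
    rows = [{"question": f"q{i}"} for i in range(4)]
    start = time.perf_counter()
    results = await asyncio.gather(*(run_row(r) for r in rows))
    return list(results), time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(f"{len(results)} rows in {elapsed:.2f}s")  # roughly 0.2s instead of 0.8s
```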

Example 2: Dual-Mode RAG (Async, BM25, Naive/Agentic)

Source: examples/ragas_examples/improve_rag/rag.py (lines 85-214)

This implementation supports two operating modes behind the same query() interface: naive (single-pass BM25 retrieval) and agentic (agent-controlled multi-step retrieval).
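For reference, BM25 (the ranking function behind the naive mode's single-pass retrieval) scores a document by summing, over query terms, an IDF weight times a saturated, length-normalized term frequency. The sketch uses the commonly used values `k1=1.5`, `b=0.75` and the `ln(1 + (N - n + 0.5) / (n + 0.5))` IDF variant; the example's `BM25Retriever` may differ in tokenization and parameters, and `bm25_scores` is a hypothetical helper.

```python
import math
from collections import Counter
from typing import List


def bm25_scores(query: str, docs: List[str], k1: float = 1.5, b: float = 0.75) -> List[float]:
    """Score every document against the query with Okapi BM25."""
    tokenized = [d.lower().split() for d in docs]
    n_docs = len(tokenized)
    avgdl = sum(len(t) for t in tokenized) / n_docs
    # Document frequency: number of documents containing each term.
    df = Counter(term for t in tokenized for term in set(t))
    scores = []
    for tokens in tokenized:
        tf = Counter(tokens)
        score = 0.0
        for term in query.lower().split():
            if term not in tf:
                continue
            idf = math.log(1 + (n_docs - df[term] + 0.5) / (df[term] + 0.5))
            # Term frequency saturates via k1; b controls document-length normalization.
            norm = tf[term] * (k1 + 1) / (tf[term] + k1 * (1 - b + b * len(tokens) / avgdl))
            score += idf * norm
        scores.append(score)
    return scores


docs = [
    "mlflow tracing records every llm call",
    "bm25 ranks documents by term frequency",
    "ragas metrics score rag answers",
]
scores = bm25_scores("bm25 term ranking", docs)
best = max(range(len(docs)), key=scores.__getitem__)
print(docs[best])  # bm25 ranks documents by term frequency
```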

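from typing import Any, Dict, Optional

import mlflow
from openai import AsyncOpenAI

# The import of BM25Retriever is not shown in this excerpt.

class RAG:
    """RAG system that can operate in naive or agentic mode."""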

    def __init__(
        self,
        llm_client: AsyncOpenAI,
        retriever: BM25Retriever,
        mode="naive",
        system_prompt=None,
        model="gpt-4o-mini",
        default_k=3,
    ):
        self.llm_client = llm_client
        self.retriever = retriever
        self.mode = mode.lower()
        self.model = model
        self.default_k = default_k
        self.system_prompt = (
            system_prompt
            or "Answer only based on documents. Be concise.\n\n"
               "Question: {query}\nDocuments:\n{context}\nAnswer:"
        )

        if self.mode == "agentic":
            self._setup_agent()

        # self._mlflow_enabled, used in query(), is set by MLflow setup code not shown here.

    async def query(
        self, question: str, top_k: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Query the RAG system.

        Returns:
            {
                "answer": str,
                "retrieved_documents": List[dict],
                "num_retrieved": int,
                "mlflow_trace_id": str or None
            }
        """
        if top_k is None:
            top_k = self.default_k

        try:
            if self.mode == "naive":
                return await self._naive_query(question, top_k)
            elif self.mode == "agentic":
                return await self._agentic_query(question, top_k)
            else:
                raise ValueError(f"Unknown mode: {self.mode}")
        except Exception as e:
            trace_id = (
                mlflow.get_last_active_trace_id()
                if self._mlflow_enabled
                else None
            )
            return {
                "answer": f"Error: {str(e)}",
                "retrieved_documents": [],
                "num_retrieved": 0,
                "mlflow_trace_id": trace_id,
            }

Evaluation harness using the dual-mode RAG (examples/ragas_examples/improve_rag/evals.py):

@experiment()
async def evaluate_rag(row: Dict[str, Any], rag: RAG, llm) -> Dict[str, Any]:
    # rag and llm are supplied as extra arguments when the experiment is run;
    # correctness_metric is defined elsewhere in evals.py (not shown).
    question = row["question"]

    # Query the RAG system (same interface regardless of mode)
    rag_response = await rag.query(question, top_k=4)
    model_response = rag_response.get("answer", "")

    # Evaluate correctness
    score = await correctness_metric.ascore(
        question=question,
        expected_answer=row["expected_answer"],
        response=model_response,
        llm=llm,
    )

    return {
        **row,
        "model_response": model_response,
        "correctness_score": score.value,
        "correctness_reason": score.reason,
        # `or` also covers an explicit None trace ID (e.g. when MLflow is disabled).
        "mlflow_trace_id": rag_response.get("mlflow_trace_id") or "N/A",
        # Keep only the first 200 characters of each retrieved document.
        "retrieved_documents": [
            doc.get("content", "")[:200] + "..."
            if len(doc.get("content", "")) > 200
            else doc.get("content", "")
            for doc in rag_response.get("retrieved_documents", [])
        ],
    }
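Because `rag.query()` is a coroutine here, rows can be evaluated concurrently. A minimal sketch of that fan-out uses `asyncio.gather` with an `asyncio.Semaphore` to cap how many queries are in flight at once (for example, to respect API rate limits). `FakeAsyncRAG`, `evaluate_all`, and `max_concurrency` are hypothetical; how Ragas itself schedules experiment rows is not described on this page.

```python
import asyncio
from typing import Any, Dict, List, Optional


class FakeAsyncRAG:
    """Hypothetical async RAG that records peak concurrency."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0

    async def query(self, question: str, top_k: Optional[int] = None) -> Dict[str, Any]:
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        await asyncio.sleep(0.01)  # stand-in for retrieval + LLM latency
        self.in_flight -= 1
        return {"answer": question.upper(), "retrieved_documents": []}


async def evaluate_all(
    rag: FakeAsyncRAG, rows: List[Dict[str, str]], max_concurrency: int = 2
) -> List[Dict[str, Any]]:
    sem = asyncio.Semaphore(max_concurrency)

    async def evaluate_row(row: Dict[str, str]) -> Dict[str, Any]:
        async with sem:  # at most max_concurrency queries run at once
            response = await rag.query(row["question"], top_k=4)
        return {**row, "model_response": response.get("answer", "")}

    # gather() returns results in the same order as rows.
    return list(await asyncio.gather(*(evaluate_row(r) for r in rows)))


rag = FakeAsyncRAG()
rows = [{"question": f"q{i}"} for i in range(6)]
results = asyncio.run(evaluate_all(rag, rows))
print(rag.peak)  # 2
```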

Key Observations

  • Consistent return structure: Both implementations return a dictionary whose "answer" key is the primary output. Evaluation code depends only on this key, and reads it and any metadata key with .get() defaults.
  • Graceful error handling: Both implementations catch exceptions internally and return the error message as the "answer" value instead of raising, so the evaluation run continues even if individual queries fail. Metrics then score that error text like any other answer, so failures surface in the results rather than as crashes.
  • Trace/observability support: SimpleRAG logs traces to JSON files (exposed via the "logs" key), while the dual-mode RAG integrates with MLflow ("mlflow_trace_id" key). Both are optional metadata that enrich evaluation results without affecting the core contract.
  • Retriever interchangeability: SimpleRAG accepts any BaseRetriever subclass; the dual-mode RAG is typed to take a BM25Retriever. In both cases the query() interface stays the same regardless of the retriever.
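The error-handling convention in both examples (catch inside `query()`, return the message as the answer) can be factored into a reusable decorator. A sketch, assuming a hypothetical `safe_query` helper that works for both sync and async methods and fills the optional keys with empty defaults, much as the dual-mode RAG's fallback does.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable, Dict


def _error_result(exc: Exception) -> Dict[str, Any]:
    # Similar to the dual-mode RAG's fallback dictionary (without mlflow_trace_id).
    return {"answer": f"Error: {exc}", "retrieved_documents": [], "num_retrieved": 0}


def safe_query(func: Callable) -> Callable:
    """Turn exceptions raised by a query method into an error-answer dict."""
    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
            try:
                return await func(*args, **kwargs)
            except Exception as exc:
                return _error_result(exc)
        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            return _error_result(exc)
    return sync_wrapper


class FlakyRAG:
    @safe_query
    def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        if not question:
            raise ValueError("empty question")
        return {"answer": question, "retrieved_documents": [], "num_retrieved": 0}

    @safe_query
    async def aquery(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        raise TimeoutError("LLM timed out")


rag = FlakyRAG()
print(rag.query("")["answer"])                  # Error: empty question
print(asyncio.run(rag.aquery("hi"))["answer"])  # Error: LLM timed out
```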
