Implementation: FlagOpen FlagEmbedding Reinforced IR Data Utils
| Knowledge Sources | Details |
|---|---|
| Domains | Natural Language Processing, Information Retrieval, Machine Learning |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A comprehensive utility module for data generation, evaluation, and training data preparation in reinforced information retrieval systems.
Description
This module provides essential functions for building training data for embedding models through mining hard negatives, generating distillation data from language models, and evaluating retrieval performance. Key capabilities include:
- Hard Negative Mining: Uses dense retrievers to mine challenging negative passages from large corpora using FAISS for efficient similarity search
- LLM Distillation: Generates preference pairs (chosen/rejected) for DPO training by comparing relevance scores from multiple query formulations
- Evaluation Metrics: Computes MRR, Recall@k, NDCG@k, MAP@k, and Precision@k using pytrec_eval
- FAISS Integration: GPU-accelerated dense retrieval with sharded indexing across multiple GPUs
The module implements several negative sampling strategies, including hard negatives (passages similar to the query but not relevant), random negatives, and score-based filtering, to create high-quality training data for embedding models.
Usage
Use this module when training embedding models for retrieval tasks, particularly when you need to generate synthetic training data from unlabeled corpora, distill knowledge from large language models into smaller embedding models, or evaluate retrieval system performance with standard IR metrics.
Code Reference
Source Location
- Repository: FlagOpen_FlagEmbedding
- File: research/Reinforced_IR/data_generation/utils.py
- Lines: 1-474
Signature
def generate_bge_train_data(
retrieval_model,
batch_size: int = 512,
max_length: int = 512,
queries_corpus: Union[List[dict], List[List[dict]]] = None,
dtype: str = 'passage',
corpus: List[str] = None,
filter_data: bool = False,
filter_num: int = 20,
emb_save_path: str = None,
ignore_prefix: bool = False,
neg_type: str = 'hard'
) -> List[dict]:
"""Generate training data by mining hard negatives using dense retrieval"""
def generate_llm_dpo_train_data(
queries_corpus_list: List[List[dict]] = None,
search_dtype: str = 'answer',
result_dtype: str = 'passage',
retrieval_model = None,
threshold: float = 0.95,
batch_size: int = 512,
max_length: int = 1024,
use_rule1: bool = True
) -> List[dict]:
"""Generate DPO training data by comparing multiple query formulations"""
def evaluate(
metrics: List[str] = ['recall', 'mrr', 'ndcg'],
k_values: List[int] = [1, 10],
ground_truths: List[Dict] = None,
predicts: List = None,
scores: List = None
) -> dict:
"""Compute retrieval evaluation metrics"""
def search(queries_emb, doc_emb, topk: int = 100) -> Tuple:
"""Perform FAISS-based dense retrieval across GPUs"""
Import
from research.Reinforced_IR.data_generation.utils import (
generate_bge_train_data,
generate_llm_dpo_train_data,
get_distill_data,
evaluate,
evaluate_better,
search,
evaluate_mrr,
extract_numbers
)
I/O Contract
Inputs (generate_bge_train_data)
| Name | Type | Required | Description |
|---|---|---|---|
| retrieval_model | object | Yes | Model with encode_queries/encode_corpus methods |
| queries_corpus | List[dict] | Yes | Dicts with 'query', 'answer', 'passage' keys |
| batch_size | int | No | Batch size for encoding (default: 512) |
| max_length | int | No | Max sequence length (default: 512) |
| neg_type | str | No | Negative type: 'hard', 'random', or 'mixed' (default: 'hard') |
| filter_data | bool | No | Filter by retrieval rank (default: False) |
| filter_num | int | No | Keep only if positive rank < filter_num (default: 20) |
Outputs (generate_bge_train_data)
| Name | Type | Description |
|---|---|---|
| train_data | List[dict] | Training samples with query, answer, pos, neg, neg_answer fields |
Training Data Format
{
'query': str, # The search query
'answer': str, # The expected answer/response
'pos': [str], # List of 1 positive passage
'neg': [str], # List of 15 hard negative passages
'neg_answer': [str] # List of 15 negative answers
}
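A mined sample in this format is typically written to JSONL for downstream fine-tuning. A minimal sketch (the helper name and file path are illustrative, not part of the module):

```python
import json
import os
import tempfile

def write_train_jsonl(train_data, path):
    """Write training samples to a JSONL file, one JSON object per line."""
    with open(path, "w", encoding="utf-8") as f:
        for sample in train_data:
            f.write(json.dumps(sample, ensure_ascii=False) + "\n")

sample = {
    "query": "what is machine learning",
    "answer": "Machine learning is a subset of AI...",
    "pos": ["Machine learning (ML) is a field of study..."],
    "neg": ["An unrelated passage..."],
    "neg_answer": ["An unrelated answer..."],
}
out_path = os.path.join(tempfile.gettempdir(), "bge_train_sample.jsonl")
write_train_jsonl([sample], out_path)
```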
Key Functions
Hard Negative Mining
The generate_bge_train_data function implements sophisticated hard negative mining:
1. Encode Queries and Documents: Separately encodes queries (optionally blended with answers) and the corpus
2. FAISS Retrieval: Retrieves the top-2000 candidates per query using GPU-accelerated search
3. Negative Selection:
   * Finds where the positive passage ranks
   * Samples negatives from passages with scores ≤ 0.95 × the positive's score
   * Falls back to sampling from ranks 30-200 if the positive is not found
4. Deduplication: Removes duplicates and keeps 15 unique negatives
5. Optional Filtering: Discards queries where the positive doesn't rank in the top-N
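The negative-selection and deduplication logic can be sketched as follows. This is an illustrative reconstruction from the summary above (the function name, the 0.95 cutoff, and the 30-200 fallback window come from this description, not from reading the module's code):

```python
import numpy as np

rng = np.random.default_rng(0)  # fixed seed for reproducible sampling

def pick_hard_negatives(ranked_ids, ranked_scores, pos_id, n_neg=15):
    """Pick hard negatives from one query's ranked candidate list.

    ranked_ids / ranked_scores: top-k retrieved candidates, best first.
    Negatives are sampled from candidates scoring <= 0.95x the positive's
    score; if the positive is absent, fall back to ranks 30-200.
    """
    ranked_ids = np.asarray(ranked_ids)
    ranked_scores = np.asarray(ranked_scores)
    pos_hits = np.flatnonzero(ranked_ids == pos_id)
    if pos_hits.size:
        pos_score = ranked_scores[pos_hits[0]]
        mask = (ranked_scores <= 0.95 * pos_score) & (ranked_ids != pos_id)
        pool = ranked_ids[mask]
    else:
        pool = ranked_ids[30:200]  # fallback window when positive not found
    pool = np.unique(pool)  # deduplicate before sampling
    n = min(n_neg, pool.size)
    return rng.choice(pool, size=n, replace=False)
```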
LLM Distillation for DPO
The generate_llm_dpo_train_data function creates preference pairs:
1. Multiple Formulations: Compares different query/answer formulations for the same passage
2. Score Calculation: Computes a retrieval score for each formulation
3. Pair Selection: Creates (chosen, rejected) pairs where:
   * Chosen: the formulation with the highest score
   * Rejected: the formulation with the lowest score
   * A sufficient score gap is required (threshold × score_range)
4. Optional Rule: Can additionally require the chosen score to exceed the raw query's score
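The pair-selection rule can be sketched as below. This is a hypothetical helper paraphrasing the steps above; in particular, the exact gap condition is interpreted here as "rejected must score no more than threshold × chosen", which may differ from the module's precise formula:

```python
def make_dpo_pair(queries, answers, scores, threshold=0.95, raw_query_score=None):
    """Pick (chosen, rejected) answers by retrieval score for one prompt.

    Returns None when the chosen/rejected gap is too small, or (rule 1)
    when the best formulation does not beat the raw query's score.
    """
    best = max(range(len(scores)), key=scores.__getitem__)
    worst = min(range(len(scores)), key=scores.__getitem__)
    # gap rule (assumed form): rejected must score <= threshold x chosen
    if scores[worst] > threshold * scores[best]:
        return None
    # optional rule 1: chosen must improve on the raw query's own score
    if raw_query_score is not None and scores[best] <= raw_query_score:
        return None
    return {
        "prompt": queries[best],
        "chosen": answers[best],
        "rejected": answers[worst],
        "chosen_score": scores[best],
        "rejected_score": scores[worst],
    }
```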
Evaluation Metrics
Implements standard TREC-style evaluation:
- MRR (Mean Reciprocal Rank): Averaged reciprocal of first relevant result rank
- Recall@k: Proportion of relevant docs in top-k
- NDCG@k: Normalized discounted cumulative gain
- MAP@k: Mean average precision at cutoff k
- Precision@k: Fraction of top-k that are relevant
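For reference, the simpler metrics can be computed by hand. A pure-Python sketch of MRR@k and Recall@k, independent of the module's pytrec_eval path (qrels and runs are shown here as dicts keyed by query id):

```python
def mrr_at_k(qrels, runs, k):
    """Mean reciprocal rank of the first relevant doc within the top k.

    qrels: {qid: {docid: relevance}}; runs: {qid: [docid, ...] best-first}.
    """
    total = 0.0
    for qid, ranked in runs.items():
        for rank, doc in enumerate(ranked[:k], start=1):
            if qrels.get(qid, {}).get(doc, 0) > 0:
                total += 1.0 / rank
                break
    return total / max(len(runs), 1)

def recall_at_k(qrels, runs, k):
    """Mean fraction of each query's relevant docs found in the top k."""
    total = 0.0
    for qid, ranked in runs.items():
        relevant = {d for d, r in qrels.get(qid, {}).items() if r > 0}
        if relevant:
            total += len(relevant & set(ranked[:k])) / len(relevant)
    return total / max(len(runs), 1)
```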
Usage Examples
Mine Hard Negatives
from FlagEmbedding import FlagModel
from research.Reinforced_IR.data_generation.utils import generate_bge_train_data
# Load retrieval model
model = FlagModel('BAAI/bge-base-en-v1.5', use_fp16=True)
# Prepare query-passage data
data = [
{
'query': 'what is machine learning',
'answer': 'Machine learning is a subset of AI...',
'passage': 'Machine learning (ML) is a field of study...'
},
# ... more examples
]
# Generate training data with hard negatives
train_data = generate_bge_train_data(
retrieval_model=model,
queries_corpus=data,
batch_size=256,
neg_type='hard',
filter_data=True,
filter_num=20 # Keep only if positive ranks in top 20
)
# train_data now contains query + 1 pos + 15 hard negs per sample
print(f"Generated {len(train_data)} training samples")
Generate DPO Training Data
from research.Reinforced_IR.data_generation.utils import generate_llm_dpo_train_data
# Multiple query formulations for each passage
formulation_list = [
[{'query': 'What is ML?', 'answer': 'Short answer...', 'passage': '...'}],
[{'query': 'Explain machine learning', 'answer': 'Long answer...', 'passage': '...'}],
[{'query': 'Define ML', 'answer': 'Brief def...', 'passage': '...'}],
]
# Generate preference pairs
dpo_data = generate_llm_dpo_train_data(
queries_corpus_list=formulation_list,
retrieval_model=model,
search_dtype='answer', # Compare answer embeddings
result_dtype='passage',
threshold=0.95,
use_rule1=True # Require improvement over raw query
)
# Output format:
# {
# 'prompt': 'What is ML?',
# 'chosen': 'Long explanatory answer...', # Best formulation
# 'rejected': 'Brief def...', # Worst formulation
# 'chosen_score': 0.92,
# 'rejected_score': 0.78
# }
Evaluate Retrieval Performance
import numpy as np
from research.Reinforced_IR.data_generation.utils import evaluate
# Ground truth relevance
qrels = {
'0': {'0': 1, '5': 1}, # query 0 has docs 0,5 as relevant
'1': {'2': 1, '7': 1, '9': 1}
}
# Retrieval results (top-10 doc indices per query)
predictions = np.array([
[0, 3, 5, 1, 8, 4, 2, 6, 7, 9], # query 0 results
[2, 7, 1, 9, 0, 5, 3, 4, 6, 8] # query 1 results
])
scores = np.array([
[0.95, 0.89, 0.87, 0.81, 0.76, 0.72, 0.68, 0.61, 0.55, 0.50],
[0.93, 0.91, 0.84, 0.82, 0.78, 0.74, 0.69, 0.63, 0.58, 0.52]
])
# Compute metrics
metrics = evaluate(
metrics=['recall', 'mrr', 'ndcg', 'map'],
k_values=[1, 5, 10],
ground_truths=qrels,
predicts=predictions,
scores=scores
)
print(metrics)
# Example output shape (exact values depend on the run):
# {
#   'recall': {'Recall@1': ..., 'Recall@5': ..., 'Recall@10': ...},
#   'mrr': {'MRR@1': ..., 'MRR@5': ..., 'MRR@10': ...},
#   'ndcg': {'NDCG@1': ..., 'NDCG@5': ..., 'NDCG@10': ...},
#   ...
# }
FAISS Dense Retrieval
from research.Reinforced_IR.data_generation.utils import search
# Query and document embeddings (numpy arrays)
query_embeddings = model.encode_queries(queries) # (N_q, dim)
doc_embeddings = model.encode_corpus(corpus) # (N_d, dim)
# Search top-100 per query using multi-GPU FAISS
scores, indices = search(
queries_emb=query_embeddings,
doc_emb=doc_embeddings,
topk=100
)
# scores: (N_q, 100) - similarity scores
# indices: (N_q, 100) - document indices
# Get top docs for first query
top_docs = [corpus[i] for i in indices[0, :5]]
print(f"Top 5 docs: {top_docs}")
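Per the description above, search shards an inner-product FAISS index across the available GPUs; up to tie-breaking, its result is equivalent to a brute-force inner-product top-k, which this numpy sketch (a hypothetical helper, handy for sanity-checking search on small inputs) computes directly:

```python
import numpy as np

def brute_force_search(queries_emb, doc_emb, topk=100):
    """Exact inner-product top-k over all documents, best-first per query."""
    sim = queries_emb @ doc_emb.T                 # (N_q, N_d) similarity scores
    topk = min(topk, doc_emb.shape[0])
    idx = np.argsort(-sim, axis=1)[:, :topk]      # best-first doc indices
    scores = np.take_along_axis(sim, idx, axis=1)
    return scores, idx
```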