Implementation:FlagOpen FlagEmbedding LLM Embedder Utils
| Knowledge Sources | |
|---|---|
| Domains | Machine Learning, Utilities, Data Processing |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Utility functions and classes for file operations, data processing, logging, and model parameter manipulation in the LLM Embedder framework.
Description
This module collects the utility functions and helper classes shared across the LLM Embedder codebase: file I/O (JSON, pickle), nested-list manipulation, text normalization, dataset processing wrappers, samplers for distributed training, data collators with dynamic padding, and model parameter mixing.
Key components include:
- FileLogger: Structured logging with timestamps and command tracking
- Sequential_Sampler: Distributed sequential data sampling
- DatasetProcessFn: Decorator for HuggingFace dataset processing functions
- DefaultDataCollator: Dynamic padding for various input types
- File operations: JSON/pickle save/load, directory management
- List utilities: Nested list padding, masking, and length computation
- Text processing: Normalization with configurable options
- Model utilities: Parameter mixing for model ensembling
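To make sequential distributed sampling concrete, the following is a minimal sketch rather than the module's actual code. It assumes each rank receives one contiguous slice of indices and that slice boundaries are computed with integer division; the class name `ContiguousShardSampler` is hypothetical, and the real `Sequential_Sampler` may compute its boundaries differently.

```python
class ContiguousShardSampler:
    """Hypothetical stand-in for a sequential distributed sampler."""

    def __init__(self, dataset_length: int, num_replicas: int, rank: int):
        if num_replicas < 1:
            raise ValueError("num_replicas must be at least 1")
        if not 0 <= rank < num_replicas:
            raise ValueError("rank must be in [0, num_replicas)")
        # Assumption: boundaries come from integer division, so the end of
        # one shard is exactly the start of the next.
        self.start = dataset_length * rank // num_replicas
        self.end = dataset_length * (rank + 1) // num_replicas

    def __iter__(self):
        return iter(range(self.start, self.end))

    def __len__(self):
        return self.end - self.start


# Ten examples over three ranks: every index appears in exactly one shard.
shards = [list(ContiguousShardSampler(10, 3, rank)) for rank in range(3)]
```

Because each rank reads a contiguous range, per-rank outputs can be concatenated in rank order to recover the original dataset order, which is convenient for evaluation.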
Usage
Use this module when you need common utility functions for data processing, file management, logging, or distributed training in retrieval model workflows.
Code Reference
Source Location
- Repository: FlagOpen_FlagEmbedding
- File: research/llm_embedder/src/utils/util.py
Signature
class FileLogger:
    def __init__(self, log_file)
    def log(self, metrics, **kwargs)

class Sequential_Sampler:
    def __init__(self, dataset_length:int, num_replicas:int, rank:int)
    def __iter__(self)
    def __len__(self)

class DatasetProcessFn:
    def __init__(self, augment=False)
    def __call__(self, _process_fn)

@dataclass
class DefaultDataCollator:
    tokenizer: PreTrainedTokenizer
    attention_padding_value: int = 0
    label_padding_value: int = -100
    add_position_ids: bool = False
    def __call__(self, batch_elem: List) -> Dict[str, Any]

# Utility functions
def makedirs(path)
def split_file_dir_name_ext(path)
def save_json(obj, path:str)
def load_json(path, lines=False)
def save_pickle(obj, path:str)
def load_pickle(path)
def normalize_text(text, ignore_case=True, ignore_punctuation=True,
                   ignore_space=True, ignore_number=False)
def get_max_length_in_nested_lists(lst)
def pad_nested_lists(lst, max_length, padding_value, padding_side="right")
def mix_parameters(models: List[torch.nn.Module], weights: Optional[List[float]]=None)
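The `__call__(self, _process_fn)` signature suggests that `DatasetProcessFn` is applied as a decorator factory. The sketch below shows that pattern under stated assumptions: the wrapper receives a batch as a dict of columns and loops a per-example function over its rows. The class name `BatchedProcessFn` and the treatment of `augment` are illustrative guesses, not the module's implementation.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List


class BatchedProcessFn:
    """Hypothetical decorator: apply a per-example function to a batch of columns."""

    def __init__(self, augment: bool = False):
        # Assumption: with augment=True the function returns lists of values
        # (several output rows per input row) that are flattened into the batch.
        self.augment = augment

    def __call__(self, process_fn: Callable[..., Dict[str, Any]]):
        def process(batch: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
            keys = list(batch.keys())
            outputs = defaultdict(list)
            # Walk the columns row by row and pass each row as keyword arguments.
            for row in zip(*(batch[k] for k in keys)):
                result = process_fn(**dict(zip(keys, row)))
                if result is None:
                    continue  # returning None drops the example
                for name, value in result.items():
                    if self.augment:
                        outputs[name].extend(value)
                    else:
                        outputs[name].append(value)
            return dict(outputs)

        return process


@BatchedProcessFn(augment=False)
def first_negative(query: str, neg: List[str], **kwds):
    return {"query": query, "negative": neg[0]}


batch = {"query": ["q1", "q2"], "neg": [["n1", "n2"], ["n3"]]}
processed = first_negative(batch)
```

Writing the processing logic per example and letting the wrapper handle batching keeps user functions short while still allowing a batched `map` call.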
Import
from utils.util import (FileLogger, Sequential_Sampler, DatasetProcessFn,
                        DefaultDataCollator, save_json, load_json,
                        normalize_text, pad_nested_lists, mix_parameters)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| path | str | Yes | File path for save/load operations |
| obj | Any | Yes | Object to save (for save functions) |
| lst | List | Yes | Nested list for padding/processing |
| models | List[torch.nn.Module] | Yes | Models to mix parameters |
| weights | List[float] | No | Mixing weights (defaults to uniform) |
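The uniform default for `weights` can be illustrated without loading any checkpoints. The sketch below mixes plain dictionaries of floats that stand in for model state dicts. `mix_state_dicts` is a hypothetical helper, and it assumes that parameter mixing means a weighted sum of same-named parameters.

```python
from typing import Dict, List, Optional


def mix_state_dicts(
    state_dicts: List[Dict[str, float]],
    weights: Optional[List[float]] = None,
) -> Dict[str, float]:
    """Hypothetical helper: weighted sum of same-named parameters."""
    if not state_dicts:
        raise ValueError("need at least one state dict")
    if weights is None:
        # Uniform weights, matching the documented default.
        weights = [1.0 / len(state_dicts)] * len(state_dicts)
    if len(weights) != len(state_dicts):
        raise ValueError("need exactly one weight per state dict")

    mixed: Dict[str, float] = {}
    for state, weight in zip(state_dicts, weights):
        for name, value in state.items():
            mixed[name] = mixed.get(name, 0.0) + weight * value
    return mixed


a = {"w": 1.0, "b": 0.0}
b = {"w": 3.0, "b": 2.0}
uniform = mix_state_dicts([a, b])                         # plain average
weighted = mix_state_dicts([a, b], weights=[0.75, 0.25])  # favors `a`
```

With real models, the same loop would run over the tensors returned by each model's `state_dict()`, and the result would be loaded back with `load_state_dict()`.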
Outputs
| Name | Type | Description |
|---|---|---|
| loaded_obj | Any | Loaded object from file |
| padded_list, masks | Tuple[List, List] | Padded nested list and its matching mask (1 for original items, 0 for padding) |
| mixed_model | torch.nn.Module | Model with mixed parameters |
| sampler_indices | Iterator | Sequential indices for distributed sampling |
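The relationship between a padded nested list and its mask can be sketched as follows. This is an illustration under assumptions, not the module's `pad_nested_lists`: the function name `pad_nested` is hypothetical, it returns new lists instead of modifying its input, and it leaves lists longer than `max_length` untruncated.

```python
from typing import Any, List, Tuple


def pad_nested(lst: List[Any], max_length: int, padding_value: Any,
               padding_side: str = "right") -> Tuple[List[Any], List[Any]]:
    """Hypothetical sketch: pad the innermost lists and build matching 0/1 masks."""
    if padding_side not in ("left", "right"):
        raise ValueError("padding_side must be 'left' or 'right'")
    if lst and isinstance(lst[0], list):
        # Recurse until the innermost level is reached.
        pairs = [pad_nested(x, max_length, padding_value, padding_side) for x in lst]
        return [p for p, _ in pairs], [m for _, m in pairs]
    n_pad = max(max_length - len(lst), 0)  # no truncation in this sketch
    padding, zeros, ones = [padding_value] * n_pad, [0] * n_pad, [1] * len(lst)
    if padding_side == "right":
        return lst + padding, ones + zeros
    return padding + lst, zeros + ones


token_ids = [[101, 7592, 102], [101, 102]]
padded, masks = pad_nested(token_ids, max_length=4, padding_value=0)
# padded == [[101, 7592, 102, 0], [101, 102, 0, 0]]
# masks  == [[1, 1, 1, 0], [1, 1, 0, 0]]
```

The mask has the same shape as the padded list, so it can be used directly as an attention mask once both are converted to tensors.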
Usage Examples
from typing import List

from utils.util import (FileLogger, Sequential_Sampler, DatasetProcessFn,
                        save_json, load_json, normalize_text,
                        pad_nested_lists, mix_parameters)
# File logging
logger = FileLogger("experiments.log")
logger.log(
    metrics={"recall@10": 0.85, "mrr@10": 0.67},
    model_name="llm-embedder",
    dataset="msmarco"
)

# Sequential distributed sampling
sampler = Sequential_Sampler(
    dataset_length=100000,
    num_replicas=8,  # 8 GPUs
    rank=0           # Current process
)
print(f"Process 0 handles indices: {sampler.start} to {sampler.end}")
# Dataset processing with custom function
# `dataset` is assumed to be an existing HuggingFace dataset
# with "query", "pos", and "neg" columns.
@DatasetProcessFn(augment=False)
def process_example(query:str, pos:str, neg:List[str], **kwds):
    return {
        "query": query,
        "positive": pos,
        "negative": neg[0]  # Select first negative
    }

dataset = dataset.map(process_example, batched=True, num_proc=8)
# Nested list padding
queries = [
    ["What is AI?", "How does ML work?"],
    ["Define NLP"]  # Shorter list
]
padded_queries, masks = pad_nested_lists(
    queries, max_length=2, padding_value="", padding_side="right"
)
# Result: [["What is AI?", "How does ML work?"], ["Define NLP", ""]]
# Masks: [[1, 1], [1, 0]]
# Text normalization
text = "Hello, World! 123"
normalized = normalize_text(
    text,
    ignore_case=True,         # lowercase the text
    ignore_punctuation=True,  # strip punctuation
    ignore_space=True,        # normalize spacing
    ignore_number=False       # keep digits
)
# Expected result with these options: "hello world 123"
# Mix model parameters (ensemble)
from transformers import AutoModel
model1 = AutoModel.from_pretrained("model_checkpoint_1")
model2 = AutoModel.from_pretrained("model_checkpoint_2")
model3 = AutoModel.from_pretrained("model_checkpoint_3")
# Weighted average
mixed_model = mix_parameters(
    models=[model1, model2, model3],
    weights=[0.5, 0.3, 0.2]
)
mixed_model.save_pretrained("ensemble_model")
# JSON operations
data = {"queries": ["q1", "q2"], "labels": [1, 0]}
save_json(data, "output/data.json")
loaded = load_json("output/data.json")
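The `lines` flag in the `load_json` signature suggests support for JSON Lines files, which are common for retrieval datasets. The sketch below shows one way such a flag could behave, assuming `lines=True` means one JSON document per line. `read_json` is a hypothetical stand-in built on the standard library, not the module's function.

```python
import json
import os
import tempfile
from typing import Any, List, Union


def read_json(path: str, lines: bool = False) -> Union[Any, List[Any]]:
    """Hypothetical reader: a whole-file JSON document, or one document per line."""
    with open(path, encoding="utf-8") as f:
        if lines:
            # Skip blank lines so a trailing newline does not raise an error.
            return [json.loads(line) for line in f if line.strip()]
        return json.load(f)


records = [{"query": "q1", "label": 1}, {"query": "q2", "label": 0}]
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "data.jsonl")
    # Write one JSON document per line, then read the file back.
    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")
    loaded_records = read_json(path, lines=True)
```

Reading line by line avoids holding one large JSON array in a single string and lets a file be appended to without being rewritten.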