Implementation:FlagOpen FlagEmbedding LLM Embedder Utils

From Leeroopedia


Knowledge Sources
Domains: Machine Learning, Utilities, Data Processing
Last Updated: 2026-02-09 00:00 GMT

Overview

Utility functions and classes for file operations, data processing, logging, and model parameter manipulation in the LLM Embedder framework.

Description

This module collects the utility functions and helper classes shared across the LLM Embedder codebase. It covers file I/O (JSON, pickle), nested list manipulation, text normalization, dataset processing wrappers, custom samplers for distributed training, data collators with dynamic padding, and model parameter mixing.

Key components include:

  • FileLogger: Structured logging with timestamps and command tracking
  • Sequential_Sampler: Distributed sequential data sampling
  • DatasetProcessFn: Decorator for HuggingFace dataset processing functions
  • DefaultDataCollator: Dynamic padding for various input types
  • File operations: JSON/pickle save/load, directory management
  • List utilities: Nested list padding, masking, and length computation
  • Text processing: Normalization with configurable options
  • Model utilities: Weighted parameter mixing across models, for model ensembling
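
The distributed sequential sampling listed above can be pictured as giving each rank one contiguous block of indices. The following is a minimal sketch under stated assumptions: `shard_range` is a hypothetical helper, not the module's `Sequential_Sampler`, and the integer floor arithmetic used for the block boundaries is an assumption that may differ from the module's own rounding.

```python
from typing import List


def shard_range(dataset_length: int, num_replicas: int, rank: int) -> range:
    """Hypothetical helper: contiguous index block for one rank.

    Boundaries use integer floor division (an assumption), so the blocks
    tile [0, dataset_length) with no gaps and no overlap.
    """
    if not 0 <= rank < num_replicas:
        raise ValueError("rank must satisfy 0 <= rank < num_replicas")
    start = dataset_length * rank // num_replicas
    end = dataset_length * (rank + 1) // num_replicas  # exclusive bound
    return range(start, end)


# Split 10 examples across 3 processes.
shards: List[range] = [shard_range(10, 3, rank) for rank in range(3)]
for rank, shard in enumerate(shards):
    print(f"rank {rank}: {list(shard)}")
```

Because every rank reads a contiguous, ordered block, concatenating the per-rank outputs in rank order restores the original dataset order, which is convenient when gathering embeddings or predictions after evaluation.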

Usage

Use this module when you need common utility functions for data processing, file management, logging, or distributed training in retrieval model workflows.

Code Reference

Source Location

Signature

class FileLogger:
    def __init__(self, log_file)
    def log(self, metrics, **kwargs)

class Sequential_Sampler:
    def __init__(self, dataset_length:int, num_replicas:int, rank:int)
    def __iter__(self)
    def __len__(self)

class DatasetProcessFn:
    def __init__(self, augment=False)
    def __call__(self, _process_fn)

@dataclass
class DefaultDataCollator:
    tokenizer: PreTrainedTokenizer
    attention_padding_value: int = 0
    label_padding_value: int = -100
    add_position_ids: bool = False
    def __call__(self, batch_elem: List) -> Dict[str, Any]

# Utility functions
def makedirs(path)
def split_file_dir_name_ext(path)
def save_json(obj, path:str)
def load_json(path, lines=False)
def save_pickle(obj, path:str)
def load_pickle(path)
def normalize_text(text, ignore_case=True, ignore_punctuation=True,
                   ignore_space=True, ignore_number=False)
def get_max_length_in_nested_lists(lst)
def pad_nested_lists(lst, max_length, padding_value, padding_side="right")
def mix_parameters(models: List[torch.nn.Module], weights: Optional[List[float]]=None)
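
The `DatasetProcessFn` signature above, with a `__call__` that takes a process function, suggests a decorator that adapts a per-example function to batched input. The sketch below illustrates that general pattern only. `per_example` is a hypothetical stand-in, it omits the `augment` option, and treating a `None` return as "drop this example" is an assumption rather than documented behavior.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional


def per_example(fn: Callable[..., Optional[Dict[str, Any]]]):
    """Hypothetical decorator: lift a per-example function to a batched one."""

    def batched(batch: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
        keys = list(batch.keys())
        outputs: Dict[str, List[Any]] = defaultdict(list)
        # Walk all columns in lockstep, one example at a time.
        for values in zip(*(batch[key] for key in keys)):
            result = fn(**dict(zip(keys, values)))
            if result is None:  # assumption: None means "skip this example"
                continue
            for name, value in result.items():
                outputs[name].append(value)
        return dict(outputs)

    return batched


@per_example
def pick_first_negative(query: str, pos: str, neg: List[str], **kwds):
    if not neg:
        return None
    return {"query": query, "positive": pos, "negative": neg[0]}


# A batch in the column-oriented layout used by batched dataset mapping.
batch = {
    "query": ["q1", "q2"],
    "pos": ["p1", "p2"],
    "neg": [["n1a", "n1b"], []],
}
result = pick_first_negative(batch)
print(result)
```

Writing the logic per example keeps the user function simple, while the wrapper handles the dict-of-lists layout.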

Import

from utils.util import (FileLogger, Sequential_Sampler, DatasetProcessFn,
                        DefaultDataCollator, save_json, load_json,
                        normalize_text, pad_nested_lists, mix_parameters)

I/O Contract

Inputs

Name | Type | Required | Description
path | str | Yes | File path for save/load operations
obj | Any | Yes | Object to save (for save functions)
lst | List | Yes | Nested list for padding/processing
models | List[torch.nn.Module] | Yes | Models whose parameters are mixed
weights | List[float] | No | Mixing weights (defaults to uniform)
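
To make the `models` and `weights` inputs concrete, here is a minimal sketch of weighted parameter averaging with a uniform default. It is an illustration, not the module's `mix_parameters`: `mix_state_dicts` is a hypothetical name, plain lists of floats stand in for tensors so the example runs without PyTorch, and normalizing the weights to sum to 1 is an assumption.

```python
from typing import Dict, List, Optional, Sequence

# Stand-in for a real state dict: parameter name -> flat list of values.
StateDict = Dict[str, List[float]]


def mix_state_dicts(states: Sequence[StateDict],
                    weights: Optional[Sequence[float]] = None) -> StateDict:
    """Hypothetical helper: element-wise weighted average of parameters."""
    if not states:
        raise ValueError("need at least one state dict")
    if weights is None:
        weights = [1.0] * len(states)  # uniform default
    if len(weights) != len(states):
        raise ValueError("one weight per state dict is required")
    total = sum(weights)
    norm = [w / total for w in weights]  # assumption: normalize to sum to 1

    mixed: StateDict = {}
    for name in states[0]:
        # Group the i-th value of this parameter across all models.
        columns = zip(*(state[name] for state in states))
        mixed[name] = [sum(w * v for w, v in zip(norm, col)) for col in columns]
    return mixed


a = {"w": [0.0, 2.0]}
b = {"w": [4.0, 6.0]}
uniform = mix_state_dicts([a, b])
weighted = mix_state_dicts([a, b], weights=[3, 1])
print(uniform, weighted)
```

Averaging of this kind only makes sense when all models share the same architecture and parameter names.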

Outputs

Name | Type | Description
loaded_obj | Any | Object loaded from file
padded_list, masks | Tuple[List, List] | Padded nested list and the matching mask list (1 = original element, 0 = padding)
mixed_model | torch.nn.Module | Model with mixed parameters
sampler_indices | Iterator | Sequential indices for distributed sampling
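
The padded-list output can be illustrated with a small recursive sketch. `pad_nested` is a hypothetical re-implementation written for this page, not the module's `pad_nested_lists`. It assumes the innermost lists are padded up to `max_length`, that masks mirror the nesting of the input, and that the input is left unmodified.

```python
from typing import Any, List, Tuple


def pad_nested(lst: List[Any], max_length: int, padding_value: Any,
               padding_side: str = "right") -> Tuple[List[Any], List[Any]]:
    """Hypothetical helper: pad innermost lists and build matching masks."""
    if lst and isinstance(lst[0], list):
        # Still nested: recurse into each child and collect the results.
        padded, masks = [], []
        for inner in lst:
            inner_padded, inner_mask = pad_nested(
                inner, max_length, padding_value, padding_side)
            padded.append(inner_padded)
            masks.append(inner_mask)
        return padded, masks

    # Innermost list: 1 marks an original element, 0 marks padding.
    n_pad = max(max_length - len(lst), 0)
    pad = [padding_value] * n_pad
    if padding_side == "right":
        return lst + pad, [1] * len(lst) + [0] * n_pad
    if padding_side == "left":
        return pad + lst, [0] * n_pad + [1] * len(lst)
    raise ValueError("padding_side must be 'left' or 'right'")


token_ids = [[1, 2, 3], [4]]
right, right_mask = pad_nested(token_ids, max_length=3, padding_value=0)
left, left_mask = pad_nested(token_ids, 3, 0, padding_side="left")
print(right, right_mask)
print(left, left_mask)
```

The mask is what a collator would typically pass on as an attention mask, so that padded positions are ignored downstream.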

Usage Examples

from typing import List  # needed for the List[str] annotation below

from utils.util import (FileLogger, Sequential_Sampler, DatasetProcessFn,
                        save_json, load_json, normalize_text,
                        pad_nested_lists, mix_parameters)

# File logging
logger = FileLogger("experiments.log")
logger.log(
    metrics={"recall@10": 0.85, "mrr@10": 0.67},
    model_name="llm-embedder",
    dataset="msmarco"
)

# Sequential distributed sampling
sampler = Sequential_Sampler(
    dataset_length=100000,
    num_replicas=8,  # 8 GPUs
    rank=0           # Current process
)
# The range is half-open: `end` itself belongs to the next process.
print(f"Process 0 handles indices in [{sampler.start}, {sampler.end})")

# Dataset processing with custom function
@DatasetProcessFn(augment=False)
def process_example(query:str, pos:str, neg:List[str], **kwds):
    return {
        "query": query,
        "positive": pos,
        "negative": neg[0]  # Select first negative
    }

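# `dataset` is assumed to be an existing HuggingFace dataset
# with "query", "pos", and "neg" columns.
dataset = dataset.map(process_example, batched=True, num_proc=8)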

# Nested list padding
queries = [
    ["What is AI?", "How does ML work?"],
    ["Define NLP"]  # Shorter list
]
padded_queries, masks = pad_nested_lists(
    queries, max_length=2, padding_value="", padding_side="right"
)
# Result: [["What is AI?", "How does ML work?"], ["Define NLP", ""]]
# Masks: [[1, 1], [1, 0]]

# Text normalization
text = "Hello, World! 123"
normalized = normalize_text(
    text,
    ignore_case=True,         # lowercase the text
    ignore_punctuation=True,  # strip punctuation
    ignore_space=True,        # normalize spacing
    ignore_number=False       # keep digits
)
# With these flags: "hello world 123"

# Mix model parameters (ensemble)
from transformers import AutoModel

model1 = AutoModel.from_pretrained("model_checkpoint_1")
model2 = AutoModel.from_pretrained("model_checkpoint_2")
model3 = AutoModel.from_pretrained("model_checkpoint_3")

# Weighted average
mixed_model = mix_parameters(
    models=[model1, model2, model3],
    weights=[0.5, 0.3, 0.2]
)
mixed_model.save_pretrained("ensemble_model")

# JSON operations
data = {"queries": ["q1", "q2"], "labels": [1, 0]}
save_json(data, "output/data.json")
loaded = load_json("output/data.json")
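
The `lines` flag in the `load_json(path, lines=False)` signature suggests support for line-delimited JSON. The sketch below shows one way such a pair of helpers could behave, using only the standard library. `save_json_sketch` and `load_json_sketch` are hypothetical names, and both the creation of missing parent directories and the reading of `lines=True` as "one JSON document per line" are assumptions.

```python
import json
import os
import tempfile
from typing import Any


def save_json_sketch(obj: Any, path: str) -> None:
    """Hypothetical helper: write obj as JSON, creating parent directories."""
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f)


def load_json_sketch(path: str, lines: bool = False) -> Any:
    """Hypothetical helper: read one JSON document, or one per line."""
    with open(path, encoding="utf-8") as f:
        if lines:
            # Assumption: lines=True means JSON Lines; blank lines are skipped.
            return [json.loads(line) for line in f if line.strip()]
        return json.load(f)


with tempfile.TemporaryDirectory() as tmp:
    # Round trip through a nested path that does not exist yet.
    path = os.path.join(tmp, "output", "data.json")
    save_json_sketch({"queries": ["q1", "q2"], "labels": [1, 0]}, path)
    roundtrip = load_json_sketch(path)

    # Line-delimited file: each line is parsed independently.
    jsonl_path = os.path.join(tmp, "records.jsonl")
    with open(jsonl_path, "w", encoding="utf-8") as f:
        f.write('{"id": 1}\n{"id": 2}\n')
    records = load_json_sketch(jsonl_path, lines=True)

print(roundtrip, records)
```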
