Implementation:EvolvingLMMs Lab Lmms eval Distributed Dispatch
| Knowledge Sources | |
|---|---|
| Domains | Distributed_Computing, Model_Inference |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool for executing model inference in parallel across ranks with synchronization barriers provided by the lmms-eval framework.
Description
The inference dispatch logic in lmms_eval/evaluator.py (L568-605) iterates over each request type, builds the padded request list, dispatches the requests to the model, and synchronizes all ranks upon completion.
The dispatch uses Python's getattr to dynamically call the appropriate model method based on the request type string. For example, getattr(lm, "generate_until")(cloned_reqs) calls the model's generate_until method. This design allows the evaluator to be model-agnostic: any model implementing the expected interface methods can be evaluated.
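Below is a minimal sketch of name-based dispatch. It assumes a hypothetical EchoModel class and a hypothetical dispatch helper. Neither is part of lmms-eval; they only show how getattr turns a request-type string into a method call.

```python
from typing import List


class EchoModel:
    """Hypothetical stand-in for a model; not an lmms-eval class."""

    def generate_until(self, requests: List[str]) -> List[str]:
        # Pretend "generation": upper-case each prompt.
        return [r.upper() for r in requests]

    def loglikelihood(self, requests: List[str]) -> List[float]:
        # Pretend score: negative prompt length.
        return [-float(len(r)) for r in requests]


def dispatch(lm, reqtype: str, requests: List[str]) -> list:
    # Resolve the method by its name at runtime. An unsupported request
    # type raises AttributeError instead of being silently skipped.
    method = getattr(lm, reqtype)
    return method(requests)


if __name__ == "__main__":
    lm = EchoModel()
    print(dispatch(lm, "generate_until", ["hi", "there"]))  # ['HI', 'THERE']
    print(dispatch(lm, "loglikelihood", ["abc"]))           # [-3.0]
```

The evaluator only needs the method names to match the request-type strings, so adding a new model requires no change to the dispatch loop.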
The request list is first expanded by repeats. Each request's repeats attribute sets how many times it is evaluated, which is useful for measuring variance. The list is then extended with padding copies so that every rank processes the same number of requests. The model processes the entire expanded list in one call and handles batching internally.
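The sketch below builds the expanded and padded list under stated assumptions:

- Req is a hypothetical stand-in for an Instance.
- counts_per_rank stands in for values gathered across ranks.
- The padding rule (largest per-rank count minus this rank's count) is an assumption consistent with "padding computed from all_gather".

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Req:
    """Hypothetical stand-in for an lmms-eval Instance."""
    prompt: str
    repeats: int = 1
    resps: list = field(default_factory=list)


def padding_for_rank(counts_per_rank: List[int], rank: int) -> int:
    # Assumed rule: pad up to the largest per-rank count so that every
    # rank ends up with the same number of requests.
    return max(counts_per_rank) - counts_per_rank[rank]


def build_request_list(reqs: List[Req], num_pad: int, world_size: int) -> List[Req]:
    cloned: List[Req] = []
    for req in reqs:
        # Each request appears `repeats` times (same object, not copies).
        cloned.extend([req] * req.repeats)
    if world_size > 1 and num_pad > 0 and reqs:
        last = reqs[-1]
        for _ in range(num_pad):
            # Padding reuses the last request, expanded by its repeats.
            cloned.extend([last] * last.repeats)
    return cloned


if __name__ == "__main__":
    reqs = [Req("a"), Req("b", repeats=2)]
    pad = padding_for_rank([5, 3], rank=1)  # this rank is 2 short
    print([r.prompt for r in build_request_list(reqs, pad, world_size=2)])
```

For example, with per-rank counts [5, 3], rank 1 receives two padding units. Each unit repeats the last request by its repeats value.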
After inference for each request type completes, a barrier synchronization ensures all ranks have finished. This is followed by a second barrier after any post-inference operations (such as launching an evaluation server for judge-based metrics), ensuring all ranks are aligned before the result gathering phase begins.
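The barrier guarantee can be illustrated with an analogy, not the distributed code itself. This sketch uses Python threads and threading.Barrier in place of accelerator.wait_for_everyone() or dist.barrier(). Each "rank" finishes simulated inference at a different time, yet none passes the barrier until all have arrived.

```python
import random
import threading
import time

WORLD_SIZE = 4
barrier = threading.Barrier(WORLD_SIZE)
lock = threading.Lock()
finished = []        # ranks that completed simulated inference
seen_at_gather = {}  # ranks finished when each rank passed the barrier


def worker(rank: int) -> None:
    # Simulated inference with uneven duration per rank.
    time.sleep(random.uniform(0.0, 0.05))
    with lock:
        finished.append(rank)
    # No thread continues past this call until all WORLD_SIZE arrive.
    barrier.wait()
    with lock:
        seen_at_gather[rank] = len(finished)


threads = [threading.Thread(target=worker, args=(r,)) for r in range(WORLD_SIZE)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(seen_at_gather)  # every value equals WORLD_SIZE
```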
Usage
This dispatch is executed automatically during evaluation. Users control parallel inference through the launch command and model configuration:
```bash
# 4-GPU evaluation with accelerate
accelerate launch --num_processes=4 -m lmms_eval \
    --model qwen2_5_vl \
    --model_args pretrained=Qwen/Qwen2.5-VL-3B-Instruct \
    --tasks mmmu \
    --batch_size 32
```
Code Reference
Source Location
- Repository: lmms-eval
- File: lmms_eval/evaluator.py
- Lines: L568-584 (dispatch), L586-592 (post-inference barrier), L601-605 (post-launcher barrier)
Signature
```python
# Inference dispatch loop
for reqtype, reqs in requests.items():
    # Expand by repeats: each Instance appears req.repeats times
    cloned_reqs = []
    for req in reqs:
        cloned_reqs.extend([req] * req.repeats)

    # Apply padding so all ranks process the same number of requests.
    # Note: `req` here is the last request left over from the loop above.
    if (world_size > 1) and (padding_requests[reqtype] > 0):
        for _ in range(padding_requests[reqtype]):
            cloned_reqs.extend([req] * req.repeats)

    # Dispatch to the model method named by the request type
    resps = getattr(lm, reqtype)(cloned_reqs)

    # Store responses on each (possibly repeated) Instance
    for x, req in zip(resps, cloned_reqs):
        req.resps.append(x)

    # Post-inference synchronization, once per request type
    if world_size > 1:
        if distributed_executor_backend == "accelerate":
            lm.accelerator.wait_for_everyone()
        elif distributed_executor_backend == "torchrun":
            dist.barrier()
```
Import
```python
import torch.distributed as dist
from accelerate import Accelerator
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| requests | defaultdict(list) | Yes | Dictionary mapping request types (e.g., "generate_until", "loglikelihood") to lists of Instance objects |
| padding_requests | defaultdict(int) | Yes | Dictionary mapping request types to the number of padding instances to append on this rank |
| lm | lmms (model object) | Yes | The model object implementing generate_until(), loglikelihood(), and/or generate_until_multi_round() |
| world_size | int | Yes | Total number of distributed processes |
| distributed_executor_backend | str | Yes | Either "accelerate" or "torchrun" |
Outputs
| Name | Type | Description |
|---|---|---|
| req.resps | list (per Instance) | The model's response for each entry of the expanded request list is appended to that Instance's resps list |
| synchronization | side effect | All ranks are guaranteed to have completed inference for each request type before proceeding |
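`[req] * req.repeats` repeats a reference to the same Instance object, not a copy. The zip loop therefore appends every response for that request into one shared resps list. The sketch below uses a hypothetical Req class and made-up response strings to show the effect. It assumes world_size > 1 and one padding copy of the last request.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Req:
    """Hypothetical stand-in for an lmms-eval Instance."""
    prompt: str
    repeats: int = 1
    resps: list = field(default_factory=list)


def store_responses(cloned_reqs: List[Req], resps: list) -> None:
    # zip pairs each response with the (possibly repeated) request object
    for x, req in zip(resps, cloned_reqs):
        req.resps.append(x)


a = Req("a", repeats=2)
b = Req("b")
# Expanded list [a, a, b] plus one padding copy of b.
cloned = [a, a, b, b]
store_responses(cloned, ["a#1", "a#2", "b#1", "b#pad"])

print(a.resps)  # ['a#1', 'a#2']
print(b.resps)  # ['b#1', 'b#pad']
```

A request used for padding can therefore end up with more entries in resps than its repeats value.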
Usage Examples
Basic Example
```python
from collections import defaultdict

import torch.distributed as dist

# Simulated request dispatch (simplified).
# `lm` and `world_size` are assumed to come from the evaluator context.
requests = defaultdict(list)
# ... requests populated from task instances ...
padding_requests = defaultdict(int)
# ... padding computed from all_gather ...

for reqtype, reqs in requests.items():
    # Build the request list, expanded by repeats
    cloned_reqs = []
    for req in reqs:
        cloned_reqs.extend([req] * req.repeats)

    # Pad with copies of the last request (guard against an empty list)
    if world_size > 1 and padding_requests[reqtype] > 0 and cloned_reqs:
        last_req = cloned_reqs[-1]
        for _ in range(padding_requests[reqtype]):
            cloned_reqs.extend([last_req] * last_req.repeats)

    # Dispatch: each rank calls this independently on its own data
    resps = getattr(lm, reqtype)(cloned_reqs)

    # Store results (only non-padded results matter)
    for x, req in zip(resps, cloned_reqs):
        req.resps.append(x)

    # Synchronize before moving to the next request type
    # (torchrun backend; with accelerate, lm.accelerator.wait_for_everyone())
    if world_size > 1:
        dist.barrier()
```