Implementation: EvolvingLMMs-Lab lmms-eval EvalClient
| Knowledge Sources | |
|---|---|
| Domains | Server, Client |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Synchronous and asynchronous Python clients, provided by the lmms-eval framework, for programmatic orchestration of evaluation jobs against the lmms-eval HTTP server.
Description
The EvalClient class provides a synchronous Python interface to the lmms-eval HTTP server. It wraps all server endpoints in typed methods, handles HTTP connection management via httpx.Client, and provides a wait_for_job() method that blocks until an evaluation completes or fails. The AsyncEvalClient class provides an equivalent asynchronous interface using httpx.AsyncClient and asyncio.sleep() for non-blocking polling.
Both clients strip None-valued optional parameters from evaluation requests, support configurable request timeouts, implement context manager protocols for proper resource cleanup, and share a common _process_job_status() function for interpreting terminal job states.
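The None-stripping behavior described above can be sketched as a small payload builder. This is an illustrative sketch, not the library's actual code: `build_payload` is a hypothetical helper showing how optional `evaluate()` parameters left at `None` would be dropped so the server applies its own defaults.

```python
from typing import Any, Dict, List


def build_payload(model: str, tasks: List[str], **optional: Any) -> Dict[str, Any]:
    """Hypothetical sketch: keep required fields, drop None-valued optionals."""
    payload: Dict[str, Any] = {"model": model, "tasks": tasks}
    # Only parameters the caller actually set are sent to the server.
    payload.update({k: v for k, v in optional.items() if v is not None})
    return payload
```

Sending only explicitly set parameters keeps the request minimal and lets the server's defaults remain the single source of truth.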
Usage
Use this implementation when you need to:
- Submit evaluation jobs and collect results from Python scripts or notebooks
- Build blocking workflows that wait for evaluation completion
- Integrate evaluation into async applications or event loops
- Query server health, available tasks, and model types programmatically
Code Reference
Source Location
- Repository: lmms-eval
- File: lmms_eval/entrypoints/client.py
- Lines: L61-295 (EvalClient), L298-391 (AsyncEvalClient)
Signature
class EvalClient:
    """Python client for the LMMS-Eval HTTP server."""

    def __init__(
        self,
        base_url: str = "http://localhost:8000",
        timeout: Optional[float] = None,
    ): ...

    def health(self) -> Dict[str, Any]: ...
    def is_healthy(self) -> bool: ...
    def list_tasks(self) -> List[str]: ...
    def list_models(self) -> List[str]: ...

    def evaluate(
        self,
        model: str,
        tasks: List[str],
        model_args: Optional[Dict[str, Any]] = None,
        num_fewshot: Optional[int] = None,
        batch_size: Optional[Union[int, str]] = None,
        device: Optional[str] = None,
        limit: Optional[Union[int, float]] = None,
        gen_kwargs: Optional[str] = None,
        log_samples: bool = True,
        predict_only: bool = False,
        num_gpus: int = 1,
        output_dir: Optional[str] = None,
    ) -> Dict[str, Any]: ...

    def get_job(self, job_id: str) -> Dict[str, Any]: ...
    def cancel_job(self, job_id: str) -> Dict[str, Any]: ...

    def wait_for_job(
        self,
        job_id: str,
        poll_interval: float = 5.0,
        timeout: Optional[float] = None,
        verbose: bool = True,
    ) -> Dict[str, Any]: ...

    def get_queue_status(self) -> Dict[str, Any]: ...
    def close(self): ...


class AsyncEvalClient:
    """Async version of the LMMS-Eval client."""

    def __init__(
        self,
        base_url: str = "http://localhost:8000",
        timeout: Optional[float] = None,
    ): ...

    async def health(self) -> Dict[str, Any]: ...
    async def evaluate(self, model: str, tasks: List[str], **kwargs) -> Dict[str, Any]: ...
    async def get_job(self, job_id: str) -> Dict[str, Any]: ...
    async def cancel_job(self, job_id: str) -> Dict[str, Any]: ...

    async def wait_for_job(
        self,
        job_id: str,
        poll_interval: float = 5.0,
        timeout: Optional[float] = None,
        verbose: bool = True,
    ) -> Dict[str, Any]: ...

    async def get_queue_status(self) -> Dict[str, Any]: ...
    async def close(self): ...
Import
from lmms_eval.entrypoints.client import EvalClient, AsyncEvalClient
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| base_url | str | No (default: "http://localhost:8000") | Base URL of the lmms-eval HTTP server |
| timeout | Optional[float] | No (default: None) | HTTP request timeout in seconds; None disables the timeout |
| model | str | Yes (for evaluate) | Model name or path |
| tasks | List[str] | Yes (for evaluate) | List of evaluation task names |
| model_args | Optional[Dict[str, Any]] | No | Model-specific arguments |
| num_fewshot | Optional[int] | No | Number of few-shot examples |
| batch_size | Optional[Union[int, str]] | No | Batch size for evaluation |
| device | Optional[str] | No | Device specification (e.g., "cuda:0") |
| limit | Optional[Union[int, float]] | No | Limit on the number of evaluation examples |
| gen_kwargs | Optional[str] | No | Generation keyword arguments |
| log_samples | bool | No (default: True) | Whether to log individual samples |
| predict_only | bool | No (default: False) | Skip metric computation |
| num_gpus | int | No (default: 1) | Number of GPUs |
| output_dir | Optional[str] | No | Directory for evaluation outputs |
| poll_interval | float | No (default: 5.0) | Seconds between status polls in wait_for_job |
| verbose | bool | No (default: True) | Print status updates during wait_for_job |
Outputs
| Method | Return Type | Description |
|---|---|---|
| evaluate() | Dict[str, Any] | JobSubmitResponse dict with job_id, status, position_in_queue, message |
| get_job() | Dict[str, Any] | JobInfo dict with status, timestamps, request, result, error |
| wait_for_job() | Dict[str, Any] | JobInfo dict (blocks until a terminal state); raises RuntimeError on failure, TimeoutError on timeout |
| cancel_job() | Dict[str, Any] | Dict with confirmation message |
| get_queue_status() | Dict[str, Any] | QueueStatusResponse dict with queue_size, running_job, queued_jobs, completed_jobs, failed_jobs |
| health() | Dict[str, Any] | HealthResponse dict with status, timestamp, queue_size |
| list_tasks() | List[str] | List of available evaluation task names |
| list_models() | List[str] | List of available model type names |
| is_healthy() | bool | True if the server responds with status "healthy" |
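The wait_for_job() contract above (return JobInfo on completion, RuntimeError on failure, TimeoutError on timeout) can be sketched as a generic polling loop. This is an illustrative sketch, not the library's implementation: `wait_for_terminal` is a hypothetical helper, and the exact set of terminal state names is an assumption.

```python
import time
from typing import Any, Callable, Dict, Optional

# Assumed terminal states; the actual server may use different names.
TERMINAL_STATES = {"completed", "failed", "cancelled"}


def wait_for_terminal(
    fetch: Callable[[], Dict[str, Any]],
    poll_interval: float = 5.0,
    timeout: Optional[float] = None,
) -> Dict[str, Any]:
    """Poll fetch() (e.g. a bound client.get_job call) until a terminal state."""
    start = time.monotonic()
    while True:
        info = fetch()
        status = info.get("status")
        if status in TERMINAL_STATES:
            if status == "failed":
                # Surface the server-reported error, matching the documented contract.
                raise RuntimeError(info.get("error", "evaluation failed"))
            return info
        if timeout is not None and time.monotonic() - start > timeout:
            raise TimeoutError(f"job did not finish within {timeout}s")
        time.sleep(poll_interval)
```

A loop of this shape is what lets the sync client block cheaply between polls while the async variant swaps `time.sleep` for `asyncio.sleep`.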
Usage Examples
Basic Example
from lmms_eval.entrypoints.client import EvalClient

# Submit and wait for results
with EvalClient("http://localhost:8000") as client:
    job = client.evaluate(
        model="qwen2_5_vl",
        tasks=["mmmu_val"],
        model_args={"pretrained": "Qwen/Qwen2.5-VL-3B-Instruct"},
        batch_size=128,
    )
    print(f"Job submitted: {job['job_id']}")

    # Block until completion
    result = client.wait_for_job(job["job_id"])
    print(f"Status: {result['status']}")
    print(f"Results: {result['result']}")
Async Client Example
import asyncio

from lmms_eval.entrypoints.client import AsyncEvalClient


async def run_evaluation():
    async with AsyncEvalClient("http://localhost:8000") as client:
        job = await client.evaluate(
            model="qwen2_5_vl",
            tasks=["mmmu_val", "mme"],
        )
        result = await client.wait_for_job(job["job_id"])
        return result


result = asyncio.run(run_evaluation())
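Because the async client polls without blocking the event loop, several jobs can be awaited concurrently with asyncio.gather. The sketch below is a hypothetical helper, `run_many`, that works with any object exposing the evaluate/wait_for_job coroutines shown above (such as AsyncEvalClient).

```python
import asyncio
from typing import Any, Dict, List


async def run_many(client: Any, model: str, task_groups: List[List[str]]) -> List[Dict[str, Any]]:
    """Hypothetical helper: submit one job per task group, then await all of them."""
    # Submit sequentially so the server queues jobs in a known order.
    jobs = [await client.evaluate(model=model, tasks=tasks) for tasks in task_groups]
    # Poll all jobs concurrently; gather preserves submission order in its results.
    return await asyncio.gather(
        *(client.wait_for_job(job["job_id"]) for job in jobs)
    )
```

Note that if the server runs one job at a time, concurrency here only overlaps the waiting, not the evaluations themselves.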
Queue Management Example
from lmms_eval.entrypoints.client import EvalClient

with EvalClient() as client:
    # Check server health
    if not client.is_healthy():
        raise RuntimeError("Server is not healthy")

    # Discover available tasks and models
    tasks = client.list_tasks()
    models = client.list_models()
    print(f"Available tasks: {len(tasks)}, models: {len(models)}")

    # Submit a job
    job = client.evaluate(model="llava", tasks=["mme"], limit=10)

    # Check queue status
    queue = client.get_queue_status()
    print(f"Queue size: {queue['queue_size']}")

    # Cancel if needed
    client.cancel_job(job["job_id"])
Bounded Wait with Timeout
from lmms_eval.entrypoints.client import EvalClient

client = EvalClient("http://localhost:8000", timeout=30.0)
try:
    job = client.evaluate(
        model="qwen2_5_vl",
        tasks=["mmmu_val"],
        model_args={"pretrained": "Qwen/Qwen2.5-VL-3B-Instruct"},
    )
    # Wait up to 1 hour, checking every 10 seconds
    result = client.wait_for_job(
        job["job_id"],
        poll_interval=10.0,
        timeout=3600.0,
    )
except TimeoutError:
    print("Evaluation timed out")
except RuntimeError as e:
    print(f"Evaluation failed: {e}")
finally:
    client.close()