
Implementation:EvolvingLMMs Lab Lmms eval EvalClient

From Leeroopedia
Domains Server, Client
Last Updated 2026-02-14 00:00 GMT

Overview

Synchronous and asynchronous Python clients, provided by the lmms-eval framework, for orchestrating evaluation jobs programmatically against the lmms-eval HTTP server.

Description

The EvalClient class provides a synchronous Python interface to the lmms-eval HTTP server. It wraps all server endpoints in typed methods, handles HTTP connection management via httpx.Client, and provides a wait_for_job() method that blocks until an evaluation completes or fails. The AsyncEvalClient class provides an equivalent asynchronous interface using httpx.AsyncClient and asyncio.sleep() for non-blocking polling.

Both clients strip None-valued optional parameters from evaluation requests, support configurable request timeouts, implement context manager protocols for proper resource cleanup, and share a common _process_job_status() function for interpreting terminal job states.
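The None-stripping behaviour can be illustrated with a short sketch. The helper name `build_request_payload` is hypothetical, not part of the library; it only shows the idea of dropping unset optional parameters before the request is sent:

```python
from typing import Any, Dict, List

def build_request_payload(model: str, tasks: List[str], **optional: Any) -> Dict[str, Any]:
    # Illustrative sketch (hypothetical helper): keep required fields,
    # drop any optional parameter whose value is None so the server
    # falls back to its own defaults.
    payload: Dict[str, Any] = {"model": model, "tasks": tasks}
    payload.update({k: v for k, v in optional.items() if v is not None})
    return payload

payload = build_request_payload("qwen2_5_vl", ["mmmu_val"], batch_size=128, device=None)
# "device" is omitted from the payload because its value is None
```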

Usage

Use this implementation when you need to:

  • Submit evaluation jobs and collect results from Python scripts or notebooks
  • Build blocking workflows that wait for evaluation completion
  • Integrate evaluation into async applications or event loops
  • Query server health, available tasks, and model types programmatically

Code Reference

Source Location

  • Repository: lmms-eval
  • File: lmms_eval/entrypoints/client.py
  • Lines: L61-295 (EvalClient), L298-391 (AsyncEvalClient)

Signature

class EvalClient:
    """Python client for the LMMS-Eval HTTP server."""

    def __init__(
        self,
        base_url: str = "http://localhost:8000",
        timeout: Optional[float] = None,
    ): ...

    def health(self) -> Dict[str, Any]: ...
    def is_healthy(self) -> bool: ...
    def list_tasks(self) -> List[str]: ...
    def list_models(self) -> List[str]: ...

    def evaluate(
        self,
        model: str,
        tasks: List[str],
        model_args: Optional[Dict[str, Any]] = None,
        num_fewshot: Optional[int] = None,
        batch_size: Optional[Union[int, str]] = None,
        device: Optional[str] = None,
        limit: Optional[Union[int, float]] = None,
        gen_kwargs: Optional[str] = None,
        log_samples: bool = True,
        predict_only: bool = False,
        num_gpus: int = 1,
        output_dir: Optional[str] = None,
    ) -> Dict[str, Any]: ...

    def get_job(self, job_id: str) -> Dict[str, Any]: ...
    def cancel_job(self, job_id: str) -> Dict[str, Any]: ...

    def wait_for_job(
        self,
        job_id: str,
        poll_interval: float = 5.0,
        timeout: Optional[float] = None,
        verbose: bool = True,
    ) -> Dict[str, Any]: ...

    def get_queue_status(self) -> Dict[str, Any]: ...
    def close(self): ...


class AsyncEvalClient:
    """Async version of the LMMS-Eval client."""

    def __init__(
        self,
        base_url: str = "http://localhost:8000",
        timeout: Optional[float] = None,
    ): ...

    async def health(self) -> Dict[str, Any]: ...
    async def evaluate(self, model: str, tasks: List[str], **kwargs) -> Dict[str, Any]: ...
    async def get_job(self, job_id: str) -> Dict[str, Any]: ...
    async def cancel_job(self, job_id: str) -> Dict[str, Any]: ...
    async def wait_for_job(self, job_id: str, poll_interval: float = 5.0, timeout: Optional[float] = None, verbose: bool = True) -> Dict[str, Any]: ...
    async def get_queue_status(self) -> Dict[str, Any]: ...
    async def close(self): ...

Import

from lmms_eval.entrypoints.client import EvalClient, AsyncEvalClient

I/O Contract

Inputs

Name Type Required Description
base_url str No (default: "http://localhost:8000") Base URL of the lmms-eval HTTP server
timeout Optional[float] No (default: None) HTTP request timeout in seconds; None disables timeout
model str Yes (for evaluate) Model name or path
tasks List[str] Yes (for evaluate) List of evaluation task names
model_args Optional[Dict[str, Any]] No Model-specific arguments
num_fewshot Optional[int] No Number of few-shot examples
batch_size Optional[Union[int, str]] No Batch size for evaluation
device Optional[str] No Device specification (e.g., "cuda:0")
limit Optional[Union[int, float]] No Limit on number of evaluation examples
gen_kwargs Optional[str] No Generation keyword arguments passed to the model
log_samples bool No (default: True) Whether to log individual samples
predict_only bool No (default: False) Skip metric computation
num_gpus int No (default: 1) Number of GPUs to use
output_dir Optional[str] No Directory where evaluation outputs are written
poll_interval float No (default: 5.0) Seconds between status polls in wait_for_job
verbose bool No (default: True) Print status updates during wait_for_job

Outputs

Method Return Type Description
evaluate() Dict[str, Any] JobSubmitResponse dict with job_id, status, position_in_queue, message
get_job() Dict[str, Any] JobInfo dict with status, timestamps, request, result, error
wait_for_job() Dict[str, Any] JobInfo dict (blocks until terminal state); raises RuntimeError on failure, TimeoutError on timeout
cancel_job() Dict[str, Any] Dict with confirmation message
get_queue_status() Dict[str, Any] QueueStatusResponse dict with queue_size, running_job, queued_jobs, completed_jobs, failed_jobs
health() Dict[str, Any] HealthResponse dict with status, timestamp, queue_size
list_tasks() List[str] List of available evaluation task names
list_models() List[str] List of available model type names
is_healthy() bool True if server responds with status "healthy"
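The error semantics in the table above can be sketched as a small state handler, loosely analogous to the shared _process_job_status() helper. The concrete status strings used here ("completed", "failed", "cancelled") and the function body are assumptions for illustration, not the library's actual implementation:

```python
from typing import Any, Dict, Optional

# Assumed terminal status strings; the real server may use different names.
TERMINAL_STATES = {"completed", "failed", "cancelled"}

def process_job_status(job: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the JobInfo dict at a terminal state, raise RuntimeError on
    failure, or return None to signal that polling should continue."""
    status = job.get("status")
    if status == "failed":
        raise RuntimeError(f"Evaluation failed: {job.get('error')}")
    if status in TERMINAL_STATES:
        return job
    return None  # still queued or running: keep polling

state = process_job_status({"status": "running"})  # None: not terminal yet
```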

Usage Examples

Basic Example

from lmms_eval.entrypoints.client import EvalClient

# Submit and wait for results
with EvalClient("http://localhost:8000") as client:
    job = client.evaluate(
        model="qwen2_5_vl",
        tasks=["mmmu_val"],
        model_args={"pretrained": "Qwen/Qwen2.5-VL-3B-Instruct"},
        batch_size=128,
    )
    print(f"Job submitted: {job['job_id']}")

    # Block until completion
    result = client.wait_for_job(job["job_id"])
    print(f"Status: {result['status']}")
    print(f"Results: {result['result']}")

Async Client Example

import asyncio
from lmms_eval.entrypoints.client import AsyncEvalClient

async def run_evaluation():
    async with AsyncEvalClient("http://localhost:8000") as client:
        job = await client.evaluate(
            model="qwen2_5_vl",
            tasks=["mmmu_val", "mme"],
        )
        result = await client.wait_for_job(job["job_id"])
        return result

result = asyncio.run(run_evaluation())
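Because the async client polls with asyncio.sleep(), several jobs can be awaited concurrently on one event loop. The sketch below shows the pattern with a stub coroutine standing in for the real evaluate()/wait_for_job() calls, so it runs without a live server; in practice you would substitute AsyncEvalClient calls for the stub:

```python
import asyncio
from typing import Dict, List

async def submit_and_wait(task_name: str) -> Dict[str, str]:
    # Stub standing in for client.evaluate() + client.wait_for_job();
    # the sleep simulates queueing and evaluation time.
    await asyncio.sleep(0.01)
    return {"task": task_name, "status": "completed"}

async def run_all(tasks: List[str]) -> List[Dict[str, str]]:
    # asyncio.gather drives all jobs concurrently on one event loop
    return await asyncio.gather(*(submit_and_wait(t) for t in tasks))

results = asyncio.run(run_all(["mmmu_val", "mme"]))
```

Note that a single server may still execute queued jobs one at a time; the concurrency here is in the client-side waiting, not necessarily in the evaluations themselves.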

Queue Management Example

from lmms_eval.entrypoints.client import EvalClient

with EvalClient() as client:
    # Check server health
    if not client.is_healthy():
        raise RuntimeError("Server is not healthy")

    # Discover available tasks and models
    tasks = client.list_tasks()
    models = client.list_models()
    print(f"Available tasks: {len(tasks)}, models: {len(models)}")

    # Submit a job
    job = client.evaluate(model="llava", tasks=["mme"], limit=10)

    # Check queue status
    queue = client.get_queue_status()
    print(f"Queue size: {queue['queue_size']}")

    # Cancel if needed
    client.cancel_job(job["job_id"])

Fire-and-Forget with Timeout

from lmms_eval.entrypoints.client import EvalClient

client = EvalClient("http://localhost:8000", timeout=30.0)
try:
    job = client.evaluate(
        model="qwen2_5_vl",
        tasks=["mmmu_val"],
        model_args={"pretrained": "Qwen/Qwen2.5-VL-3B-Instruct"},
    )
    # Wait up to 1 hour, checking every 10 seconds
    result = client.wait_for_job(
        job["job_id"],
        poll_interval=10.0,
        timeout=3600.0,
    )
except TimeoutError:
    print("Evaluation timed out")
except RuntimeError as e:
    print(f"Evaluation failed: {e}")
finally:
    client.close()

Related Pages

Implements Principle
