
Implementation:EvolvingLMMs Lab Lmms eval JobScheduler

Domains: Server, Job_Management
Last Updated: 2026-02-14 00:00 GMT

Overview

A concrete tool, provided by the lmms-eval framework, for managing the evaluation job lifecycle: queue tracking, status monitoring, and cancellation.

Description

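The JobScheduler class (described in its docstring as thread-safe) manages evaluation jobs with queue-based sequential execution. It maintains an internal dictionary of JobInfo records, an asyncio.Queue of pending work, and a background worker task that processes jobs one at a time. All public methods that access shared state are protected by an asyncio.Lock. Note that an asyncio.Lock serializes coroutines running on one event loop; it does not guard against access from other OS threads, so the scheduler's methods should be awaited from the event loop that owns it.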
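The architecture described above can be illustrated with a minimal asyncio sketch: a dict of job records, an asyncio.Queue of pending IDs, a single background worker task, and an asyncio.Lock around shared state. This is not the lmms-eval source; MiniScheduler, MiniJob, the run_job coroutine, and the plain-string statuses are all hypothetical names chosen for illustration.

```python
import asyncio
import uuid
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List, Optional


@dataclass
class MiniJob:
    # Hypothetical stand-in for JobInfo with only the fields this sketch needs.
    job_id: str
    status: str = "queued"
    result: Optional[str] = None


class MiniScheduler:
    """Illustrative sequential scheduler; not the lmms-eval implementation."""

    def __init__(self, run_job: Callable[[str], Awaitable[str]]) -> None:
        self._run_job = run_job                      # hypothetical work coroutine
        self._jobs: Dict[str, MiniJob] = {}          # job_id -> record
        self._queue: "asyncio.Queue[str]" = asyncio.Queue()  # pending job IDs
        self._lock = asyncio.Lock()                  # guards self._jobs
        self._worker: Optional[asyncio.Task] = None

    async def start(self) -> None:
        # A single worker task means at most one job runs at a time.
        self._worker = asyncio.create_task(self._worker_loop())

    async def stop(self) -> None:
        if self._worker is not None:
            self._worker.cancel()
            try:
                await self._worker
            except asyncio.CancelledError:
                pass

    async def add_job(self) -> str:
        job_id = str(uuid.uuid4())
        async with self._lock:
            self._jobs[job_id] = MiniJob(job_id)
        await self._queue.put(job_id)
        return job_id

    async def get_status(self, job_id: str) -> Optional[str]:
        async with self._lock:
            job = self._jobs.get(job_id)
            return job.status if job else None

    async def join(self) -> None:
        # Returns once every dequeued job has been marked done.
        await self._queue.join()

    async def _worker_loop(self) -> None:
        while True:
            job_id = await self._queue.get()
            try:
                async with self._lock:
                    job = self._jobs[job_id]
                    if job.status != "queued":  # e.g. cancelled while waiting
                        continue
                    job.status = "running"
                try:
                    # Await the work outside the lock so status queries stay responsive.
                    result = await self._run_job(job_id)
                except Exception:
                    async with self._lock:
                        job.status = "failed"
                else:
                    async with self._lock:
                        job.status, job.result = "completed", result
            finally:
                self._queue.task_done()


async def demo() -> List[str]:
    async def fake_eval(job_id: str) -> str:
        await asyncio.sleep(0.01)  # stands in for a long evaluation run
        return f"results for {job_id}"

    scheduler = MiniScheduler(fake_eval)
    await scheduler.start()
    ids = [await scheduler.add_job() for _ in range(3)]
    await scheduler.join()
    statuses = [await scheduler.get_status(i) for i in ids]
    await scheduler.stop()
    return statuses


print(asyncio.run(demo()))  # ['completed', 'completed', 'completed']
```

Holding the lock only around dictionary reads and writes, never around the evaluation itself, is what keeps status and queue queries cheap while a long job runs.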

The scheduler provides methods for adding jobs, querying an individual job's status (with its queue position calculated dynamically at query time), retrieving aggregate queue statistics, cancelling queued jobs, and automatically cleaning up old terminal jobs (completed, failed, or cancelled) so that in-memory job records do not grow without bound.
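The cleanup step can be sketched as follows, assuming terminal jobs are pruned oldest-first by completed_at once their count exceeds max_completed_jobs. The Job record, the TERMINAL set of lowercase status strings, and the oldest-first ordering are assumptions for illustration; the source only states that old terminal jobs are cleaned up and that cleanup_old_jobs returns an int.

```python
from dataclasses import dataclass
from typing import Dict, Optional

# Assumed lowercase status strings for the terminal states.
TERMINAL = {"completed", "failed", "cancelled"}


@dataclass
class Job:
    # Hypothetical minimal record; JobInfo has more fields.
    status: str
    completed_at: Optional[str] = None  # ISO 8601 timestamp


def cleanup_old_jobs(jobs: Dict[str, Job], max_completed_jobs: int) -> int:
    """Remove the oldest terminal jobs beyond the retention limit; return the count removed."""
    terminal = [(jid, job) for jid, job in jobs.items() if job.status in TERMINAL]
    excess = len(terminal) - max_completed_jobs
    if excess <= 0:
        return 0
    # ISO 8601 strings sort chronologically only if they share one format and UTC offset.
    terminal.sort(key=lambda item: item[1].completed_at or "")
    for jid, _ in terminal[:excess]:
        del jobs[jid]
    return excess


jobs = {
    "a": Job("completed", "2026-02-14T00:00:01"),
    "b": Job("failed", "2026-02-14T00:00:02"),
    "c": Job("running"),
    "d": Job("cancelled", "2026-02-14T00:00:03"),
}
removed = cleanup_old_jobs(jobs, max_completed_jobs=2)
print(removed, sorted(jobs))  # 1 ['b', 'c', 'd']
```

Queued and running jobs are never candidates for removal, so cleanup cannot drop work that is still pending.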

The HTTP server exposes three endpoints that delegate to the scheduler: GET /queue calls get_queue_stats(), GET /jobs/{job_id} calls get_job_with_position(), and DELETE /jobs/{job_id} calls cancel_job().

Usage

Use this implementation when you need to:

  • Monitor the status of submitted evaluation jobs
  • Query the overall queue state for dashboard or tooling integration
  • Cancel jobs that are no longer needed before they begin execution
  • Understand the internal job management architecture

Code Reference

Source Location

  • Repository: lmms-eval
  • File: lmms_eval/entrypoints/http_server.py
  • Lines: L116-157 (route handlers for GET /jobs/{job_id}, GET /queue, and DELETE /jobs/{job_id})
  • File: lmms_eval/entrypoints/job_scheduler.py
  • Lines: L30-296 (JobScheduler class)

Signature

from typing import Optional

from lmms_eval.entrypoints.protocol import EvaluateRequest, JobInfo


class JobScheduler:
    """Thread-safe job scheduler for managing evaluation jobs."""

    DEFAULT_MAX_COMPLETED_JOBS = 100
    DEFAULT_TEMP_DIR_PREFIX = "lmms_eval_"

    def __init__(
        self,
        max_completed_jobs: int = DEFAULT_MAX_COMPLETED_JOBS,
        temp_dir_prefix: str = DEFAULT_TEMP_DIR_PREFIX,
    ):
        ...

    async def start(self): ...
    async def stop(self): ...
    async def get_job(self, job_id: str) -> Optional[JobInfo]: ...
    async def get_job_with_position(self, job_id: str) -> Optional[JobInfo]: ...
    async def add_job(self, request: EvaluateRequest) -> tuple[str, int]: ...
    async def cancel_job(self, job_id: str) -> tuple[bool, str]: ...
    async def get_queue_stats(self) -> dict: ...
    async def cleanup_old_jobs(self) -> int: ...

Import

from lmms_eval.entrypoints.job_scheduler import JobScheduler
from lmms_eval.entrypoints.protocol import JobInfo, JobStatus, QueueStatusResponse

I/O Contract

Inputs

Name | Type | Required | Description
job_id | str | Yes (for job-specific operations) | UUID4 identifier returned from job submission
max_completed_jobs | int | No (default: 100) | Maximum number of terminal jobs retained before cleanup
temp_dir_prefix | str | No (default: "lmms_eval_") | Prefix for temporary output directories
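Because job_id is documented as a UUID4, a client could reject malformed IDs before calling the server. The helper below, is_uuid4, is hypothetical and uses only the standard-library uuid module; how the server itself validates IDs is not described here.

```python
import uuid


def is_uuid4(job_id: str) -> bool:
    """Return True if job_id is a canonical, hyphenated UUID version 4 string."""
    try:
        parsed = uuid.UUID(job_id)
    except ValueError:
        return False
    # uuid.UUID also accepts braces, "urn:uuid:" prefixes, and unhyphenated hex,
    # so also require the canonical lowercase hyphenated form.
    return parsed.version == 4 and str(parsed) == job_id.lower()


print(is_uuid4(str(uuid.uuid4())))  # True
print(is_uuid4("not-a-job-id"))     # False
```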

Outputs

GET /queue -> QueueStatusResponse:

Name | Type | Description
queue_size | int | Number of jobs currently waiting in the queue
running_job | Optional[str] | Job ID of the currently executing job, or null
queued_jobs | List[str] | List of job IDs in QUEUED state
completed_jobs | int | Count of completed jobs in memory
failed_jobs | int | Count of failed jobs in memory
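One way to derive these fields from in-memory job records is sketched below. It assumes the dict returned by get_queue_stats() uses the same keys as QueueStatusResponse and that statuses are lowercase strings; queue_stats and QueueStats are hypothetical names, and the real queue_size might instead come from asyncio.Queue.qsize().

```python
from typing import Dict, List, Optional, TypedDict


class QueueStats(TypedDict):
    # Mirrors the QueueStatusResponse fields listed above.
    queue_size: int
    running_job: Optional[str]
    queued_jobs: List[str]
    completed_jobs: int
    failed_jobs: int


def queue_stats(statuses: Dict[str, str]) -> QueueStats:
    """Build queue statistics from a job_id -> status mapping (insertion = submission order)."""
    queued = [jid for jid, s in statuses.items() if s == "queued"]
    running = next((jid for jid, s in statuses.items() if s == "running"), None)
    return {
        "queue_size": len(queued),
        "running_job": running,
        "queued_jobs": queued,
        "completed_jobs": sum(s == "completed" for s in statuses.values()),
        "failed_jobs": sum(s == "failed" for s in statuses.values()),
    }


stats = queue_stats({"j1": "completed", "j2": "running", "j3": "queued", "j4": "queued", "j5": "failed"})
print(stats)
# {'queue_size': 2, 'running_job': 'j2', 'queued_jobs': ['j3', 'j4'], 'completed_jobs': 1, 'failed_jobs': 1}
```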

GET /jobs/{job_id} -> JobInfo:

Name | Type | Description
job_id | str | The job's UUID4 identifier
status | JobStatus | Current status: queued, running, completed, failed, or cancelled
created_at | str | ISO 8601 timestamp of job creation
started_at | Optional[str] | ISO 8601 timestamp when execution began
completed_at | Optional[str] | ISO 8601 timestamp when the job reached a terminal state
request | EvaluateRequest | The original evaluation request parameters
result | Optional[Dict[str, Any]] | Parsed output directory structure (completed jobs only)
error | Optional[str] | Error message (failed jobs only)
position_in_queue | Optional[int] | Dynamic queue position (queued jobs only)
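The position_in_queue field is computed at query time rather than stored, so it shifts as jobs ahead of it start or are cancelled. A minimal sketch, assuming positions are 1-based and follow submission order (both assumptions; the source does not state the numbering), with position_in_queue as a hypothetical helper:

```python
from typing import Dict, Optional


def position_in_queue(statuses: Dict[str, str], job_id: str) -> Optional[int]:
    """1-based position among queued jobs, or None if the job is not queued."""
    if statuses.get(job_id) != "queued":
        return None
    # Dicts preserve insertion order, used here as submission order.
    queued = [jid for jid, s in statuses.items() if s == "queued"]
    return queued.index(job_id) + 1


statuses = {"j1": "running", "j2": "queued", "j3": "queued"}
print(position_in_queue(statuses, "j3"))  # 2
statuses["j2"] = "cancelled"              # a job ahead of j3 leaves the queue
print(position_in_queue(statuses, "j3"))  # 1
print(position_in_queue(statuses, "j1"))  # None
```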

DELETE /jobs/{job_id}:

Name | Type | Description
message | str | Confirmation message on successful cancellation
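Since cancel_job() returns a (bool, str) pair and only queued jobs can be cancelled, the DELETE handler presumably maps that pair onto a success body ("message") or an error body ("detail"). The sketch below illustrates that mapping; the message texts and the 400 status code are hypothetical and may differ from the real route.

```python
from typing import Dict, Tuple


def cancel_job(statuses: Dict[str, str], job_id: str) -> Tuple[bool, str]:
    """Cancel a job only while it is still queued; return (success, message)."""
    status = statuses.get(job_id)
    if status is None:
        return False, f"Job {job_id} not found"
    if status != "queued":
        return False, f"Cannot cancel job in status '{status}'"
    statuses[job_id] = "cancelled"
    return True, f"Job {job_id} cancelled"


def to_http(ok: bool, message: str) -> Tuple[int, Dict[str, str]]:
    # Hypothetical status code for failures; the real route may use another.
    return (200, {"message": message}) if ok else (400, {"detail": message})


statuses = {"j1": "running", "j2": "queued"}
print(to_http(*cancel_job(statuses, "j2")))  # (200, {'message': 'Job j2 cancelled'})
print(to_http(*cancel_job(statuses, "j1")))  # (400, {'detail': "Cannot cancel job in status 'running'"})
```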

Usage Examples

Basic Example

import httpx

BASE_URL = "http://localhost:8000"

# Check queue status
response = httpx.get(f"{BASE_URL}/queue")
response.raise_for_status()
queue = response.json()
print(f"Queue size: {queue['queue_size']}")
print(f"Running job: {queue['running_job']}")
print(f"Completed: {queue['completed_jobs']}, Failed: {queue['failed_jobs']}")

# Check individual job status (placeholder ID: use the job_id returned at submission)
job_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
response = httpx.get(f"{BASE_URL}/jobs/{job_id}")
response.raise_for_status()
job = response.json()
print(f"Status: {job['status']}, Created: {job['created_at']}")
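Clients often poll GET /jobs/{job_id} until the job reaches a terminal state. The sketch below takes the fetch function as a parameter so it can be exercised without a live server; wait_for_job, its interval and timeout defaults, and the assumption that statuses arrive as lowercase strings are all hypothetical.

```python
import time
from typing import Any, Callable, Dict

TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


def wait_for_job(
    fetch: Callable[[str], Dict[str, Any]],
    job_id: str,
    interval: float = 5.0,
    timeout: float = 3600.0,
) -> Dict[str, Any]:
    """Poll fetch(job_id) until the job is terminal; raise TimeoutError after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        job = fetch(job_id)
        if job["status"] in TERMINAL_STATUSES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} still {job['status']} after {timeout}s")
        time.sleep(interval)


# Against a live server, fetch could wrap httpx (hypothetical base URL):
#   job = wait_for_job(lambda jid: httpx.get(f"http://localhost:8000/jobs/{jid}").json(), job_id)
```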

Cancel a Queued Job

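import httpx

# Placeholder ID: use the job_id returned when the job was submitted
job_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"

# Only jobs that are still queued (not yet running) can be cancelled
response = httpx.delete(f"http://localhost:8000/jobs/{job_id}")
if response.status_code == 200:
    print(response.json()["message"])
else:
    # Error responses carry the reason in the "detail" field
    print(f"Cancellation failed: {response.json()['detail']}")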

Programmatic Scheduler Usage

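import asyncio

from lmms_eval.entrypoints.job_scheduler import JobScheduler
from lmms_eval.entrypoints.protocol import EvaluateRequest


async def main():
    scheduler = JobScheduler(max_completed_jobs=50)
    await scheduler.start()  # launches the background worker task
    try:
        # Add a job; returns its ID and initial queue position
        request = EvaluateRequest(model="qwen2_5_vl", tasks=["mmmu_val"])
        job_id, position = await scheduler.add_job(request)
        print(f"Submitted {job_id} at position {position}")

        # Get queue stats (keys assumed to mirror the QueueStatusResponse fields)
        stats = await scheduler.get_queue_stats()
        print(f"Queued: {len(stats['queued_jobs'])}, Running: {stats['running_job']}")

        # Get job with position; position_in_queue is None once the job leaves the queue
        job = await scheduler.get_job_with_position(job_id)
        if job is not None:
            print(f"Job {job.job_id}: {job.status}, position: {job.position_in_queue}")
    finally:
        await scheduler.stop()


asyncio.run(main())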

Related Pages

Implements Principle
