Implementation:EvolvingLMMs Lab Lmms eval JobScheduler
| Knowledge Sources | |
|---|---|
| Domains | Server, Job_Management |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
A concrete tool, provided by the lmms-eval framework, for managing the lifecycle of evaluation jobs: queue tracking, status monitoring, and cancellation.
Description
The JobScheduler class manages evaluation jobs with queue-based sequential execution; its docstring describes it as thread-safe. It maintains an internal dictionary of JobInfo records, an asyncio.Queue of pending work, and a background worker task that processes jobs one at a time. All public methods that access shared state acquire an asyncio.Lock, which serializes access among coroutines running on the server's event loop (an asyncio.Lock does not by itself guard against access from other OS threads).
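The queue-plus-worker design described above can be sketched in a few dozen lines of standard-library asyncio. This is a minimal illustration under stated assumptions, not the lmms-eval source: `MiniScheduler`, `SketchJob`, `wait_idle`, and the injected `run` coroutine are hypothetical names, and statuses are plain strings rather than the real `JobStatus` enum.

```python
import asyncio
import uuid
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List, Optional


@dataclass
class SketchJob:
    """Hypothetical stand-in for JobInfo with only the fields this sketch needs."""
    job_id: str
    payload: str
    status: str = "queued"


class MiniScheduler:
    """Shared dict + asyncio.Queue + one worker task + asyncio.Lock (illustrative)."""

    def __init__(self, run: Callable[[str], Awaitable[None]]) -> None:
        self._run = run  # hypothetical coroutine that performs one job
        self._jobs: Dict[str, SketchJob] = {}
        self._queue: "asyncio.Queue[str]" = asyncio.Queue()
        self._lock = asyncio.Lock()
        self._worker: Optional["asyncio.Task[None]"] = None

    async def start(self) -> None:
        # Launch the single background worker.
        self._worker = asyncio.create_task(self._worker_loop())

    async def stop(self) -> None:
        # Cancel the worker and wait for it to finish unwinding.
        if self._worker is not None:
            self._worker.cancel()
            try:
                await self._worker
            except asyncio.CancelledError:
                pass
            self._worker = None

    async def add_job(self, payload: str) -> str:
        job = SketchJob(job_id=str(uuid.uuid4()), payload=payload)
        async with self._lock:
            self._jobs[job.job_id] = job
        await self._queue.put(job.job_id)
        return job.job_id

    async def wait_idle(self) -> None:
        await self._queue.join()  # returns once every queued job has been processed

    async def _worker_loop(self) -> None:
        while True:
            job_id = await self._queue.get()
            try:
                async with self._lock:
                    job = self._jobs[job_id]
                    if job.status != "queued":  # e.g. cancelled while waiting
                        continue
                    job.status = "running"
                try:
                    await self._run(job.payload)  # runs without holding the lock
                    outcome = "completed"
                except Exception:
                    outcome = "failed"
                async with self._lock:
                    job.status = outcome
            finally:
                self._queue.task_done()


async def demo() -> List[str]:
    processed: List[str] = []

    async def fake_eval(payload: str) -> None:
        await asyncio.sleep(0)
        processed.append(payload)

    scheduler = MiniScheduler(fake_eval)
    await scheduler.start()
    for name in ["a", "b", "c"]:
        await scheduler.add_job(name)
    await scheduler.wait_idle()
    await scheduler.stop()
    return processed


print(asyncio.run(demo()))  # ['a', 'b', 'c']
```

Running the job body outside the lock is the design choice that matters in this sketch: status queries and cancellations can still acquire the lock while a long evaluation is in progress.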
The scheduler provides methods for adding jobs, querying individual job status with dynamic queue position calculation, retrieving aggregate queue statistics, cancelling queued jobs, and automatically cleaning up old terminal jobs to prevent memory leaks.
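Two of these behaviours, dynamic queue position and bounded retention of terminal jobs, reduce to small pure functions. The sketch below is illustrative only: `Rec`, `position_in_queue`, `prune_terminal_jobs`, and the retention policy (keep the `max_completed_jobs` most recently finished jobs) are assumptions, as is ordering queued jobs by `created_at` with 0-based positions, since this page specifies neither the ordering rule nor whether positions start at 0 or 1.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional

TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


@dataclass
class Rec:
    """Hypothetical minimal job record; timestamps are ISO 8601 strings."""
    job_id: str
    status: str
    created_at: str
    completed_at: Optional[str] = None


def _ts(value: str) -> datetime:
    return datetime.fromisoformat(value)


def position_in_queue(jobs: Dict[str, Rec], job_id: str) -> Optional[int]:
    """0-based rank among queued jobs, oldest first; None if the job is not queued."""
    job = jobs.get(job_id)
    if job is None or job.status != "queued":
        return None
    queued = sorted(
        (j for j in jobs.values() if j.status == "queued"),
        key=lambda j: _ts(j.created_at),
    )
    return [j.job_id for j in queued].index(job_id)


def prune_terminal_jobs(jobs: Dict[str, Rec], max_completed_jobs: int) -> int:
    """Delete the oldest terminal jobs beyond the limit; return how many were removed."""
    terminal = sorted(
        (j for j in jobs.values() if j.status in TERMINAL_STATUSES),
        key=lambda j: _ts(j.completed_at or j.created_at),
    )
    excess = terminal[: max(0, len(terminal) - max_completed_jobs)]
    for j in excess:
        del jobs[j.job_id]
    return len(excess)


jobs = {
    "a": Rec("a", "completed", "2026-01-01T00:00:00", "2026-01-01T00:05:00"),
    "b": Rec("b", "failed", "2026-01-01T00:01:00", "2026-01-01T00:06:00"),
    "c": Rec("c", "queued", "2026-01-01T00:02:00"),
    "d": Rec("d", "queued", "2026-01-01T00:03:00"),
}
print(position_in_queue(jobs, "d"))  # 1
print(prune_terminal_jobs(jobs, max_completed_jobs=1))  # 1 (job "a" is removed)
```

Computing the position on each request, rather than storing it, keeps the number correct as earlier jobs start running or are cancelled.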
The HTTP server exposes three endpoints that delegate to the scheduler: GET /queue calls get_queue_stats(), GET /jobs/{job_id} calls get_job_with_position(), and DELETE /jobs/{job_id} calls cancel_job().
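The delegation can be illustrated without committing to a web framework. This sketch is hypothetical: `handle`, `FakeScheduler`, and the 404/400 status codes for unknown jobs and refused cancellations are assumptions, and the real server's routing code will differ. Only the mapping of the three routes to `get_queue_stats()`, `get_job_with_position()`, and `cancel_job()` comes from this page; the `{"detail": ...}` error shape matches what the cancellation example on this page reads.

```python
import asyncio
from typing import Any, Dict, Optional, Tuple


class FakeScheduler:
    """Hypothetical in-memory stand-in exposing the three delegated methods."""

    def __init__(self) -> None:
        self.jobs: Dict[str, Dict[str, Any]] = {"j1": {"job_id": "j1", "status": "queued"}}

    async def get_queue_stats(self) -> Dict[str, Any]:
        return {"queued_jobs": [j for j, v in self.jobs.items() if v["status"] == "queued"]}

    async def get_job_with_position(self, job_id: str) -> Optional[Dict[str, Any]]:
        return self.jobs.get(job_id)

    async def cancel_job(self, job_id: str) -> Tuple[bool, str]:
        job = self.jobs.get(job_id)
        if job is None or job["status"] != "queued":
            return False, f"Cannot cancel job {job_id}"
        job["status"] = "cancelled"
        return True, f"Job {job_id} cancelled"


async def handle(scheduler: Any, method: str, path: str) -> Tuple[int, Dict[str, Any]]:
    """Route (method, path) to a scheduler call; status codes here are assumptions."""
    parts = path.strip("/").split("/")
    if method == "GET" and parts == ["queue"]:
        return 200, await scheduler.get_queue_stats()
    if len(parts) == 2 and parts[0] == "jobs":
        job_id = parts[1]
        if method == "GET":
            job = await scheduler.get_job_with_position(job_id)
            if job is None:
                return 404, {"detail": f"Job {job_id} not found"}
            return 200, job
        if method == "DELETE":
            ok, message = await scheduler.cancel_job(job_id)
            return (200, {"message": message}) if ok else (400, {"detail": message})
    return 404, {"detail": "Not found"}


async def demo() -> None:
    scheduler = FakeScheduler()
    for method, path in [("GET", "/queue"), ("DELETE", "/jobs/j1"), ("DELETE", "/jobs/j1")]:
        print(method, path, await handle(scheduler, method, path))


asyncio.run(demo())
```

Keeping the handlers this thin means all locking and state rules live in the scheduler, so the HTTP layer only translates return values into status codes.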
Usage
Use this implementation when you need to:
- Monitor the status of submitted evaluation jobs
- Query the overall queue state for dashboard or tooling integration
- Cancel jobs that are no longer needed before they begin execution
- Understand the internal job management architecture
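For the monitoring use case, a client typically polls GET /jobs/{job_id} until the job reaches a terminal state. The helper below is a hypothetical sketch: `wait_for_job`, its `fetch_job` callback, and the default interval and timeout are not part of lmms-eval, and it assumes the JSON `status` field uses the lowercase values listed for JobInfo (queued, running, completed, failed, cancelled). Injecting `fetch_job` keeps the loop independent of any particular HTTP client.

```python
import time
from typing import Any, Callable, Dict

TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


def wait_for_job(
    fetch_job: Callable[[str], Dict[str, Any]],
    job_id: str,
    poll_interval: float = 5.0,
    timeout: float = 3600.0,
) -> Dict[str, Any]:
    """Call fetch_job(job_id) until the returned status is terminal.

    Raises TimeoutError if the job is still non-terminal after `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        job = fetch_job(job_id)
        if job["status"] in TERMINAL_STATUSES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} still {job['status']!r} after {timeout}s")
        time.sleep(poll_interval)


# Example wiring with httpx against a running server (not executed here):
#
#     def fetch(job_id: str) -> Dict[str, Any]:
#         response = httpx.get(f"http://localhost:8000/jobs/{job_id}")
#         response.raise_for_status()
#         return response.json()
#
#     final = wait_for_job(fetch, job_id, poll_interval=10)
#     print(final["status"], final.get("error"))
```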
Code Reference
Source Location
- Repository: lmms-eval
- File: lmms_eval/entrypoints/http_server.py, lines 116-157 (route handlers for GET /jobs/{job_id}, GET /queue, and DELETE /jobs/{job_id})
- File: lmms_eval/entrypoints/job_scheduler.py, lines 30-296 (JobScheduler class)
Signature
```python
from typing import Optional

from lmms_eval.entrypoints.protocol import EvaluateRequest, JobInfo


class JobScheduler:
    """Thread-safe job scheduler for managing evaluation jobs."""

    DEFAULT_MAX_COMPLETED_JOBS = 100
    DEFAULT_TEMP_DIR_PREFIX = "lmms_eval_"

    def __init__(
        self,
        max_completed_jobs: int = DEFAULT_MAX_COMPLETED_JOBS,
        temp_dir_prefix: str = DEFAULT_TEMP_DIR_PREFIX,
    ):
        ...

    # Lifecycle of the background worker
    async def start(self): ...
    async def stop(self): ...

    # Queries
    async def get_job(self, job_id: str) -> Optional[JobInfo]: ...
    async def get_job_with_position(self, job_id: str) -> Optional[JobInfo]: ...
    async def get_queue_stats(self) -> dict: ...

    # Mutations and housekeeping
    async def add_job(self, request: EvaluateRequest) -> tuple[str, int]: ...
    async def cancel_job(self, job_id: str) -> tuple[bool, str]: ...
    async def cleanup_old_jobs(self) -> int: ...
```
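Because start() launches background work that stop() must tear down, callers may want to pair the two so the worker is stopped even when an error occurs. A minimal sketch, assuming only that the object exposes async start() and stop() methods as listed in the signature; `running`, `Startable`, and `DemoScheduler` are hypothetical names, not part of lmms-eval.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List, Protocol, TypeVar


class Startable(Protocol):
    """Anything with an async start()/stop() pair."""
    async def start(self) -> None: ...
    async def stop(self) -> None: ...


S = TypeVar("S", bound=Startable)


@asynccontextmanager
async def running(scheduler: S) -> AsyncIterator[S]:
    """Start on entry; stop on exit, including when the body raises."""
    await scheduler.start()
    try:
        yield scheduler
    finally:
        await scheduler.stop()


class DemoScheduler:
    """Hypothetical stand-in that records lifecycle calls."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def start(self) -> None:
        self.events.append("start")

    async def stop(self) -> None:
        self.events.append("stop")


async def demo() -> DemoScheduler:
    scheduler = DemoScheduler()
    try:
        async with running(scheduler):
            raise RuntimeError("evaluation setup failed")
    except RuntimeError:
        pass
    return scheduler


print(asyncio.run(demo()).events)  # ['start', 'stop']
```

With lmms-eval installed, `async with running(JobScheduler(max_completed_jobs=50)) as scheduler:` would replace a manual start/stop pair.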
Import
```python
from lmms_eval.entrypoints.job_scheduler import JobScheduler
from lmms_eval.entrypoints.protocol import JobInfo, JobStatus, QueueStatusResponse
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| job_id | str | Yes (for job-specific operations) | UUID4 identifier returned from job submission |
| max_completed_jobs | int | No (default: 100) | Maximum number of terminal jobs retained before cleanup |
| temp_dir_prefix | str | No (default: "lmms_eval_") | Prefix for temporary output directories |
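The `job_id` and `temp_dir_prefix` inputs correspond to standard-library facilities. This snippet assumes, without confirmation from this page, that IDs come from `uuid.uuid4()` and that per-job output directories are created with `tempfile.mkdtemp(prefix=...)`; it only shows what such identifiers and directory names look like, and the `TEMP_DIR_PREFIX` constant is a local copy of the documented default.

```python
import shutil
import tempfile
import uuid
from pathlib import Path

TEMP_DIR_PREFIX = "lmms_eval_"  # mirrors JobScheduler.DEFAULT_TEMP_DIR_PREFIX

# A UUID4 job identifier in canonical 36-character string form.
job_id = str(uuid.uuid4())

# A fresh, uniquely named directory whose basename starts with the prefix.
output_dir = Path(tempfile.mkdtemp(prefix=TEMP_DIR_PREFIX))
try:
    print(f"job {job_id} -> {output_dir}")
finally:
    shutil.rmtree(output_dir)  # the sketch removes the directory it created
```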
Outputs
GET /queue -> QueueStatusResponse:
| Name | Type | Description |
|---|---|---|
| queue_size | int | Number of jobs currently waiting in the queue |
| running_job | Optional[str] | Job ID of the currently executing job, or null |
| queued_jobs | List[str] | Job IDs in the QUEUED state |
| completed_jobs | int | Number of completed jobs held in memory |
| failed_jobs | int | Number of failed jobs held in memory |
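Every field in this response can be derived from in-memory job records. In this hypothetical `queue_stats` helper, records are plain dicts with `job_id` and `status` keys, and `queue_size` is passed in (for instance from `asyncio.Queue.qsize()`) because this page does not say how the real scheduler computes it. The response has no field for cancelled jobs, so the sketch does not count them.

```python
from collections import Counter
from typing import Any, Dict, List, Optional


def queue_stats(jobs: List[Dict[str, Any]], queue_size: int) -> Dict[str, Any]:
    """Build a QueueStatusResponse-shaped dict from job records (illustrative)."""
    counts = Counter(job["status"] for job in jobs)
    running: Optional[str] = next(
        (job["job_id"] for job in jobs if job["status"] == "running"), None
    )
    return {
        "queue_size": queue_size,
        "running_job": running,
        "queued_jobs": [job["job_id"] for job in jobs if job["status"] == "queued"],
        "completed_jobs": counts["completed"],
        "failed_jobs": counts["failed"],
    }


records = [
    {"job_id": "a", "status": "completed"},
    {"job_id": "b", "status": "running"},
    {"job_id": "c", "status": "queued"},
    {"job_id": "d", "status": "failed"},
    {"job_id": "e", "status": "cancelled"},
]
print(queue_stats(records, queue_size=1))
```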
GET /jobs/{job_id} -> JobInfo:
| Name | Type | Description |
|---|---|---|
| job_id | str | The job's UUID4 identifier |
| status | JobStatus | Current status: queued, running, completed, failed, or cancelled |
| created_at | str | ISO 8601 timestamp of job creation |
| started_at | Optional[str] | ISO 8601 timestamp when execution began |
| completed_at | Optional[str] | ISO 8601 timestamp when the job reached a terminal state |
| request | EvaluateRequest | The original evaluation request parameters |
| result | Optional[Dict[str, Any]] | Parsed output directory structure (completed jobs only) |
| error | Optional[str] | Error message (failed jobs only) |
| position_in_queue | Optional[int] | Dynamic queue position (queued jobs only) |
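The "only" annotations in this table imply invariants a client can check when parsing a response. The validator below treats them as strict rules, which is an assumption; `check_job_fields` is a hypothetical helper that operates on the decoded JSON dict and assumes status values serialize as the lowercase strings listed above.

```python
from typing import Any, Dict, List

VALID_STATUSES = {"queued", "running", "completed", "failed", "cancelled"}


def check_job_fields(job: Dict[str, Any]) -> List[str]:
    """List inconsistencies between a job's status and its optional fields."""
    problems: List[str] = []
    status = job.get("status")
    if status not in VALID_STATUSES:
        problems.append(f"unknown status: {status!r}")
    # Each optional field below is documented as populated in only one state.
    if job.get("result") is not None and status != "completed":
        problems.append("result set on a job that is not completed")
    if job.get("error") is not None and status != "failed":
        problems.append("error set on a job that is not failed")
    if job.get("position_in_queue") is not None and status != "queued":
        problems.append("position_in_queue set on a job that is not queued")
    # A running job has, by definition, begun execution.
    if status == "running" and job.get("started_at") is None:
        problems.append("running job without started_at")
    return problems


print(check_job_fields({"status": "queued", "position_in_queue": 0}))  # []
```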
DELETE /jobs/{job_id}:
| Name | Type | Description |
|---|---|---|
| message | str | Confirmation message on successful cancellation |
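The cancellation rule (only jobs that have not begun execution can be cancelled) and the `tuple[bool, str]` return type come from this page; everything else below is an assumption, including the hypothetical `Job` record, the message texts, and stamping `completed_at` on cancellation. A standalone `cancel_job` function stands in for the scheduler method so the sketch runs without lmms-eval.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional, Tuple


@dataclass
class Job:
    """Hypothetical minimal record; the real JobInfo has more fields."""
    job_id: str
    status: str = "queued"
    completed_at: Optional[str] = None


async def cancel_job(
    jobs: Dict[str, Job], lock: asyncio.Lock, job_id: str
) -> Tuple[bool, str]:
    """Cancel a job only while it is still queued; returns (success, message)."""
    async with lock:
        job = jobs.get(job_id)
        if job is None:
            return False, f"Job {job_id} not found"
        if job.status != "queued":
            return False, f"Cannot cancel job in '{job.status}' state"
        job.status = "cancelled"
        job.completed_at = datetime.now(timezone.utc).isoformat()
        return True, f"Job {job_id} cancelled"


async def demo() -> None:
    jobs = {"a": Job("a"), "b": Job("b", status="running")}
    lock = asyncio.Lock()
    print(await cancel_job(jobs, lock, "a"))  # (True, 'Job a cancelled')
    print(await cancel_job(jobs, lock, "b"))  # (False, "Cannot cancel job in 'running' state")


asyncio.run(demo())
```

Because a cancelled job's ID may still be sitting in the asyncio.Queue, a worker built this way has to skip dequeued jobs whose status is no longer queued, rather than removing them from the queue.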
Usage Examples
Basic Example
```python
import httpx

BASE_URL = "http://localhost:8000"

# Check queue status
response = httpx.get(f"{BASE_URL}/queue")
response.raise_for_status()  # raise httpx.HTTPStatusError on any 4xx/5xx response
queue = response.json()
print(f"Queue size: {queue['queue_size']}")
print(f"Running job: {queue['running_job']}")
print(f"Completed: {queue['completed_jobs']}, Failed: {queue['failed_jobs']}")

# Check individual job status (placeholder ID; use one returned on submission)
job_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
response = httpx.get(f"{BASE_URL}/jobs/{job_id}")
response.raise_for_status()
job = response.json()
print(f"Status: {job['status']}, Created: {job['created_at']}")
```
Cancel a Queued Job
```python
import httpx

job_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
response = httpx.delete(f"http://localhost:8000/jobs/{job_id}")
if response.status_code == 200:
    print(response.json()["message"])
else:
    # Error responses carry the reason in a "detail" field
    print(f"Cancellation failed: {response.json()['detail']}")
```
Programmatic Scheduler Usage
```python
import asyncio
from lmms_eval.entrypoints.job_scheduler import JobScheduler
from lmms_eval.entrypoints.protocol import EvaluateRequest

async def main() -> None:
    scheduler = JobScheduler(max_completed_jobs=50)
    await scheduler.start()  # presumably launches the background worker task
    try:
        # Add a job
        request = EvaluateRequest(model="qwen2_5_vl", tasks=["mmmu_val"])
        job_id, position = await scheduler.add_job(request)
        # Get queue stats (keys assumed to match the GET /queue response fields)
        stats = await scheduler.get_queue_stats()
        print(f"Queued: {len(stats['queued_jobs'])}, Running: {stats['running_job']}")
        # Get job with its dynamic queue position (None if the ID is unknown)
        job = await scheduler.get_job_with_position(job_id)
        if job is not None:
            print(f"Job {job.job_id}: {job.status}, position: {job.position_in_queue}")
    finally:
        await scheduler.stop()

asyncio.run(main())
```