Implementation:EvolvingLMMs Lab Lmms eval TUI Server
| Knowledge Sources | |
|---|---|
| Domains | Web_UI, Evaluation, Backend |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
The TUI Server (lmms_eval/tui/server.py) is a FastAPI-based backend that provides a web interface for configuring and running LMMs-Eval evaluations. It manages job execution, streams real-time output, and serves the compiled React frontend.
This 354-line implementation provides a complete REST API and job orchestration layer for the browser-based TUI.
File Location
lmms_eval/tui/server.py
Key Components
Application Setup
```python
from pathlib import Path
from typing import Any

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

app = FastAPI(title="LMMs-Eval Web UI", version="0.1.0")
STATIC_DIR = Path(__file__).parent / "web" / "dist"

# Enable CORS for local development
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# In-memory job storage (lost when the server restarts)
_jobs: dict[str, dict[str, Any]] = {}
```
The server uses:
- FastAPI for REST API framework
- CORS middleware configured for development (all origins)
- In-memory dictionary for job state tracking
- Static file mounting for serving React frontend
System Information Endpoints
```python
import os
import platform
import socket
import subprocess
from importlib.metadata import version as pkg_version  # assumed source of pkg_version
from typing import Any


def get_version() -> str:
    """Get lmms-eval version from package metadata."""
    try:
        return pkg_version("lmms_eval")
    except Exception:
        return "0.5.0"  # hard-coded fallback when metadata is unavailable


def get_git_info() -> dict[str, str]:
    # git runs in the server's current working directory
    try:
        branch = subprocess.check_output(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"]
        ).decode().strip()
        commit = subprocess.check_output(
            ["git", "rev-parse", "--short", "HEAD"]
        ).decode().strip()
        return {"branch": branch, "commit": commit}
    except Exception:
        return {"branch": "unknown", "commit": "unknown"}


def get_system_info() -> dict[str, str]:
    return {
        "hostname": socket.gethostname(),
        "platform": platform.platform(),
        "python": platform.python_version(),
        "cwd": os.getcwd(),
        "repo_root": get_repo_root(),  # helper not shown in this excerpt
    }


@app.get("/health")
async def health() -> dict[str, Any]:
    return {
        "status": "ok",
        "version": get_version(),
        "git": get_git_info(),
        "system": get_system_info(),
    }
```
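The /health endpoint reports the package version, the git branch and commit, and host details (hostname, platform, Python version, working directory, repository root), for debugging and for display in the UI.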
Discovery Endpoints
```python
@app.get("/models", response_model=list[ModelInfo])
async def get_models() -> list[ModelInfo]:
    """Get available models."""
    cache = get_discovery_cache()
    models = cache.get_models(include_all=True)
    return [ModelInfo(id=model_id, name=name) for model_id, name in models]


@app.get("/tasks", response_model=list[TaskInfo])
async def get_tasks() -> list[TaskInfo]:
    """Get available tasks."""
    cache = get_discovery_cache()
    tasks = cache.get_tasks(include_all=True)
    return [
        TaskInfo(
            id=task_id,
            name=name,
            group=name.startswith("[Group]"),
        )
        for task_id, name in tasks
    ]
```
These endpoints query the discovery cache to enumerate available models and tasks. Task groups are identified by the "[Group]" prefix in their name.
Request Models
```python
class EvalRequest(BaseModel):
    model: str
    model_args: str = ""
    tasks: list[str]
    env_vars: str = ""
    batch_size: int = 1
    limit: int | None = 10
    output_path: str = "./logs/"
    log_samples: bool = True
    verbosity: str = "INFO"
    device: str | None = None


class PreviewRequest(BaseModel):
    model: str
    model_args: str = ""
    tasks: list[str]
    env_vars: str = ""
    batch_size: int = 1
    limit: int | None = 10
    output_path: str = "./logs/"
    log_samples: bool = True
    verbosity: str = "INFO"
    device: str | None = None
```
Pydantic models provide type validation and automatic API documentation. EvalRequest and PreviewRequest share the same fields.
Environment Variable Processing
```python
def _normalize_env_line(line: str) -> str | None:
    stripped = line.strip()
    if not stripped or stripped.startswith("#"):
        return None
    if stripped.startswith("export "):
        return stripped
    if "=" in stripped:
        return f"export {stripped}"
    return None


def _build_env_exports(env_vars: str) -> list[str]:
    exports: list[str] = []
    for line in env_vars.splitlines():
        export_line = _normalize_env_line(line)
        if export_line:
            exports.append(export_line)
    return exports
```
Environment variable normalization:
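- Skips empty lines and comment lines (starting with #)
- Adds the "export" prefix to KEY=VALUE assignments that lack it
- Keeps lines that already start with "export " unchanged
- Silently drops any other line (no "=" and no "export " prefix)
- Returns the remaining lines as a list of export commands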
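_build_env_exports hands each line back to the shell, so a value is only as safe as the shell makes it. If a structured result were preferred, the same text box could be parsed into a dict and passed to the child process as its environment. The sketch below is a hypothetical alternative, not code from server.py: parse_env_vars and _ASSIGNMENT are illustrative names, values are unquoted with shlex.split (so "$VAR" references are not expanded, unlike a real export), and lines that the original silently drops raise ValueError instead.

```python
from __future__ import annotations

import re
import shlex

# Hypothetical pattern: optional "export ", a shell identifier, "=", then the raw value.
_ASSIGNMENT = re.compile(r"(?:export\s+)?([A-Za-z_][A-Za-z0-9_]*)=(.*)")


def parse_env_vars(env_vars: str) -> dict[str, str]:
    """Parse the env_vars text box into a dict instead of shell export lines."""
    env: dict[str, str] = {}
    for line in env_vars.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue  # same skip rule as _normalize_env_line
        match = _ASSIGNMENT.fullmatch(stripped)
        if match is None:
            # _build_env_exports silently drops such lines; this sketch rejects them.
            raise ValueError(f"not a KEY=VALUE assignment: {stripped!r}")
        key, raw_value = match.groups()
        try:
            tokens = shlex.split(raw_value)  # removes quoting; no $VAR expansion
        except ValueError as exc:
            raise ValueError(f"unbalanced quotes in value of {key}") from exc
        if len(tokens) > 1:
            # e.g. "FOO=1; rm -rf ~" would smuggle extra words into the shell
            raise ValueError(f"{key} must be a single value, got {raw_value!r}")
        env[key] = tokens[0] if tokens else ""
    return env
```

A caller could then pass env={**os.environ, **parse_env_vars(request.env_vars)} to asyncio.create_subprocess_exec. The trade-off is losing shell features such as variable expansion in exchange for values that cannot inject extra commands.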
Command Construction
```python
def _build_command(request: EvalRequest | PreviewRequest) -> str:
    """Build the lmms_eval command string."""
    parts = ["python -m lmms_eval"]
    parts.append(f"--model {request.model}")
    if request.model_args:
        parts.append(f"--model_args '{request.model_args}'")
    if request.tasks:
        parts.append(f"--tasks {','.join(request.tasks)}")
    parts.append(f"--batch_size {request.batch_size}")
    if request.limit is not None:
        parts.append(f"--limit {request.limit}")
    parts.append(f"--output_path {request.output_path}")
    if request.log_samples:
        parts.append("--log_samples")
    parts.append(f"--verbosity {request.verbosity}")
    if request.device:
        parts.append(f"--device {request.device}")
    command = " \\\n ".join(parts)
    env_exports = _build_env_exports(request.env_vars)
    if env_exports:
        return "\n".join([*env_exports, command])
    return command
```
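This builds a human-readable command for display in the UI: each flag sits on its own line after a trailing backslash, model_args is wrapped in single quotes, and any env exports are prepended on their own lines.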
```python
def _build_shell_command(request: EvalRequest) -> str:
    """Build the shell command for execution."""
    parts = ["python", "-m", "lmms_eval"]
    parts.extend(["--model", request.model])
    if request.model_args:
        parts.extend(["--model_args", request.model_args])
    if request.tasks:
        parts.extend(["--tasks", ",".join(request.tasks)])
    parts.extend(["--batch_size", str(request.batch_size)])
    if request.limit is not None:
        parts.extend(["--limit", str(request.limit)])
    parts.extend(["--output_path", request.output_path])
    if request.log_samples:
        parts.append("--log_samples")
    parts.extend(["--verbosity", request.verbosity])
    if request.device:
        parts.extend(["--device", request.device])
    command = " ".join(parts)
    env_exports = _build_env_exports(request.env_vars)
    if env_exports:
        export_prefix = " && ".join(env_exports)
        return f"{export_prefix} && {command}"
    return command
```
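This builds the single-line command that is actually executed, chaining the env exports in front of it with &&. Unlike the display command, the arguments are joined with plain spaces and no shell quoting (model_args is not wrapped in single quotes here), so a value containing spaces or shell metacharacters is re-interpreted by the shell.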
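A minimal sketch of a quoted variant, assuming the flags and order shown in _build_shell_command. EvalRequest here is a hypothetical dataclass stand-in for the Pydantic model (only the fields the builder reads), and build_argv and build_quoted_command are illustrative names, not functions in server.py. shlex.join (Python 3.8+) quotes each argument so the shell sees exactly the list that build_argv produced.

```python
from __future__ import annotations

import shlex
from dataclasses import dataclass, field


@dataclass
class EvalRequest:
    """Hypothetical stand-in mirroring the fields _build_shell_command reads."""

    model: str
    tasks: list[str] = field(default_factory=list)
    model_args: str = ""
    batch_size: int = 1
    limit: int | None = 10
    output_path: str = "./logs/"
    log_samples: bool = True
    verbosity: str = "INFO"
    device: str | None = None


def build_argv(request: EvalRequest) -> list[str]:
    """Same flags and order as _build_shell_command, kept as separate arguments."""
    argv = ["python", "-m", "lmms_eval", "--model", request.model]
    if request.model_args:
        argv += ["--model_args", request.model_args]
    if request.tasks:
        argv += ["--tasks", ",".join(request.tasks)]
    argv += ["--batch_size", str(request.batch_size)]
    if request.limit is not None:
        argv += ["--limit", str(request.limit)]
    argv += ["--output_path", request.output_path]
    if request.log_samples:
        argv.append("--log_samples")
    argv += ["--verbosity", request.verbosity]
    if request.device:
        argv += ["--device", request.device]
    return argv


def build_quoted_command(request: EvalRequest) -> str:
    """Quote every argument so the shell re-splits it back into build_argv's list."""
    return shlex.join(build_argv(request))


if __name__ == "__main__":
    demo = EvalRequest(model="llava", tasks=["mme"], model_args="pretrained=a b;echo hi")
    print(build_quoted_command(demo))  # the ";echo hi" stays inside one quoted argument
```

Going one step further, the argv list could be passed to asyncio.create_subprocess_exec(*argv, ...) directly, which skips the shell entirely; the env exports would then have to travel through the env= argument instead of an && chain.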
Command Preview Endpoint
```python
@app.post("/eval/preview", response_model=PreviewResponse)
async def preview_command(request: PreviewRequest) -> PreviewResponse:
    """Generate command preview without executing."""
    command = _build_command(request)
    return PreviewResponse(command=command)
```
The preview endpoint generates the display command without execution, enabling real-time preview in the UI.
Job Lifecycle
```python
@app.post("/eval/start", response_model=EvalStartResponse)
async def start_eval(request: EvalRequest) -> EvalStartResponse:
    """Start an evaluation job."""
    if not request.tasks:
        raise HTTPException(status_code=400, detail="No tasks specified")
    job_id = str(uuid.uuid4())
    command = _build_command(request)
    shell_command = _build_shell_command(request)
    _jobs[job_id] = {
        "status": "starting",
        "command": shell_command,
        "process": None,
        "request": request,
    }
    return EvalStartResponse(job_id=job_id, command=command)
```
Starting a job:
1. Validates that tasks are specified
2. Generates unique job ID (UUID)
3. Builds both display and execution commands
4. Stores job metadata in _jobs dictionary
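5. Returns job ID and display command to client

No subprocess is launched at this point: the job stays in the "starting" state until the client opens /eval/{job_id}/stream, where _stream_output spawns the process.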
Output Streaming
```python
async def _stream_output(job_id: str):
    """Stream subprocess output as SSE events."""
    job = _jobs.get(job_id)
    if not job:
        yield f"data: {json.dumps({'type': 'error', 'message': 'Job not found'})}\n\n"
        return
    shell_command = job["command"]
    try:
        process = await asyncio.create_subprocess_shell(
            shell_command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            start_new_session=True,
        )
        job["process"] = process
        job["status"] = "running"
        if process.stdout:
            async for line in process.stdout:
                if job.get("stopped"):
                    break
                decoded = line.decode("utf-8", errors="replace").rstrip()
                yield f"data: {json.dumps({'type': 'output', 'line': decoded})}\n\n"
        await process.wait()
        exit_code = process.returncode
        if job.get("stopped"):
            yield f"data: {json.dumps({'type': 'stopped'})}\n\n"
        else:
            yield f"data: {json.dumps({'type': 'done', 'exit_code': exit_code})}\n\n"
        job["status"] = "completed"
    except Exception as e:
        yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
        job["status"] = "error"
    finally:
        job["process"] = None
```
Output streaming implementation:
- Creates the subprocess with start_new_session=True so it runs in its own session and process group
- Merges stderr into stdout for unified output
- Yields SSE-formatted events of the form data: {...}\n\n
- Event types: output (log line), done (completion, with exit_code), stopped (user termination), error (failure)
- Stops relaying output once the stop endpoint sets the job["stopped"] flag
- Clears the process reference in the finally block, whatever the outcome
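The framing can be exercised without FastAPI. The sketch below assumes a POSIX system (where start_new_session is meaningful) and swaps the source's create_subprocess_shell for create_subprocess_exec with an argv list, so the demo needs no shell quoting. sse, stream_subprocess, and collect are illustrative names, and the stopped-flag check is omitted for brevity.

```python
from __future__ import annotations

import asyncio
import json
import sys
from collections.abc import AsyncIterator


def sse(event: dict) -> str:
    """Frame one JSON event like _stream_output: 'data: <json>' plus a blank line."""
    return f"data: {json.dumps(event)}\n\n"


async def stream_subprocess(argv: list[str]) -> AsyncIterator[str]:
    process = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,  # interleave stderr into the same pipe
        start_new_session=True,  # POSIX: new session and process group
    )
    assert process.stdout is not None
    async for raw in process.stdout:  # StreamReader yields one line at a time
        line = raw.decode("utf-8", errors="replace").rstrip()
        yield sse({"type": "output", "line": line})
    await process.wait()
    yield sse({"type": "done", "exit_code": process.returncode})


async def collect(argv: list[str]) -> list[str]:
    return [frame async for frame in stream_subprocess(argv)]


if __name__ == "__main__":
    demo = [sys.executable, "-c", "print('hello'); print('world'); raise SystemExit(3)"]
    for frame in asyncio.run(collect(demo)):
        print(frame, end="")
```

The blank line after each data: line is what tells an EventSource client that the event is complete, which is why every frame ends in \n\n.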
```python
@app.get("/eval/{job_id}/stream")
async def stream_eval(job_id: str):
    """Stream evaluation output via SSE."""
    if job_id not in _jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    return StreamingResponse(
        _stream_output(job_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```
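The stream endpoint returns 404 for unknown job IDs and otherwise wraps the generator in a text/event-stream response. Cache-Control: no-cache prevents caching, and X-Accel-Buffering: no asks an nginx reverse proxy not to buffer the stream, so log lines reach the browser as they are produced.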
Job Termination
```python
@app.post("/eval/{job_id}/stop")
async def stop_eval(job_id: str) -> dict[str, str]:
    """Stop a running evaluation job."""
    job = _jobs.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    job["stopped"] = True
    process = job.get("process")
    if process:
        try:
            os.killpg(os.getpgid(process.pid), signal.SIGTERM)
        except (ProcessLookupError, OSError):
            try:
                process.terminate()
            except Exception:
                pass
    return {"status": "stopped"}
```
Job termination strategy:
1. Sets stopped flag to signal streaming loop
2. Attempts process group termination (killpg) to kill child processes
3. Falls back to direct process termination if group termination fails
4. Ignores exceptions if process already terminated
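Why the group matters: create_subprocess_shell starts /bin/sh, and when the command is an && chain (as it is whenever env exports are present) the shell can stay alive as the parent of the python process, so signalling process.pid alone may leave the evaluation running. A minimal synchronous sketch of the same strategy with subprocess.Popen, assuming a POSIX system; terminate_process_group is an illustrative name, not a function in server.py.

```python
import os
import signal
import subprocess


def terminate_process_group(proc: subprocess.Popen) -> None:
    """Send SIGTERM to the whole process group, falling back to the single process."""
    try:
        # The child was started with start_new_session=True, so its process
        # group holds only the child and its descendants, not the server.
        os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
    except OSError:  # includes ProcessLookupError (process already gone)
        try:
            proc.terminate()
        except OSError:
            pass


if __name__ == "__main__":
    # A shell with a background child, so the group has more than one process.
    proc = subprocess.Popen(["sh", "-c", "sleep 30 & sleep 30"], start_new_session=True)
    terminate_process_group(proc)
    print(proc.wait(timeout=10))  # negative value means "killed by that signal"
```

Calling os.killpg without start_new_session=True would be dangerous, because the child would share the server's process group and the signal would hit the server too.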
Static File Serving
```python
if STATIC_DIR.exists():
    app.mount("/assets", StaticFiles(directory=STATIC_DIR / "assets"), name="assets")

    @app.get("/")
    async def serve_index():
        return FileResponse(STATIC_DIR / "index.html")

    @app.get("/{path:path}")
    async def serve_spa(path: str):
        file_path = STATIC_DIR / path
        if file_path.exists() and file_path.is_file():
            return FileResponse(file_path)
        return FileResponse(STATIC_DIR / "index.html")
```
Static file serving:
- Mounts the /assets directory for bundled resources
- Serves index.html at the root
- Implements SPA fallback: paths that do not match a file return index.html so the client-side router can handle them
- Only activates if the dist directory exists (i.e., after the frontend has been built)
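The fallback rule can be expressed as a pure path-resolution function. This is a sketch, not the server's code: resolve_spa_path is an illustrative name, and it adds a containment check that serve_spa does not have. With pathlib, joining an absolute path or one containing .. onto STATIC_DIR can point outside dist; whether such a path can actually reach the handler depends on URL normalization upstream, so the guard is a defensive assumption. Path.is_relative_to requires Python 3.9+.

```python
from pathlib import Path


def resolve_spa_path(static_dir: Path, request_path: str) -> Path:
    """Return the file to serve for GET /{path}: a real file under static_dir, else index.html."""
    root = static_dir.resolve()
    index = root / "index.html"
    candidate = (root / request_path).resolve()
    # Extra guard (not in server.py): never serve files outside static_dir.
    if not candidate.is_relative_to(root):
        return index
    if candidate.is_file():
        return candidate
    return index  # SPA fallback: let the client-side router handle the path
```

Keeping the decision in a pure function makes the routing rule testable with a temporary directory, without starting the web server.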
Usage Patterns
Starting the Server
The server is typically started via:
```shell
uvicorn lmms_eval.tui.server:app --host 0.0.0.0 --port 8000
```
Alternatively, use the TUI entry point, which wraps uvicorn.
API Flow
1. Client fetches /health, /models, /tasks on load
2. User configures evaluation in UI
3. Client calls /eval/preview to show command (real-time)
4. User clicks Start
5. Client calls /eval/start, receives job ID
6. Client opens EventSource to /eval/{job_id}/stream
7. Server streams output events until completion
8. (Optional) Client calls /eval/{job_id}/stop to terminate
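The browser client uses EventSource for step 6. A Python stand-in for steps 5 to 7, using only the standard library, might look like the following; BASE_URL assumes the uvicorn command shown under Starting the Server, and post_json, iter_sse_events, and run_eval are illustrative names. Because _stream_output launches the process each time the stream is opened, the sketch opens it once and returns at the first terminal event instead of reconnecting.

```python
from __future__ import annotations

import json
import urllib.request
from collections.abc import Iterable, Iterator

BASE_URL = "http://localhost:8000"  # assumed host and port


def post_json(path: str, payload: dict) -> dict:
    request = urllib.request.Request(
        BASE_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def iter_sse_events(lines: Iterable[bytes]) -> Iterator[dict]:
    """Decode 'data: {...}' lines; blank separator lines are skipped."""
    for raw in lines:
        line = raw.decode("utf-8").rstrip("\r\n")
        if line.startswith("data: "):
            yield json.loads(line[len("data: "):])


def run_eval(payload: dict) -> dict:
    """Start a job, print its output, and return the terminal event."""
    job = post_json("/eval/start", payload)
    stream_url = f"{BASE_URL}/eval/{job['job_id']}/stream"
    with urllib.request.urlopen(stream_url) as response:  # iterates line by line
        for event in iter_sse_events(response):
            if event["type"] == "output":
                print(event["line"])
            else:  # done, stopped, or error
                return event
    return {"type": "error", "message": "stream closed without a terminal event"}
```

For example, run_eval({"model": "llava", "tasks": ["mme"], "limit": 5}) would print log lines and return the done, stopped, or error event; the model and task names there are placeholders.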
Job State Transitions
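```text
starting → running → completed (success)
                   → error     (failure)
                   → stopped   (user termination)
```
Only starting, running, completed, and error are ever written to job["status"]. A user stop is recorded separately: stop_eval sets job["stopped"] = True, and the stream reports it with a stopped event instead of a done event.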
Security Considerations
CORS Configuration: Currently allows all origins (allow_origins=["*"]). This is suitable for local development but should be restricted in production.
No Authentication: The API has no authentication layer. It is designed for local/trusted environments where the user has filesystem access.
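Shell Command Execution: Jobs run through asyncio.create_subprocess_shell, and the command string is assembled from client-supplied fields without shell quoting (env_vars lines are inserted verbatim, with an export prefix added where missing). Any client that can reach the server can therefore run arbitrary shell commands as the server's user; binding to 0.0.0.0, as in the uvicorn command above, exposes this to the whole network. The server trusts whatever configuration the client sends.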
Process Group Termination: Uses SIGTERM for graceful shutdown rather than SIGKILL, allowing subprocesses to clean up.
Related Principles
- Principle:EvolvingLMMs_Lab_Lmms_eval_TUI_Server_Architecture
- Principle:EvolvingLMMs_Lab_Lmms_eval_Task_Selection
- Principle:EvolvingLMMs_Lab_Lmms_eval_Model_Type_Selection
Related Implementations
- Implementation:EvolvingLMMs_Lab_Lmms_eval_TUI_App_Component - React frontend that consumes this API
- Implementation:EvolvingLMMs_Lab_Lmms_eval_TUI_Discovery - Discovery cache integration