Implementation:TobikoData Sqlmesh API Commands
| Knowledge Sources | |
|---|---|
| Domains | Web_Server, REST_API |
| Last Updated | 2026-02-07 20:00 GMT |
Overview
Concrete tool for executing SQLMesh commands (apply, evaluate, fetchdf, render, test) provided by the SQLMesh web server.
Description
This module exposes FastAPI endpoints for core SQLMesh operations: plan application, model evaluation, ad hoc query execution, query rendering, and test execution. The `/apply` endpoint initiates asynchronous plan application and uses a circuit breaker pattern for cancellation. The `/evaluate` and `/fetchdf` endpoints return Apache Arrow streaming responses for efficient data transfer. The `/render` endpoint renders a model's SQL query, optionally expanding referenced models. The `/test` endpoint runs model tests and returns structured results grouped into errors, failures, skipped tests, and successes.
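The cancellation idea behind `/apply` can be sketched with a shared flag that long-running work polls between steps. This is a minimal illustration, not SQLMesh's actual implementation: `run_steps`, `Cancelled`, and the step names are hypothetical, and it assumes the breaker is a zero-argument callable backed by a `threading.Event`.

```python
import asyncio
import threading
import typing as t


class Cancelled(Exception):
    """Raised when the circuit breaker trips before the work finishes."""


def run_steps(
    steps: t.Sequence[str], circuit_breaker: t.Callable[[], bool]
) -> t.List[str]:
    """Run steps in order, checking the breaker before starting each one."""
    completed: t.List[str] = []
    for step in steps:
        if circuit_breaker():
            raise Cancelled(f"cancelled before step {step!r}")
        completed.append(step)  # stand-in for the real work of this step
    return completed


async def main() -> t.Tuple[t.List[str], bool]:
    cancel_event = threading.Event()

    # Breaker not tripped: the blocking work runs to completion in a thread.
    finished = await asyncio.to_thread(
        run_steps, ["create", "backfill", "promote"], cancel_event.is_set
    )

    # Breaker tripped up front: the work stops before its first step.
    cancel_event.set()
    try:
        await asyncio.to_thread(run_steps, ["create"], cancel_event.is_set)
        was_cancelled = False
    except Cancelled:
        was_cancelled = True
    return finished, was_cancelled


result = asyncio.run(main())
```

Passing the breaker as a plain callable keeps the worker independent of how cancellation is signalled; a cancel request only needs to set the event.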
Usage
These endpoints are called by the SQLMesh web UI client when users execute commands. The `/apply` endpoint is invoked when applying plans to environments. The `/evaluate` endpoint is called to preview model output with time range parameters. The `/render` endpoint is used to view formatted SQL queries with dialect conversion. The `/test` endpoint runs when users execute test suites from the UI.
Code Reference
Source Location
- Repository: TobikoData_Sqlmesh
- File: web/server/api/endpoints/commands.py
Signature
@router.post("/apply", response_model=t.Optional[models.PlanApplyStageTracker])
async def initiate_apply(
    request: Request,
    response: Response,
    context: Context = Depends(get_loaded_context),
    environment: t.Optional[str] = Body(None),
    plan_dates: t.Optional[models.PlanDates] = None,
    plan_options: t.Optional[models.PlanOptions] = None,
    categories: t.Optional[t.Dict[str, SnapshotChangeCategory]] = None,
) -> t.Optional[models.PlanApplyStageTracker]

@router.post("/evaluate")
async def evaluate(
    options: models.EvaluateInput,
    context: Context = Depends(get_loaded_context),
) -> ArrowStreamingResponse

@router.post("/fetchdf")
async def fetchdf(
    options: models.FetchdfInput,
    context: Context = Depends(get_loaded_context),
) -> ArrowStreamingResponse

@router.post("/render", response_model=models.Query)
async def render(
    options: models.RenderInput,
    context: Context = Depends(get_loaded_context),
) -> models.Query

@router.get("/test")
async def test(
    test: t.Optional[str] = None,
    verbosity: Verbosity = Verbosity.DEFAULT,
    context: Context = Depends(get_loaded_context),
) -> models.TestResult
Import
from web.server.api.endpoints.commands import router
I/O Contract
Inputs
| Endpoint | Method | Parameters | Description |
|---|---|---|---|
| /api/commands/apply | POST | environment, plan_dates, plan_options, categories | Initiates asynchronous plan application with circuit breaker support |
| /api/commands/evaluate | POST | model, start, end, execution_time, limit | Evaluates a model and returns results as Arrow stream |
| /api/commands/fetchdf | POST | sql | Executes SQL query and returns DataFrame as Arrow stream |
| /api/commands/render | POST | model, start, end, execution_time, expand, pretty, dialect | Renders model query with optional expansion and formatting |
| /api/commands/test | GET | test (optional), verbosity | Runs one or all model tests with specified verbosity |
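As a rough client-side sketch, the parameter names in the table above can be used to validate a JSON body before it is sent to a POST endpoint. The helper `build_request` and the `ALLOWED_PARAMS` mapping are hypothetical and only mirror the table; the server remains the authority on what it accepts. The base URL assumes a server on `localhost:8000`.

```python
import json
import typing as t

# Assumed base URL for a locally running SQLMesh web server.
BASE_URL = "http://localhost:8000/api/commands"

# Parameter names copied from the Inputs table (POST endpoints only).
ALLOWED_PARAMS: t.Dict[str, t.Set[str]] = {
    "apply": {"environment", "plan_dates", "plan_options", "categories"},
    "evaluate": {"model", "start", "end", "execution_time", "limit"},
    "fetchdf": {"sql"},
    "render": {
        "model", "start", "end", "execution_time", "expand", "pretty", "dialect",
    },
}


def build_request(endpoint: str, **params: t.Any) -> t.Tuple[str, str]:
    """Return (url, json_body) for a POST endpoint, omitting None values."""
    allowed = ALLOWED_PARAMS[endpoint]
    unknown = set(params) - allowed
    if unknown:
        raise ValueError(f"unknown parameters for /{endpoint}: {sorted(unknown)}")
    body = {key: value for key, value in params.items() if value is not None}
    return f"{BASE_URL}/{endpoint}", json.dumps(body, sort_keys=True)


url, body = build_request(
    "evaluate",
    model="my_schema.my_model",
    start="2024-01-01",
    end="2024-01-31",
    execution_time=None,  # left unset, so it is dropped from the body
    limit=1000,
)
```

The returned body string can be sent with any HTTP client, for example `httpx.post(url, content=body, headers={"Content-Type": "application/json"})`.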
Outputs
| Endpoint | Response Type | Description |
|---|---|---|
| /api/commands/apply | Optional[PlanApplyStageTracker] | Returns `None` with HTTP 204 (No Content); plan progress is reported through events |
| /api/commands/evaluate | ArrowStreamingResponse | Pandas/PySpark DataFrame converted to Arrow format |
| /api/commands/fetchdf | ArrowStreamingResponse | Query results as Arrow-encoded DataFrame |
| /api/commands/render | Query | Rendered SQL query with dialect conversion |
| /api/commands/test | TestResult | Test results with errors, failures, skipped, and successes |
Usage Examples
import httpx

# Apply a plan to an environment
response = httpx.post(
    "http://localhost:8000/api/commands/apply",
    json={
        "environment": "dev",
        "plan_options": {"skip_tests": False, "skip_backfill": False},
    },
)

# Evaluate a model with time range
response = httpx.post(
    "http://localhost:8000/api/commands/evaluate",
    json={
        "model": "my_schema.my_model",
        "start": "2024-01-01",
        "end": "2024-01-31",
        "limit": 1000,
    },
)

# Render a model's query
response = httpx.post(
    "http://localhost:8000/api/commands/render",
    json={
        "model": "my_schema.my_model",
        "expand": True,
        "pretty": True,
        "dialect": "snowflake",
    },
)

# Run all tests
response = httpx.get("http://localhost:8000/api/commands/test")
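Because `/test` is a GET endpoint, its optional `test` argument would normally travel as a query parameter rather than in a JSON body. The sketch below builds such a URL with the standard library; `build_test_url` is a hypothetical helper, and the test identifier `tests/test_my_model.yaml::test_example` is a made-up value whose exact format is an assumption.

```python
import typing as t
from urllib.parse import parse_qs, urlencode, urlsplit

# Assumed address of a locally running SQLMesh web server.
BASE = "http://localhost:8000"


def build_test_url(base: str, test: t.Optional[str] = None) -> str:
    """Build the /test URL, adding a `test` query parameter when one is given."""
    url = f"{base}/api/commands/test"
    if test is None:
        return url
    # urlencode percent-escapes characters such as '/' and ':' in the value.
    return f"{url}?{urlencode({'test': test})}"


all_tests_url = build_test_url(BASE)
single_test_url = build_test_url(BASE, "tests/test_my_model.yaml::test_example")

# Round-trip check: decoding the query string recovers the original value.
decoded = parse_qs(urlsplit(single_test_url).query)
```

With `httpx`, the same result is available by passing `params={"test": ...}` to `httpx.get`, which performs the encoding itself.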