Implementation:TobikoData Sqlmesh Web Server Console
| Knowledge Sources | |
|---|---|
| Domains | Web UI, Server-Sent Events, Plan Tracking |
| Last Updated | 2026-02-07 20:00 GMT |
Overview
The ApiConsole class extends TerminalConsole to provide real-time progress tracking and event streaming for SQLMesh's web UI via Server-Sent Events.
Description
The ApiConsole class implements the Console protocol for SQLMesh's web interface, translating plan execution events into Server-Sent Events (SSE) that stream to the frontend. It tracks three types of plan operations: plan overview (generating plan previews), plan apply (executing deployments), and plan cancel (aborting running plans). The console maintains detailed state for each stage of plan execution including validation, changes, backfills, creation, restate, promotion, and cancellation.
Each stage is tracked with Trackable objects that record start time, end time, status (init/success/fail), and stage-specific data such as task counts, batch progress, and queue states. The console buffers events in an asyncio.Queue before streaming them to connected clients. When a running plan is cancelled, it stops the cancel stage, marks the apply tracker as failed, and marks the cancel tracker as successful.
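As a rough illustration of the per-stage bookkeeping described above, the following sketch records start time, end time, and an init/success/fail status. StageStatus, StageMeta, and TrackableSketch are hypothetical names chosen for this example; the real Trackable model in web/server/models may be structured differently.

```python
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class StageStatus(str, Enum):
    init = "init"
    success = "success"
    fail = "fail"


@dataclass
class StageMeta:
    """Hypothetical stand-in for the timing/status metadata of a stage."""

    status: StageStatus = StageStatus.init
    start: float = field(default_factory=time.time)
    end: Optional[float] = None

    @property
    def done(self) -> bool:
        # A stage counts as done once an end time has been recorded.
        return self.end is not None


@dataclass
class TrackableSketch:
    meta: StageMeta = field(default_factory=StageMeta)

    def stop(self, success: bool = True) -> None:
        # Record the end time once and settle the final status.
        if not self.meta.done:
            self.meta.end = time.time()
            self.meta.status = StageStatus.success if success else StageStatus.fail


stage = TrackableSketch()
stage.stop(success=False)
print(stage.meta.status.value, stage.meta.done)
```

Keeping the status and timing in a separate meta object mirrors the `stage.meta.done` checks that appear in the excerpts further down this page.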
Usage
This console is instantiated as a singleton (api_console) and used by the FastAPI web server to track all SQLMesh operations initiated through the UI. Frontend clients subscribe to the SSE endpoint to receive real-time updates.
Code Reference
Source Location
- Repository: TobikoData_Sqlmesh
- File: web/server/console.py
Class Signature
class ApiConsole(TerminalConsole):
    plan_cancel_stage_tracker: t.Optional[models.PlanCancelStageTracker] = None
    plan_apply_stage_tracker: t.Optional[models.PlanApplyStageTracker] = None
    plan_overview_stage_tracker: t.Optional[models.PlanOverviewStageTracker] = None

    def __init__(self) -> None:
        super().__init__()
        self.current_task_status: t.Dict[str, t.Dict[str, t.Any]] = {}
        self.queue: asyncio.Queue = asyncio.Queue()
Import
from web.server.console import api_console
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| plan | EvaluatablePlan | Yes | Plan being executed (for start_plan_evaluation) |
| snapshots | List[Snapshot] | Yes | Snapshots being created/promoted |
| environment_naming_info | EnvironmentNamingInfo | Yes | Environment display name information |
| default_catalog | Optional[str] | No | Default catalog for display names |
| batched_intervals | Dict[Snapshot, Intervals] | Yes | Intervals to backfill per snapshot |
| snapshot | Snapshot | Yes | Current snapshot being processed |
| interval | Interval | Yes | Current interval being processed |
| result | ModelTextTestResult | Yes | Test execution results |
| exception | ApiException | Yes | Exception to log and stream |
Outputs
| Name | Type | Description |
|---|---|---|
| queue | asyncio.Queue[ServerSentEvent] | Queue of SSE events for streaming to clients |
| Event stream | SSE | Real-time events sent to frontend (plan-apply, plan-overview, plan-cancel, errors, tests) |
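The queue output can be pictured with a small producer/consumer sketch: a synchronous logging call enqueues an event, and an async consumer (such as a streaming endpoint) awaits it. SketchEvent and QueueingConsole are hypothetical stand-ins, and the use of put_nowait is an assumption about how a synchronous method would enqueue, not a statement about the actual implementation.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class SketchEvent:
    """Hypothetical stand-in for the SSE object placed on the queue."""

    event: str
    data: str


class QueueingConsole:
    def __init__(self) -> None:
        self.queue: "asyncio.Queue[SketchEvent]" = asyncio.Queue()

    def log_event(self, event: str, data: Union[Dict[str, Any], List[Any]]) -> None:
        # put_nowait keeps this method synchronous; an unbounded queue never raises QueueFull.
        self.queue.put_nowait(SketchEvent(event=event, data=json.dumps(data)))


async def main() -> SketchEvent:
    console = QueueingConsole()
    console.log_event("warnings", {"message": "disk almost full"})
    # A streaming endpoint would await the queue in a loop; one get() is enough here.
    return await console.queue.get()


received = asyncio.run(main())
```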
Plan Tracking Stages
Plan Overview Stages
| Stage | Description | Data Tracked |
|---|---|---|
| validation | Validating plan parameters | Status, timing |
| changes | Identifying modified snapshots | Added/removed/modified models with diffs |
| backfills | Calculating backfill requirements | Models to backfill, intervals, batch counts |
Plan Apply Stages
| Stage | Description | Data Tracked |
|---|---|---|
| creation | Creating physical tables for new snapshots | Total tasks, completed tasks |
| restate | Restating modified snapshots | Status, timing |
| backfill | Loading historical data | Per-model task queues, completed/total batches, timing |
| promote | Promoting snapshots to environment | Total tasks, completed tasks, target environment |
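The backfill row above mentions per-model task queues and completed/total batch counts. A minimal sketch of that bookkeeping, assuming a model leaves the active queue once its completed batches reach its total, might look as follows. BackfillTask and BackfillProgress are hypothetical names, and the model name is invented for the example.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class BackfillTask:
    total_batches: int
    completed_batches: int = 0


@dataclass
class BackfillProgress:
    tasks: Dict[str, BackfillTask] = field(default_factory=dict)
    queue: Set[str] = field(default_factory=set)

    def start(self, batches_per_model: Dict[str, int]) -> None:
        # One task per model, sized by its number of batches.
        self.tasks = {name: BackfillTask(total) for name, total in batches_per_model.items()}

    def start_model(self, name: str) -> None:
        # The model is now actively being processed.
        self.queue.add(name)

    def update_model(self, name: str) -> None:
        task = self.tasks[name]
        task.completed_batches += 1
        # Drop the model from the active queue once every batch has finished.
        if task.completed_batches >= task.total_batches:
            self.queue.discard(name)


progress = BackfillProgress()
progress.start({"db.orders": 2})
progress.start_model("db.orders")
progress.update_model("db.orders")
still_queued = "db.orders" in progress.queue
progress.update_model("db.orders")
```

After the first update the model is still queued; after the second it has been removed and its completed count equals its total.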
Plan Cancel Stages
| Stage | Description | Data Tracked |
|---|---|---|
| cancel | Cancelling running plan | Status, timing |
Event Types
Server-Sent Event Names
class EventName(str, enum.Enum):
    PING = "ping"
    ERRORS = "errors"
    WARNINGS = "warnings"
    FILE = "file"
    FORMAT_FILE = "format-file"
    MODELS = "models"
    TESTS = "tests"
    PLAN_APPLY = "plan-apply"
    PLAN_OVERVIEW = "plan-overview"
    PLAN_CANCEL = "plan-cancel"
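For context, these names end up in the `event:` field of each frame on the wire. The sketch below serializes one event in the standard text/event-stream format; format_sse is a hypothetical helper written for illustration, the payload fields are invented, and a real server would normally leave this serialization to its SSE library.

```python
import json
from typing import Any


def format_sse(event: str, data: Any) -> str:
    """Serialize one event as a text/event-stream frame."""
    # json.dumps escapes newlines inside strings, so the payload fits on one data: line.
    payload = json.dumps(data)
    # A blank line terminates the frame.
    return f"event: {event}\ndata: {payload}\n\n"


frame = format_sse("plan-apply", {"environment": "dev"})
```

A browser EventSource subscribed to the stream would dispatch this frame to a listener registered for the "plan-apply" event.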
Event Logging Methods
def log_event(self, event: models.EventName, data: t.Union[t.Dict[str, t.Any], t.List[t.Any]]) -> None:
    """Generic event logger that queues SSE"""

def log_event_plan_apply(self) -> None:
    """Stream plan apply progress"""

def log_event_plan_overview(self) -> None:
    """Stream plan overview progress"""

def log_event_plan_cancel(self) -> None:
    """Stream plan cancellation progress"""

def log_exception(self, exception: ApiException) -> None:
    """Stream error to frontend and stop trackers"""
Progress Tracking Methods
Creation Progress
def start_creation_progress(
    self,
    snapshots: t.List[Snapshot],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Initialize creation stage with total snapshot count"""

def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
    """Increment completed snapshot count"""

def stop_creation_progress(self, success: bool = True) -> None:
    """Mark creation stage complete or failed"""
Evaluation (Backfill) Progress
def start_evaluation_progress(
    self,
    batched_intervals: t.Dict[Snapshot, Intervals],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    audit_only: bool = False,
) -> None:
    """Initialize backfill tasks with batch counts per snapshot"""

def start_snapshot_evaluation_progress(
    self, snapshot: Snapshot, audit_only: bool = False
) -> None:
    """Add snapshot to active processing queue"""

def update_snapshot_evaluation_progress(
    self,
    snapshot: Snapshot,
    interval: Interval,
    batch_idx: int,
    duration_ms: t.Optional[int],
    num_audits_passed: int,
    num_audits_failed: int,
    audit_only: bool = False,
    execution_stats: t.Optional[QueryExecutionStats] = None,
    auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
) -> None:
    """Update backfill progress, remove from queue when complete"""

def stop_evaluation_progress(self, success: bool = True) -> None:
    """Mark backfill stage complete or failed"""
Promotion Progress
def start_promotion_progress(
    self,
    snapshots: t.List[SnapshotTableInfo],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Initialize promotion stage with snapshot count"""

def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
    """Increment promoted snapshot count"""

def stop_promotion_progress(self, success: bool = True) -> None:
    """Mark promotion stage complete or failed"""
Cancellation Handling
Cancellation Flow
def is_cancelling_plan(self) -> bool:
    """Check if plan cancellation is in progress"""
    return bool(self.plan_cancel_stage_tracker and not self.plan_cancel_stage_tracker.meta.done)

def finish_plan_cancellation(self) -> None:
    """Complete cancellation after stopping all apply tracker stages"""
    # 1. Stop cancel tracker's cancel stage
    # 2. Check if all apply tracker stages are done
    # 3. Stop apply tracker (failed)
    # 4. Stop cancel tracker (success)
When a stage fails during cancellation, the console checks is_cancelling_plan() and calls finish_plan_cancellation() instead of stopping the tracker normally.
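One possible reading of the four commented steps is sketched below with simplified stand-in classes. Stage, Tracker, is_cancelling, and finish_cancellation are hypothetical; in particular, returning early when an apply stage is still running is an assumption about what the "check" in step 2 implies, not confirmed behavior.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Stage:
    done: bool = False
    success: Optional[bool] = None

    def stop(self, success: bool = True) -> None:
        self.done, self.success = True, success


@dataclass
class Tracker:
    stages: Dict[str, Stage] = field(default_factory=dict)
    done: bool = False
    success: Optional[bool] = None

    def stop(self, success: bool = True) -> None:
        self.done, self.success = True, success


def is_cancelling(cancel: Optional[Tracker]) -> bool:
    # Cancellation is in progress while a cancel tracker exists and is not done.
    return bool(cancel and not cancel.done)


def finish_cancellation(apply: Tracker, cancel: Tracker) -> bool:
    cancel.stages["cancel"].stop(True)  # 1. stop the cancel stage
    if not all(stage.done for stage in apply.stages.values()):
        return False  # 2. some apply stage is still running (assumed: wait)
    apply.stop(False)  # 3. the apply tracker ends as failed
    cancel.stop(True)  # 4. the cancel tracker ends as successful
    return True


apply_tracker = Tracker(stages={"backfill": Stage()})
cancel_tracker = Tracker(stages={"cancel": Stage()})
cancelling_before = is_cancelling(cancel_tracker)

# The running backfill stage fails because the plan was cancelled.
apply_tracker.stages["backfill"].stop(False)
finished = finish_cancellation(apply_tracker, cancel_tracker)
```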
Stage Cleanup
def stop_plan_tracker_stages(
    self,
    tracker: t.Optional[t.Union[
        models.PlanApplyStageTracker,
        models.PlanCancelStageTracker,
        models.PlanOverviewStageTracker,
    ]],
    success: bool = True,
) -> None:
    """Stop all stages of a tracker"""
    # Excerpt: `stages` (the stage attribute names to visit) is defined in code not shown here
    for key in stages:
        stage = getattr(tracker, key)
        if isinstance(stage, models.Trackable) and stage.meta and not stage.meta.done:
            stage.stop(success)
Test Result Reporting
Test Success
def log_test_results(self, result: ModelTextTestResult, target_dialect: str) -> None:
    if result.wasSuccessful():
        self.log_event(
            event=models.EventName.TESTS,
            data=models.ReportTestsResult(
                message=f"Successfully ran {str(result.testsRun)} tests against {target_dialect}"
            ).dict(),
        )
Test Failures
# Extract failure details
messages = []
for test, details in result.failures + result.errors:
    if isinstance(test, ModelTest):
        messages.append(
            models.ReportTestDetails(
                message=f"Failure test: {test.model.name} {test.test_name}",
                details=details,
            )
        )

# Send detailed failure report
# Excerpt: `output` is defined in code not shown here
self.log_event(
    event=models.EventName.TESTS,
    data=models.ReportTestsFailure(
        message="Test Failure Summary",
        total=result.testsRun,
        failures=len(result.failures),
        errors=len(result.errors),
        successful=result.testsRun - len(result.failures) - len(result.errors),
        dialect=target_dialect,
        details=messages,
        traceback=output,
    ).dict(),
)
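The summary arithmetic above (successful = total minus failures minus errors) can be tried in isolation with the standard unittest module. The sketch below uses plain functions wrapped in unittest.FunctionTestCase rather than SQLMesh's ModelTest; summarize and the three sample checks are hypothetical, and "duckdb" is only an example dialect string.

```python
import io
import unittest
from typing import Any, Dict


def passes() -> None:
    pass


def fails() -> None:
    # AssertionError is reported by unittest as a failure.
    raise AssertionError("expected 2 rows, got 3")


def errors() -> None:
    # Any other exception is reported as an error.
    raise RuntimeError("boom")


def summarize(result: unittest.TestResult, dialect: str) -> Dict[str, Any]:
    failures, errs = len(result.failures), len(result.errors)
    return {
        "total": result.testsRun,
        "failures": failures,
        "errors": errs,
        "successful": result.testsRun - failures - errs,
        "dialect": dialect,
    }


suite = unittest.TestSuite(unittest.FunctionTestCase(fn) for fn in (passes, fails, errors))
# Send the runner's text report to a buffer so only the summary is kept.
result = unittest.TextTestRunner(stream=io.StringIO()).run(suite)
summary = summarize(result, "duckdb")
```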
Usage Examples
Basic Event Streaming
from web.server.console import api_console
from sse_starlette.sse import EventSourceResponse
from fastapi import APIRouter

router = APIRouter()

@router.get("/events")
async def events():
    """Stream events to frontend"""
    async def event_generator():
        while True:
            event = await api_console.queue.get()
            yield event

    return EventSourceResponse(event_generator())
Plan Execution with Tracking
from web.server.console import api_console
from web.server.exceptions import ApiException  # import path assumed, not confirmed by this page
from sqlmesh.core.context import Context

context = Context()
plan = context.plan("dev")

# Start tracking
api_console.start_plan_evaluation(plan)
try:
    # Apply plan (console automatically tracks progress)
    context.apply(plan)
    # Stop tracking on success
    api_console.stop_plan_evaluation()
except Exception as e:
    # Log exception and stop tracking
    api_console.log_exception(ApiException.from_exception(e))
Custom Event Logging
from web.server.console import api_console
from web.server import models

# Log custom warning
api_console.log_event(
    event=models.EventName.WARNINGS,
    data={"message": "Custom warning message", "source": "my_module"},
)

# Log file change
api_console.log_event(
    event=models.EventName.FILE,
    data=models.File(
        name="model.sql",
        path="models/model.sql",
        content="SELECT * FROM source",
    ).dict(),
)
Plan Cancellation
from web.server.console import api_console
from web.server import models

# Start cancel tracker
cancel_tracker = models.PlanCancelStageTracker(environment="dev")
cancel_tracker.add_stage(models.PlanStage.cancel, models.PlanStageCancel())
api_console.start_plan_tracker(cancel_tracker)

# Check if plan is being cancelled
if api_console.is_cancelling_plan():
    # Stop current operation
    raise PlanCancelledException()
Singleton Instance
# Module exports singleton instance
api_console = ApiConsole()
All web server endpoints share this single instance, so events from every operation are buffered on the same queue.