Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:TobikoData Sqlmesh Web Server Console

From Leeroopedia


Knowledge Sources
Domains Web UI, Server-Sent Events, Plan Tracking
Last Updated 2026-02-07 20:00 GMT

Overview

The ApiConsole class extends TerminalConsole to provide real-time progress tracking and event streaming for SQLMesh's web UI via Server-Sent Events.

Description

The ApiConsole class implements the Console protocol for SQLMesh's web interface, translating plan execution events into Server-Sent Events (SSE) that stream to the frontend. It tracks three types of plan operations: plan overview (generating plan previews), plan apply (executing deployments), and plan cancel (aborting running plans). The console maintains detailed state for each stage of plan execution including validation, changes, backfills, creation, restate, promotion, and cancellation.

Each stage is tracked with Trackable objects that record start time, end time, status (init/success/fail), and stage-specific data like task counts, batch progress, and queue states. The console uses an asyncio.Queue to buffer events before streaming them to connected clients. It provides sophisticated cancellation handling that properly cleans up partial plan executions.

Usage

This console is instantiated as a singleton (api_console) and used by the FastAPI web server to track all SQLMesh operations initiated through the UI. Frontend clients subscribe to the SSE endpoint to receive real-time updates.

Code Reference

Source Location

Class Signature

class ApiConsole(TerminalConsole):
    plan_cancel_stage_tracker: t.Optional[models.PlanCancelStageTracker] = None
    plan_apply_stage_tracker: t.Optional[models.PlanApplyStageTracker] = None
    plan_overview_stage_tracker: t.Optional[models.PlanOverviewStageTracker] = None

    def __init__(self) -> None:
        super().__init__()
        self.current_task_status: t.Dict[str, t.Dict[str, t.Any]] = {}
        self.queue: asyncio.Queue = asyncio.Queue()

Import

from web.server.console import api_console

I/O Contract

Inputs

Name Type Required Description
plan EvaluatablePlan Yes Plan being executed (for start_plan_evaluation)
snapshots List[Snapshot] Yes Snapshots being created/promoted
environment_naming_info EnvironmentNamingInfo Yes Environment display name information
default_catalog Optional[str] No Default catalog for display names
batched_intervals Dict[Snapshot, Intervals] Yes Intervals to backfill per snapshot
snapshot Snapshot Yes Current snapshot being processed
interval Interval Yes Current interval being processed
result ModelTextTestResult Yes Test execution results
exception ApiException Yes Exception to log and stream

Outputs

Name Type Description
queue asyncio.Queue[ServerSentEvent] Queue of SSE events for streaming to clients
Event stream SSE Real-time events sent to frontend (plan-apply, plan-overview, plan-cancel, errors, tests)

Plan Tracking Stages

Plan Overview Stages

Stage Description Data Tracked
validation Validating plan parameters Status, timing
changes Identifying modified snapshots Added/removed/modified models with diffs
backfills Calculating backfill requirements Models to backfill, intervals, batch counts

Plan Apply Stages

Stage Description Data Tracked
creation Creating physical tables for new snapshots Total tasks, completed tasks
restate Restating modified snapshots Status, timing
backfill Loading historical data Per-model task queues, completed/total batches, timing
promote Promoting snapshots to environment Total tasks, completed tasks, target environment

Plan Cancel Stages

Stage Description Data Tracked
cancel Cancelling running plan Status, timing

Event Types

Server-Sent Event Names

class EventName(str, enum.Enum):
    PING = "ping"
    ERRORS = "errors"
    WARNINGS = "warnings"
    FILE = "file"
    FORMAT_FILE = "format-file"
    MODELS = "models"
    TESTS = "tests"
    PLAN_APPLY = "plan-apply"
    PLAN_OVERVIEW = "plan-overview"
    PLAN_CANCEL = "plan-cancel"

Event Logging Methods

def log_event(self, event: models.EventName, data: t.Union[t.Dict[str, t.Any], t.List[t.Any]]) -> None:
    """Generic event logger that queues SSE"""

def log_event_plan_apply(self) -> None:
    """Stream plan apply progress"""

def log_event_plan_overview(self) -> None:
    """Stream plan overview progress"""

def log_event_plan_cancel(self) -> None:
    """Stream plan cancellation progress"""

def log_exception(self, exception: ApiException) -> None:
    """Stream error to frontend and stop trackers"""

Progress Tracking Methods

Creation Progress

def start_creation_progress(
    self,
    snapshots: t.List[Snapshot],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Initialize creation stage with total snapshot count"""

def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
    """Increment completed snapshot count"""

def stop_creation_progress(self, success: bool = True) -> None:
    """Mark creation stage complete or failed"""

Evaluation (Backfill) Progress

def start_evaluation_progress(
    self,
    batched_intervals: t.Dict[Snapshot, Intervals],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    audit_only: bool = False,
) -> None:
    """Initialize backfill tasks with batch counts per snapshot"""

def start_snapshot_evaluation_progress(
    self, snapshot: Snapshot, audit_only: bool = False
) -> None:
    """Add snapshot to active processing queue"""

def update_snapshot_evaluation_progress(
    self,
    snapshot: Snapshot,
    interval: Interval,
    batch_idx: int,
    duration_ms: t.Optional[int],
    num_audits_passed: int,
    num_audits_failed: int,
    audit_only: bool = False,
    execution_stats: t.Optional[QueryExecutionStats] = None,
    auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
) -> None:
    """Update backfill progress, remove from queue when complete"""

def stop_evaluation_progress(self, success: bool = True) -> None:
    """Mark backfill stage complete or failed"""

Promotion Progress

def start_promotion_progress(
    self,
    snapshots: t.List[SnapshotTableInfo],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Initialize promotion stage with snapshot count"""

def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
    """Increment promoted snapshot count"""

def stop_promotion_progress(self, success: bool = True) -> None:
    """Mark promotion stage complete or failed"""

Cancellation Handling

Cancellation Flow

def is_cancelling_plan(self) -> bool:
    """Check if plan cancellation is in progress"""
    return bool(self.plan_cancel_stage_tracker and not self.plan_cancel_stage_tracker.meta.done)

def finish_plan_cancellation(self) -> None:
    """Complete cancellation after stopping all apply tracker stages"""
    # 1. Stop cancel tracker's cancel stage
    # 2. Check if all apply tracker stages are done
    # 3. Stop apply tracker (failed)
    # 4. Stop cancel tracker (success)

When a stage fails during cancellation, the console checks is_cancelling_plan() and calls finish_plan_cancellation() instead of stopping the tracker normally.

Stage Cleanup

def stop_plan_tracker_stages(
    self,
    tracker: t.Optional[t.Union[
        models.PlanApplyStageTracker,
        models.PlanCancelStageTracker,
        models.PlanOverviewStageTracker,
    ]],
    success: bool = True,
) -> None:
    """Stop all stages of a tracker"""
    for key in stages:
        stage = getattr(tracker, key)
        if isinstance(stage, models.Trackable) and stage.meta and not stage.meta.done:
            stage.stop(success)

Test Result Reporting

Test Success

def log_test_results(self, result: ModelTextTestResult, target_dialect: str) -> None:
    if result.wasSuccessful():
        self.log_event(
            event=models.EventName.TESTS,
            data=models.ReportTestsResult(
                message=f"Successfully ran {str(result.testsRun)} tests against {target_dialect}"
            ).dict(),
        )

Test Failures

# Extract failure details
messages = []
for test, details in result.failures + result.errors:
    if isinstance(test, ModelTest):
        messages.append(
            models.ReportTestDetails(
                message=f"Failure test: {test.model.name} {test.test_name}",
                details=details,
            )
        )

# Send detailed failure report
self.log_event(
    event=models.EventName.TESTS,
    data=models.ReportTestsFailure(
        message="Test Failure Summary",
        total=result.testsRun,
        failures=len(result.failures),
        errors=len(result.errors),
        successful=result.testsRun - len(result.failures) - len(result.errors),
        dialect=target_dialect,
        details=messages,
        traceback=output,
    ).dict(),
)

Usage Examples

Basic Event Streaming

from web.server.console import api_console
from sse_starlette.sse import EventSourceResponse
from fastapi import APIRouter

router = APIRouter()

@router.get("/events")
async def events():
    """Stream events to frontend"""
    async def event_generator():
        while True:
            event = await api_console.queue.get()
            yield event

    return EventSourceResponse(event_generator())

Plan Execution with Tracking

from web.server.console import api_console
from sqlmesh.core.context import Context

context = Context()
plan = context.plan("dev")

# Start tracking
api_console.start_plan_evaluation(plan)

try:
    # Apply plan (console automatically tracks progress)
    context.apply(plan)

    # Stop tracking on success
    api_console.stop_plan_evaluation()
except Exception as e:
    # Log exception and stop tracking
    api_console.log_exception(ApiException.from_exception(e))

Custom Event Logging

from web.server.console import api_console
from web.server import models

# Log custom warning
api_console.log_event(
    event=models.EventName.WARNINGS,
    data={"message": "Custom warning message", "source": "my_module"}
)

# Log file change
api_console.log_event(
    event=models.EventName.FILE,
    data=models.File(
        name="model.sql",
        path="models/model.sql",
        content="SELECT * FROM source"
    ).dict()
)

Plan Cancellation

from web.server.console import api_console
from web.server import models

# Start cancel tracker
cancel_tracker = models.PlanCancelStageTracker(environment="dev")
cancel_tracker.add_stage(models.PlanStage.cancel, models.PlanStageCancel())
api_console.start_plan_tracker(cancel_tracker)

# Check if plan is being cancelled
if api_console.is_cancelling_plan():
    # Stop current operation
    raise PlanCancelledException()

Singleton Instance

# Module exports singleton instance
api_console = ApiConsole()

All web server endpoints use this single shared instance to coordinate event streaming across multiple concurrent plan executions.

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment