Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:TobikoData Sqlmesh Web Server Models

From Leeroopedia


Knowledge Sources
Domains Web UI, Data Models, API Contracts
Last Updated 2026-02-07 20:00 GMT

Overview

Pydantic data models defining the web API contract between SQLMesh's FastAPI backend and React frontend.

Description

This module defines 60+ Pydantic models that structure all data exchanged between the web server and UI client. It includes models for file system artifacts (File, Directory), model representations (Model, Column, ModelDetails), plan execution tracking (PlanStageTracker, BackfillTask), change detection (ChangeDirect, ChangeIndirect, ModelsDiff), environment management (Environments), test results (TestResult, TestErrorOrFailure), schema/data diffing (SchemaDiff, RowDiff, TableDiff), and error reporting (ApiExceptionPayload).

The models use sophisticated validation including custom validators for schema diff comparisons, restate model string splitting, and SQLGlot DataType serialization. They implement Trackable base classes with metadata for timing, status, and progress tracking. The module defines enums for modes (IDE/DOCS/CATALOG/PLAN), event names (SSE types), file types, model types, apply types, and plan stages.

Usage

These models are imported throughout the web server package for request/response serialization, event streaming payloads, and internal state management. The frontend TypeScript code mirrors these structures for type safety.

Code Reference

Source Location

Key Model Categories

# File System
class File(PydanticModel): ...
class Directory(PydanticModel): ...
class ArtifactChange(PydanticModel): ...

# Models and Metadata
class Model(PydanticModel): ...
class Column(PydanticModel): ...
class ModelDetails(PydanticModel): ...
class Reference(PydanticModel): ...

# Plan Tracking
class PlanStageTracker(Trackable, PlanDates): ...
class PlanOverviewStageTracker(PlanStageTracker): ...
class PlanApplyStageTracker(PlanStageTracker): ...
class PlanCancelStageTracker(PlanStageTracker): ...

# Changes
class ChangeDirect(ChangeDisplay): ...
class ChangeIndirect(ChangeDisplay): ...
class ModelsDiff(PydanticModel): ...

# Testing
class TestResult(PydanticModel): ...
class ReportTestsFailure(ReportTestsResult): ...

Import

from web.server import models

I/O Contract

Inputs

Name Type Required Description
Various model fields mixed Varies Pydantic validates all fields on initialization
Custom validator inputs mixed Yes Special fields like schema diffs, restate models

Outputs

Name Type Description
Validated model instances PydanticModel Type-safe, validated data structures
JSON dictionaries dict Serialized via .dict() or jsonable_encoder
Event payloads dict SSE event data excluding None values

Enumerations

Server Modes

class Mode(str, enum.Enum):
    IDE = "ide"      # Allow all modules
    DOCS = "docs"    # Only docs module
    CATALOG = "catalog"  # Only docs module
    PLAN = "plan"    # Allow plan

Event Names (SSE)

class EventName(str, enum.Enum):
    PING = "ping"
    ERRORS = "errors"
    WARNINGS = "warnings"
    FILE = "file"
    FORMAT_FILE = "format-file"
    MODELS = "models"
    TESTS = "tests"
    PLAN_APPLY = "plan-apply"
    PLAN_OVERVIEW = "plan-overview"
    PLAN_CANCEL = "plan-cancel"

UI Modules

class Modules(str, enum.Enum):
    EDITOR = "editor"          # Edit files and run queries
    FILES = "files"            # Project files
    DATA_CATALOG = "data-catalog"  # Data catalog
    PLANS = "plans"            # Run/apply plans
    TESTS = "tests"            # Run tests
    AUDITS = "audits"          # Run audits
    ERRORS = "errors"          # See errors
    DATA = "data"              # Query data
    LINEAGE = "lineage"        # Lineage

Model Types

class ModelType(str, enum.Enum):
    PYTHON = "python"
    SQL = "sql"
    SEED = "seed"
    EXTERNAL = "external"
    SOURCE = "source"

Plan Stages

class PlanStage(str, enum.Enum):
    validation = "validation"
    changes = "changes"
    backfills = "backfills"
    creation = "creation"
    restate = "restate"
    backfill = "backfill"
    promote = "promote"
    cancel = "cancel"

Status Enum

class Status(str, enum.Enum):
    INIT = "init"
    SUCCESS = "success"
    FAIL = "fail"

File System Models

File Model

class File(PydanticModel):
    name: str
    path: str
    extension: str = ""  # Auto-populated from name
    content: t.Optional[str] = None

    @field_validator("extension", mode="before")
    def default_extension(cls, v: str, info: ValidationInfo) -> str:
        if "name" in info.data:
            return pathlib.Path(info.data["name"]).suffix
        return v

Directory Model

class Directory(PydanticModel):
    name: str
    path: str
    directories: t.List[Directory] = []  # Nested directories
    files: t.List[File] = []

Artifact Change Model

class ArtifactChange(PydanticModel):
    change: Change  # watchfiles.Change enum
    path: str
    type: t.Optional[ArtifactType] = None  # file or directory
    file: t.Optional[File] = None

Model Representation

Column Model

class Column(PydanticModel):
    name: str
    type: str
    description: t.Optional[str] = None

Reference Model

class Reference(PydanticModel):
    name: str
    expression: str
    unique: bool

Model Details

class ModelDetails(PydanticModel):
    owner: t.Optional[str] = None
    kind: t.Optional[str] = None
    batch_size: t.Optional[int] = None
    cron: t.Optional[str] = None
    stamp: t.Optional[TimeLike] = None
    start: t.Optional[TimeLike] = None
    retention: t.Optional[int] = None
    table_format: t.Optional[str] = None
    storage_format: t.Optional[str] = None
    time_column: t.Optional[str] = None
    tags: t.Optional[str] = None
    references: t.List[Reference] = []
    partitioned_by: t.Optional[str] = None
    clustered_by: t.Optional[str] = None
    lookback: t.Optional[int] = None
    cron_prev: t.Optional[TimeLike] = None
    cron_next: t.Optional[TimeLike] = None
    interval_unit: t.Optional[IntervalUnit] = None
    annotated: t.Optional[bool] = None

Model

class Model(PydanticModel):
    name: str
    fqn: str  # Fully qualified name
    path: t.Optional[str] = None  # Relative to project root
    full_path: t.Optional[str] = None  # Absolute path
    dialect: str
    type: ModelType
    columns: t.List[Column]
    description: t.Optional[str] = None
    details: t.Optional[ModelDetails] = None
    sql: t.Optional[str] = None
    definition: t.Optional[str] = None
    default_catalog: t.Optional[str] = None
    hash: str

Change Detection Models

Change Display Base

class ChangeDisplay(PydanticModel):
    name: str
    view_name: str
    node_type: NodeType = NodeType.MODEL
    parents: t.Set[str] = set()

    @staticmethod
    def get_view_name(
        snapshots: t.Dict[SnapshotId, Snapshot],
        snapshot_id: SnapshotId,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
    ) -> str:
        """Get display name for snapshot"""

    @staticmethod
    def get_node_type(snapshots: t.Dict[SnapshotId, Snapshot], snapshot_id: SnapshotId) -> NodeType:
        """Get node type from snapshot"""

Change Direct

class ChangeDirect(ChangeDisplay):
    diff: str  # Text diff of changes
    indirect: t.List[ChangeDisplay] = []  # Indirectly affected children
    direct: t.List[ChangeDisplay] = []    # Directly affected children
    change_category: t.Optional[SnapshotChangeCategory] = None

Change Indirect

class ChangeIndirect(ChangeDisplay):
    pass  # Only inherits display fields

Models Diff

class ModelsDiff(PydanticModel):
    direct: t.List[ChangeDirect] = []
    indirect: t.List[ChangeIndirect] = []
    metadata: t.List[ChangeDisplay] = []

    @classmethod
    def get_modified_snapshots(
        cls,
        context: Context,
        plan: Plan,
    ) -> ModelsDiff:
        """
        Get the modified snapshots for a environment.
        Builds parent relationships and categorizes changes.
        """

Trackable Base Classes

Trackable Meta

class TrackableMeta(PydanticModel):
    status: Status = Status.INIT
    start: int = Field(default_factory=now_timestamp)
    end: t.Optional[int] = None
    done: bool = False

    @property
    def duration(self) -> int | None:
        return self.end - self.start if self.start and self.end else None

Trackable

class Trackable(PydanticModel):
    meta: TrackableMeta = Field(default_factory=TrackableMeta)

    def stop(self, success: bool = True) -> None:
        """Mark stage complete with success/fail status"""
        self.meta.status = Status.SUCCESS if success else Status.FAIL
        self.meta.end = now_timestamp()
        self.meta.done = bool(self.meta.start and self.meta.end)

    def update(self, data: t.Dict[str, t.Any]) -> None:
        """Update fields with dictionary"""
        for k, v in data.items():
            setattr(self, k, v)

Plan Stage Models

Plan Stage Tracker Base

class PlanStageTracker(Trackable, PlanDates):
    environment: t.Optional[str] = None
    plan_options: t.Optional[PlanOptions] = None

    def add_stage(self, stage: PlanStage, data: Trackable) -> None:
        """Dynamically add stage to tracker"""
        setattr(self, stage, data)

Overview Stage Tracker

class PlanOverviewStageTracker(PlanStageTracker):
    validation: t.Optional[PlanStageValidation] = None
    changes: t.Optional[PlanStageChanges] = None
    backfills: t.Optional[PlanStageBackfills] = None

Apply Stage Tracker

class PlanApplyStageTracker(PlanStageTracker):
    creation: t.Optional[PlanStageCreation] = None
    restate: t.Optional[PlanStageRestate] = None
    backfill: t.Optional[PlanStageBackfill] = None
    promote: t.Optional[PlanStagePromote] = None

Cancel Stage Tracker

class PlanCancelStageTracker(PlanStageTracker):
    cancel: t.Optional[PlanStageCancel] = None

Individual Stages

class PlanStageValidation(Trackable):
    pass

class PlanStageChanges(Trackable, PlanChanges):
    pass

class PlanStageBackfills(Trackable):
    models: t.Optional[t.List[BackfillDetails]] = None

class PlanStageCreation(Trackable):
    total_tasks: int
    num_tasks: int

class PlanStageRestate(Trackable):
    pass

class PlanStageBackfill(Trackable):
    queue: t.Set[str] = set()
    tasks: t.Dict[str, BackfillTask] = {}

class PlanStagePromote(Trackable):
    total_tasks: int
    num_tasks: int
    target_environment: str

class PlanStageCancel(Trackable):
    pass

Backfill Models

Backfill Details

class BackfillDetails(ChangeDisplay):
    interval: t.List[str]  # Start and end dates
    batches: int

Backfill Task

class BackfillTask(ChangeDisplay):
    completed: int
    total: int
    start: int
    end: t.Optional[int] = None
    interval: t.Optional[t.List[str]] = None

Plan Input Models

Plan Dates

class PlanDates(PydanticModel):
    start: t.Optional[TimeLike] = None
    end: t.Optional[TimeLike] = None

Plan Options

class PlanOptions(PydanticModel):
    skip_tests: bool = False
    skip_backfill: bool = False
    no_gaps: bool = False
    forward_only: bool = False
    no_auto_categorization: bool = False
    include_unmodified: bool = False
    create_from: t.Optional[str] = None
    restate_models: t.Optional[str] = None  # Comma-separated or list
    auto_apply: bool = False

    @field_validator("restate_models")
    @classmethod
    def validate_restate_models(cls, v: str | list[str]) -> list[str]:
        """Convert comma-separated string to list"""
        if isinstance(v, str):
            return v.split(",")
        return v

Query and Evaluation Models

Evaluate Input

class EvaluateInput(PydanticModel):
    model: str
    start: TimeLike
    end: TimeLike
    execution_time: TimeLike
    limit: int = 1000

Fetchdf Input

class FetchdfInput(PydanticModel):
    sql: str
    limit: int = 1000

Render Input

class RenderInput(PydanticModel):
    model: str
    start: t.Optional[TimeLike] = None
    end: t.Optional[TimeLike] = None
    execution_time: t.Optional[TimeLike] = None
    expand: t.Union[bool, t.Iterable[str]] = False
    pretty: bool = True
    dialect: t.Optional[str] = None

Query

class Query(PydanticModel):
    sql: str

Diff Models

Schema Diff

class SchemaDiff(PydanticModel):
    source: str
    target: str
    source_schema: t.Dict[str, str]
    target_schema: t.Dict[str, str]
    added: t.Dict[str, str]
    removed: t.Dict[str, str]
    modified: t.Dict[str, str]  # Format: "old_type → new_type"

    @field_validator("source_schema", "target_schema", "added", "removed", "modified", mode="before")
    @classmethod
    def validate_schema(cls, v: t.Union[...], info: ValidationInfo) -> t.Dict[str, str]:
        """Convert exp.DataType objects to strings, handle tuples for modified"""
        # Converts (DataType, DataType) tuples to "type1 → type2" strings

Row Diff

class RowDiff(PydanticModel):
    source: str
    target: str
    stats: t.Dict[str, float]
    sample: t.Dict[str, t.Any]
    joined_sample: t.Dict[str, t.Any]
    s_sample: t.Dict[str, t.Any]
    t_sample: t.Dict[str, t.Any]
    column_stats: t.Dict[str, t.Any]
    source_count: int
    target_count: int
    count_pct_change: float
    decimals: int
    processed_sample_data: t.Optional[ProcessedSampleData] = None

Table Diff

class TableDiff(PydanticModel):
    schema_diff: SchemaDiff
    row_diff: RowDiff
    on: t.List[t.List[str]]  # Join keys

Test Models

Test Case

class TestCase(PydanticModel):
    name: str
    path: str

Test Error or Failure

class TestErrorOrFailure(TestCase):
    tb: str  # Traceback

Test Skipped

class TestSkipped(TestCase):
    reason: str

Test Result

class TestResult(PydanticModel):
    tests_run: int
    failures: t.List[TestErrorOrFailure]
    errors: t.List[TestErrorOrFailure]
    skipped: t.List[TestSkipped]
    successes: t.List[TestCase]

Report Test Results

class ReportTestsResult(PydanticModel):
    message: str

class ReportTestDetails(ReportTestsResult):
    details: str

class ReportTestsFailure(ReportTestsResult):
    total: int
    failures: int
    errors: int
    successful: int
    dialect: str
    details: t.List[ReportTestDetails]
    traceback: str

Error Models

API Exception Payload

class ApiExceptionPayload(PydanticModel):
    timestamp: int
    message: str
    origin: str
    status: t.Optional[int] = None
    trigger: t.Optional[str] = None
    type: t.Optional[str] = None
    description: t.Optional[str] = None
    traceback: t.Optional[str] = None
    stack: t.Optional[t.List[str]] = None

Environment Models

Environments

class Environments(PydanticModel):
    environments: t.Dict[str, Environment] = {}
    pinned_environments: t.Set[str] = set()
    default_target_environment: str = ""

Usage Examples

Creating Model Instance

from web.server import models

# Create model with validation
model = models.Model(
    name="my_model",
    fqn="db.schema.my_model",
    path="models/my_model.sql",
    dialect="duckdb",
    type=models.ModelType.SQL,
    columns=[
        models.Column(name="id", type="INT", description="Primary key"),
        models.Column(name="name", type="VARCHAR", description="User name"),
    ],
    hash="abc123",
)

# Serialize to dict
model_dict = model.dict()

# Exclude None values for SSE
model_dict_clean = model.dict(exclude_none=True)

Tracking Plan Progress

from web.server import models

# Create tracker
tracker = models.PlanApplyStageTracker(
    environment="dev",
    plan_options=models.PlanOptions(skip_tests=False, auto_apply=False),
)

# Add creation stage
tracker.add_stage(
    models.PlanStage.creation,
    models.PlanStageCreation(total_tasks=10, num_tasks=0),
)

# Update progress
tracker.creation.update({"num_tasks": 5})

# Stop stage
tracker.creation.stop(success=True)

# Serialize for SSE
event_data = tracker.dict(exclude_none=True)

Building Models Diff

from web.server import models
from sqlmesh.core.context import Context

context = Context()
plan = context.plan("dev")

# Get modified snapshots with parent relationships
diff = models.ModelsDiff.get_modified_snapshots(context, plan)

# Access changes
for change in diff.direct:
    print(f"Direct change: {change.name}")
    print(f"Diff: {change.diff}")
    print(f"Affected children: {[c.name for c in change.indirect]}")

for change in diff.indirect:
    print(f"Indirect change: {change.name}")

Schema Diff Validation

from web.server import models
from sqlglot import exp

# Schema diff with DataType objects
schema_diff = models.SchemaDiff(
    source="table_a",
    target="table_b",
    source_schema={"id": exp.DataType.build("INT"), "name": exp.DataType.build("VARCHAR")},
    target_schema={"id": exp.DataType.build("BIGINT"), "name": exp.DataType.build("VARCHAR")},
    added={},
    removed={},
    modified={"id": (exp.DataType.build("INT"), exp.DataType.build("BIGINT"))},
)

# Automatically converts to strings
assert schema_diff.source_schema["id"] == "INT"
assert schema_diff.modified["id"] == "INT → BIGINT"

Test Result Reporting

from web.server import models

test_result = models.TestResult(
    tests_run=10,
    failures=[
        models.TestErrorOrFailure(
            name="test_model_a",
            path="tests/test_model_a.yaml",
            tb="AssertionError: Expected 5 rows, got 3",
        )
    ],
    errors=[],
    skipped=[
        models.TestSkipped(
            name="test_model_b",
            path="tests/test_model_b.yaml",
            reason="Missing test data",
        )
    ],
    successes=[
        models.TestCase(name=f"test_{i}", path=f"tests/test_{i}.yaml")
        for i in range(8)
    ],
)

# Serialize for API response
result_dict = test_result.dict()

Constants

SUPPORTED_EXTENSIONS = {".py", ".sql", ".yaml", ".yml", ".csv"}

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment