Implementation:TobikoData Sqlmesh Web Server Models
| Knowledge Sources | |
|---|---|
| Domains | Web UI, Data Models, API Contracts |
| Last Updated | 2026-02-07 20:00 GMT |
Overview
Pydantic data models defining the web API contract between SQLMesh's FastAPI backend and React frontend.
Description
This module defines 60+ Pydantic models that structure all data exchanged between the web server and UI client. It includes models for file system artifacts (File, Directory), model representations (Model, Column, ModelDetails), plan execution tracking (PlanStageTracker, BackfillTask), change detection (ChangeDirect, ChangeIndirect, ModelsDiff), environment management (Environments), test results (TestResult, TestErrorOrFailure), schema/data diffing (SchemaDiff, RowDiff, TableDiff), and error reporting (ApiExceptionPayload).
Several models carry custom validators: SchemaDiff converts SQLGlot DataType objects to strings (and modified-column pairs to "old_type → new_type" strings), PlanOptions splits a comma-separated restate-models string into a list, and File derives its extension from its name. Plan-stage models inherit from the Trackable base class, whose TrackableMeta records status, start/end timestamps, and a done flag for timing and progress tracking. The module also defines enums for modes (IDE/DOCS/CATALOG/PLAN), event names (SSE types), file types, model types, apply types, and plan stages.
Usage
These models are imported throughout the web server package for request/response serialization, event streaming payloads, and internal state management. The frontend TypeScript code mirrors these structures for type safety.
Code Reference
Source Location
- Repository: TobikoData_Sqlmesh
- File: web/server/models.py
Key Model Categories
# File System
class File(PydanticModel): ...
class Directory(PydanticModel): ...
class ArtifactChange(PydanticModel): ...
# Models and Metadata
class Model(PydanticModel): ...
class Column(PydanticModel): ...
class ModelDetails(PydanticModel): ...
class Reference(PydanticModel): ...
# Plan Tracking
class PlanStageTracker(Trackable, PlanDates): ...
class PlanOverviewStageTracker(PlanStageTracker): ...
class PlanApplyStageTracker(PlanStageTracker): ...
class PlanCancelStageTracker(PlanStageTracker): ...
# Changes
class ChangeDirect(ChangeDisplay): ...
class ChangeIndirect(ChangeDisplay): ...
class ModelsDiff(PydanticModel): ...
# Testing
class TestResult(PydanticModel): ...
class ReportTestsFailure(ReportTestsResult): ...
Import
from web.server import models
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| Various model fields | mixed | Varies | Pydantic validates all fields on initialization |
| Custom validator inputs | mixed | Varies | Fields with custom validators, e.g. the schema diff mappings (required) and restate models (optional) |
Outputs
| Name | Type | Description |
|---|---|---|
| Validated model instances | PydanticModel | Type-safe, validated data structures |
| JSON dictionaries | dict | Serialized via .dict() or jsonable_encoder |
| Event payloads | dict | SSE event data excluding None values |
Enumerations
Server Modes
# Server run modes (string-valued enum).
class Mode(str, enum.Enum):
IDE = "ide" # Allow all modules
DOCS = "docs" # Only docs module
# NOTE(review): the comment below is identical to the DOCS one — confirm which modules catalog mode actually enables.
CATALOG = "catalog" # Only docs module
PLAN = "plan" # Allow plan
Event Names (SSE)
class EventName(str, enum.Enum):
PING = "ping"
ERRORS = "errors"
WARNINGS = "warnings"
FILE = "file"
FORMAT_FILE = "format-file"
MODELS = "models"
TESTS = "tests"
PLAN_APPLY = "plan-apply"
PLAN_OVERVIEW = "plan-overview"
PLAN_CANCEL = "plan-cancel"
UI Modules
class Modules(str, enum.Enum):
EDITOR = "editor" # Edit files and run queries
FILES = "files" # Project files
DATA_CATALOG = "data-catalog" # Data catalog
PLANS = "plans" # Run/apply plans
TESTS = "tests" # Run tests
AUDITS = "audits" # Run audits
ERRORS = "errors" # See errors
DATA = "data" # Query data
LINEAGE = "lineage" # Lineage
Model Types
class ModelType(str, enum.Enum):
PYTHON = "python"
SQL = "sql"
SEED = "seed"
EXTERNAL = "external"
SOURCE = "source"
Plan Stages
class PlanStage(str, enum.Enum):
validation = "validation"
changes = "changes"
backfills = "backfills"
creation = "creation"
restate = "restate"
backfill = "backfill"
promote = "promote"
cancel = "cancel"
Status Enum
class Status(str, enum.Enum):
INIT = "init"
SUCCESS = "success"
FAIL = "fail"
File System Models
File Model
class File(PydanticModel):
    """A project file as exchanged with the UI.

    ``extension`` is derived from ``name`` during validation, so callers
    normally only supply ``name`` and ``path`` (plus ``content`` if needed).
    """

    name: str
    path: str
    # validate_default=True is required for the validator below to run when
    # the caller omits ``extension``; pydantic skips validators for fields
    # that fall back to their default otherwise, leaving the value at "".
    extension: str = Field(default="", validate_default=True)
    content: t.Optional[str] = None

    @field_validator("extension", mode="before")
    @classmethod
    def default_extension(cls, v: str, info: ValidationInfo) -> str:
        """Return the suffix of the already-validated ``name``.

        Args:
            v: The raw ``extension`` value (or the default ``""``).
            info: Validation context; ``info.data`` holds previously
                validated fields.

        Returns:
            ``pathlib.Path(name).suffix`` when ``name`` is available in
            ``info.data``; otherwise ``v`` unchanged.
        """
        if "name" in info.data:
            return pathlib.Path(info.data["name"]).suffix
        return v
Directory Model
class Directory(PydanticModel):
name: str
path: str
directories: t.List[Directory] = [] # Nested directories
files: t.List[File] = []
Artifact Change Model
class ArtifactChange(PydanticModel):
change: Change # watchfiles.Change enum
path: str
type: t.Optional[ArtifactType] = None # file or directory
file: t.Optional[File] = None
Model Representation
Column Model
class Column(PydanticModel):
name: str
type: str
description: t.Optional[str] = None
Reference Model
class Reference(PydanticModel):
name: str
expression: str
unique: bool
Model Details
class ModelDetails(PydanticModel):
owner: t.Optional[str] = None
kind: t.Optional[str] = None
batch_size: t.Optional[int] = None
cron: t.Optional[str] = None
stamp: t.Optional[TimeLike] = None
start: t.Optional[TimeLike] = None
retention: t.Optional[int] = None
table_format: t.Optional[str] = None
storage_format: t.Optional[str] = None
time_column: t.Optional[str] = None
tags: t.Optional[str] = None
references: t.List[Reference] = []
partitioned_by: t.Optional[str] = None
clustered_by: t.Optional[str] = None
lookback: t.Optional[int] = None
cron_prev: t.Optional[TimeLike] = None
cron_next: t.Optional[TimeLike] = None
interval_unit: t.Optional[IntervalUnit] = None
annotated: t.Optional[bool] = None
Model
class Model(PydanticModel):
name: str
fqn: str # Fully qualified name
path: t.Optional[str] = None # Relative to project root
full_path: t.Optional[str] = None # Absolute path
dialect: str
type: ModelType
columns: t.List[Column]
description: t.Optional[str] = None
details: t.Optional[ModelDetails] = None
sql: t.Optional[str] = None
definition: t.Optional[str] = None
default_catalog: t.Optional[str] = None
hash: str
Change Detection Models
Change Display Base
class ChangeDisplay(PydanticModel):
name: str
view_name: str
node_type: NodeType = NodeType.MODEL
parents: t.Set[str] = set()
@staticmethod
def get_view_name(
snapshots: t.Dict[SnapshotId, Snapshot],
snapshot_id: SnapshotId,
environment_naming_info: EnvironmentNamingInfo,
default_catalog: t.Optional[str],
) -> str:
"""Get display name for snapshot"""
@staticmethod
def get_node_type(snapshots: t.Dict[SnapshotId, Snapshot], snapshot_id: SnapshotId) -> NodeType:
"""Get node type from snapshot"""
Change Direct
class ChangeDirect(ChangeDisplay):
diff: str # Text diff of changes
indirect: t.List[ChangeDisplay] = [] # Indirectly affected children
direct: t.List[ChangeDisplay] = [] # Directly affected children
change_category: t.Optional[SnapshotChangeCategory] = None
Change Indirect
class ChangeIndirect(ChangeDisplay):
pass # Only inherits display fields
Models Diff
class ModelsDiff(PydanticModel):
direct: t.List[ChangeDirect] = []
indirect: t.List[ChangeIndirect] = []
metadata: t.List[ChangeDisplay] = []
@classmethod
def get_modified_snapshots(
cls,
context: Context,
plan: Plan,
) -> ModelsDiff:
"""
Get the modified snapshots for an environment.
Builds parent relationships and categorizes changes.
"""
Trackable Base Classes
Trackable Meta
class TrackableMeta(PydanticModel):
    """Status and timing bookkeeping attached to a trackable object."""

    status: Status = Status.INIT
    start: int = Field(default_factory=now_timestamp)
    end: t.Optional[int] = None
    done: bool = False

    @property
    def duration(self) -> int | None:
        """Elapsed time between ``start`` and ``end``.

        Returns:
            ``end - start`` when both timestamps are truthy, otherwise
            ``None`` (e.g. while ``end`` has not been set yet).
        """
        if not (self.start and self.end):
            return None
        return self.end - self.start
Trackable
class Trackable(PydanticModel):
    """Base model for anything whose status and timing are tracked via ``meta``."""

    meta: TrackableMeta = Field(default_factory=TrackableMeta)

    def stop(self, success: bool = True) -> None:
        """Record the final status and end timestamp on ``meta``.

        Args:
            success: When true the status becomes ``Status.SUCCESS``,
                otherwise ``Status.FAIL``.
        """
        tracked = self.meta
        if success:
            tracked.status = Status.SUCCESS
        else:
            tracked.status = Status.FAIL
        tracked.end = now_timestamp()
        # ``done`` is only set when both timestamps are truthy.
        tracked.done = bool(tracked.start and tracked.end)

    def update(self, data: t.Dict[str, t.Any]) -> None:
        """Assign every key/value pair in ``data`` as an attribute on this object."""
        for attr_name, attr_value in data.items():
            setattr(self, attr_name, attr_value)
Plan Stage Models
Plan Stage Tracker Base
# Base tracker for a plan run: inherits the `meta` bookkeeping from Trackable
# and the optional `start`/`end` fields from PlanDates.
class PlanStageTracker(Trackable, PlanDates):
environment: t.Optional[str] = None
plan_options: t.Optional[PlanOptions] = None
def add_stage(self, stage: PlanStage, data: Trackable) -> None:
"""Dynamically add stage to tracker"""
# PlanStage is a str-valued enum, so the member itself serves as the
# attribute name; the concrete tracker subclasses declare one optional
# field per stage name they support.
setattr(self, stage, data)
Overview Stage Tracker
class PlanOverviewStageTracker(PlanStageTracker):
validation: t.Optional[PlanStageValidation] = None
changes: t.Optional[PlanStageChanges] = None
backfills: t.Optional[PlanStageBackfills] = None
Apply Stage Tracker
class PlanApplyStageTracker(PlanStageTracker):
creation: t.Optional[PlanStageCreation] = None
restate: t.Optional[PlanStageRestate] = None
backfill: t.Optional[PlanStageBackfill] = None
promote: t.Optional[PlanStagePromote] = None
Cancel Stage Tracker
class PlanCancelStageTracker(PlanStageTracker):
cancel: t.Optional[PlanStageCancel] = None
Individual Stages
class PlanStageValidation(Trackable):
pass
class PlanStageChanges(Trackable, PlanChanges):
pass
class PlanStageBackfills(Trackable):
models: t.Optional[t.List[BackfillDetails]] = None
class PlanStageCreation(Trackable):
total_tasks: int
num_tasks: int
class PlanStageRestate(Trackable):
pass
class PlanStageBackfill(Trackable):
queue: t.Set[str] = set()
tasks: t.Dict[str, BackfillTask] = {}
class PlanStagePromote(Trackable):
total_tasks: int
num_tasks: int
target_environment: str
class PlanStageCancel(Trackable):
pass
Backfill Models
Backfill Details
class BackfillDetails(ChangeDisplay):
interval: t.List[str] # Start and end dates
batches: int
Backfill Task
class BackfillTask(ChangeDisplay):
completed: int
total: int
start: int
end: t.Optional[int] = None
interval: t.Optional[t.List[str]] = None
Plan Input Models
Plan Dates
class PlanDates(PydanticModel):
start: t.Optional[TimeLike] = None
end: t.Optional[TimeLike] = None
Plan Options
class PlanOptions(PydanticModel):
    """Options supplied by the client when requesting a plan.

    ``restate_models`` may be given either as a list of model names or as a
    single comma-separated string; it is always stored as a list (or ``None``
    when not provided).
    """

    skip_tests: bool = False
    skip_backfill: bool = False
    no_gaps: bool = False
    forward_only: bool = False
    no_auto_categorization: bool = False
    include_unmodified: bool = False
    create_from: t.Optional[str] = None
    # Stored as a list; the validator below also accepts a comma-separated string.
    restate_models: t.Optional[t.List[str]] = None
    auto_apply: bool = False

    # mode="before" so the raw input reaches the validator prior to type
    # validation; otherwise a string could never be coerced into the declared
    # list type (and, with a ``str`` annotation, a list input would be rejected).
    @field_validator("restate_models", mode="before")
    @classmethod
    def validate_restate_models(cls, v: str | list[str] | None) -> list[str] | None:
        """Convert a comma-separated string to a list.

        Args:
            v: Raw input value: a comma-separated string, a list of names,
                or ``None``.

        Returns:
            ``v.split(",")`` for string input; any other value unchanged.
        """
        if isinstance(v, str):
            return v.split(",")
        return v
Query and Evaluation Models
Evaluate Input
class EvaluateInput(PydanticModel):
model: str
start: TimeLike
end: TimeLike
execution_time: TimeLike
limit: int = 1000
Fetchdf Input
class FetchdfInput(PydanticModel):
sql: str
limit: int = 1000
Render Input
class RenderInput(PydanticModel):
model: str
start: t.Optional[TimeLike] = None
end: t.Optional[TimeLike] = None
execution_time: t.Optional[TimeLike] = None
expand: t.Union[bool, t.Iterable[str]] = False
pretty: bool = True
dialect: t.Optional[str] = None
Query
class Query(PydanticModel):
sql: str
Diff Models
Schema Diff
class SchemaDiff(PydanticModel):
source: str
target: str
source_schema: t.Dict[str, str]
target_schema: t.Dict[str, str]
added: t.Dict[str, str]
removed: t.Dict[str, str]
modified: t.Dict[str, str] # Format: "old_type → new_type"
@field_validator("source_schema", "target_schema", "added", "removed", "modified", mode="before")
@classmethod
def validate_schema(cls, v: t.Union[...], info: ValidationInfo) -> t.Dict[str, str]:
"""Convert exp.DataType objects to strings, handle tuples for modified"""
# Converts (DataType, DataType) tuples to "type1 → type2" strings
Row Diff
class RowDiff(PydanticModel):
source: str
target: str
stats: t.Dict[str, float]
sample: t.Dict[str, t.Any]
joined_sample: t.Dict[str, t.Any]
s_sample: t.Dict[str, t.Any]
t_sample: t.Dict[str, t.Any]
column_stats: t.Dict[str, t.Any]
source_count: int
target_count: int
count_pct_change: float
decimals: int
processed_sample_data: t.Optional[ProcessedSampleData] = None
Table Diff
class TableDiff(PydanticModel):
schema_diff: SchemaDiff
row_diff: RowDiff
on: t.List[t.List[str]] # Join keys
Test Models
Test Case
class TestCase(PydanticModel):
name: str
path: str
Test Error or Failure
class TestErrorOrFailure(TestCase):
tb: str # Traceback
Test Skipped
class TestSkipped(TestCase):
reason: str
Test Result
class TestResult(PydanticModel):
tests_run: int
failures: t.List[TestErrorOrFailure]
errors: t.List[TestErrorOrFailure]
skipped: t.List[TestSkipped]
successes: t.List[TestCase]
Report Test Results
class ReportTestsResult(PydanticModel):
message: str
class ReportTestDetails(ReportTestsResult):
details: str
class ReportTestsFailure(ReportTestsResult):
total: int
failures: int
errors: int
successful: int
dialect: str
details: t.List[ReportTestDetails]
traceback: str
Error Models
API Exception Payload
class ApiExceptionPayload(PydanticModel):
timestamp: int
message: str
origin: str
status: t.Optional[int] = None
trigger: t.Optional[str] = None
type: t.Optional[str] = None
description: t.Optional[str] = None
traceback: t.Optional[str] = None
stack: t.Optional[t.List[str]] = None
Environment Models
Environments
class Environments(PydanticModel):
environments: t.Dict[str, Environment] = {}
pinned_environments: t.Set[str] = set()
default_target_environment: str = ""
Usage Examples
Creating Model Instance
from web.server import models
# Create model with validation
model = models.Model(
name="my_model",
fqn="db.schema.my_model",
path="models/my_model.sql",
dialect="duckdb",
type=models.ModelType.SQL,
columns=[
models.Column(name="id", type="INT", description="Primary key"),
models.Column(name="name", type="VARCHAR", description="User name"),
],
hash="abc123",
)
# Serialize to dict
model_dict = model.dict()
# Exclude None values for SSE
model_dict_clean = model.dict(exclude_none=True)
Tracking Plan Progress
from web.server import models
# Create tracker
tracker = models.PlanApplyStageTracker(
environment="dev",
plan_options=models.PlanOptions(skip_tests=False, auto_apply=False),
)
# Add creation stage
tracker.add_stage(
models.PlanStage.creation,
models.PlanStageCreation(total_tasks=10, num_tasks=0),
)
# Update progress
tracker.creation.update({"num_tasks": 5})
# Stop stage
tracker.creation.stop(success=True)
# Serialize for SSE
event_data = tracker.dict(exclude_none=True)
Building Models Diff
from web.server import models
from sqlmesh.core.context import Context
context = Context()
plan = context.plan("dev")
# Get modified snapshots with parent relationships
diff = models.ModelsDiff.get_modified_snapshots(context, plan)
# Access changes
for change in diff.direct:
print(f"Direct change: {change.name}")
print(f"Diff: {change.diff}")
print(f"Affected children: {[c.name for c in change.indirect]}")
for change in diff.indirect:
print(f"Indirect change: {change.name}")
Schema Diff Validation
from web.server import models
from sqlglot import exp
# Schema diff with DataType objects
schema_diff = models.SchemaDiff(
source="table_a",
target="table_b",
source_schema={"id": exp.DataType.build("INT"), "name": exp.DataType.build("VARCHAR")},
target_schema={"id": exp.DataType.build("BIGINT"), "name": exp.DataType.build("VARCHAR")},
added={},
removed={},
modified={"id": (exp.DataType.build("INT"), exp.DataType.build("BIGINT"))},
)
# Automatically converts to strings
assert schema_diff.source_schema["id"] == "INT"
assert schema_diff.modified["id"] == "INT → BIGINT"
Test Result Reporting
from web.server import models
test_result = models.TestResult(
tests_run=10,
failures=[
models.TestErrorOrFailure(
name="test_model_a",
path="tests/test_model_a.yaml",
tb="AssertionError: Expected 5 rows, got 3",
)
],
errors=[],
skipped=[
models.TestSkipped(
name="test_model_b",
path="tests/test_model_b.yaml",
reason="Missing test data",
)
],
successes=[
models.TestCase(name=f"test_{i}", path=f"tests/test_{i}.yaml")
for i in range(8)
],
)
# Serialize for API response
result_dict = test_result.dict()
Constants
SUPPORTED_EXTENSIONS = {".py", ".sql", ".yaml", ".yml", ".csv"}