Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:TobikoData Sqlmesh ModelConfig To Sqlmesh

From Leeroopedia


Knowledge Sources
Domains Data_Engineering, Dbt_Migration
Last Updated 2026-02-07 00:00 GMT

Overview

Concrete tool for converting dbt model configurations into native SQLMesh model objects with complete feature mapping provided by SQLMesh.

Description

The ModelConfig.to_sqlmesh method performs the comprehensive transformation of a dbt model configuration into a fully functional SQLMesh model. This method handles the complexities of mapping dbt's materialization strategies to SQLMesh model kinds, converting partition and clustering configurations, translating physical properties, and integrating audit definitions. It ensures that the resulting SQLMesh model preserves the semantics of the original dbt model while adapting to SQLMesh's execution framework.

The conversion process is database-aware, handling engine-specific features like BigQuery partition expiration, Snowflake dynamic tables, and ClickHouse table engines. It validates configurations against the target dialect, warns about unsupported features, and raises errors for invalid settings.

Usage

Use ModelConfig.to_sqlmesh when converting individual dbt models to SQLMesh format during project loading. This method is typically called by DbtLoader for each discovered model, but can also be used directly for custom conversion workflows or testing. The resulting SQLMesh model can be executed through SQLMesh's plan/apply system.

Code Reference

Source Location

  • Repository: sqlmesh
  • File: sqlmesh/dbt/model.py

Signature

class ModelConfig:
    def to_sqlmesh(
        self,
        context: DbtContext,
        audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
        virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
    ) -> Model:
        ...

Import

from sqlmesh.dbt.model import ModelConfig
from sqlmesh.dbt.context import DbtContext
from sqlmesh.core.model import Model

I/O Contract

Inputs

Name Type Required Description
self ModelConfig Yes The dbt model configuration to convert
context DbtContext Yes DbtContext with project configuration, variables, and macro registry
audit_definitions Dict[str, ModelAudit] No Audit definitions to attach to model (converted from dbt tests)
virtual_environment_mode VirtualEnvironmentMode No Virtual environment mode configuration; affects model execution

Outputs

Name Type Description
model Model Fully converted SQLMesh model ready for execution

Usage Examples

Basic Model Conversion

from pathlib import Path
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.model import ModelConfig

# Create DbtContext
context = DbtContext(
    project_root=Path("/dbt/project"),
    profile_name="my_profile",
    target_name="dev"
)
context.project_name = "my_project"

# Example dbt model configuration
model_config = ModelConfig(
    name="customers",
    sql="SELECT * FROM {{ ref('stg_customers') }}",
    model_materialization="table",
    path=Path("/dbt/project/models/customers.sql"),
    alias=None,
    schema="analytics"
)

# Convert to SQLMesh model
sqlmesh_model = model_config.to_sqlmesh(context)

print(f"Model name: {sqlmesh_model.name}")
print(f"Model kind: {sqlmesh_model.kind.name}")
print(f"Model dialect: {sqlmesh_model.dialect}")

Incremental Model Conversion

from pathlib import Path
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.model import ModelConfig, Materialization

context = DbtContext(project_root=Path("/dbt/project"))
context.project_name = "analytics"

# dbt incremental model with merge strategy
model_config = ModelConfig(
    name="orders_incremental",
    sql="""
        SELECT
            order_id,
            customer_id,
            order_date,
            total_amount
        FROM {{ source('raw', 'orders') }}
        {% if is_incremental() %}
        WHERE order_date > (SELECT MAX(order_date) FROM {{ this }})
        {% endif %}
    """,
    model_materialization=Materialization.INCREMENTAL,
    incremental_strategy="merge",
    unique_key=["order_id"],
    partition_by=["DATE(order_date)"],
    path=Path("/dbt/project/models/orders_incremental.sql")
)

# Convert to SQLMesh incremental model
sqlmesh_model = model_config.to_sqlmesh(context)

print(f"Model kind: {sqlmesh_model.kind.name}")  # INCREMENTAL_BY_TIME_RANGE
print(f"Partitioned by: {sqlmesh_model.partitioned_by}")
print(f"Time column: {sqlmesh_model.time_column}")

Model with Audits

from pathlib import Path
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.model import ModelConfig
from sqlmesh.core.audit import ModelAudit

context = DbtContext(project_root=Path("/dbt/project"))
context.project_name = "analytics"

# Define audits (converted from dbt tests)
audits = {
    "assert_customer_id_not_null": ModelAudit(
        name="assert_customer_id_not_null",
        query="SELECT * FROM @this_model WHERE customer_id IS NULL"
    ),
    "assert_customer_id_unique": ModelAudit(
        name="assert_customer_id_unique",
        query="SELECT customer_id, COUNT(*) as cnt FROM @this_model GROUP BY customer_id HAVING cnt > 1"
    )
}

# Model configuration
model_config = ModelConfig(
    name="customers",
    sql="SELECT * FROM {{ ref('stg_customers') }}",
    model_materialization="table",
    path=Path("/dbt/project/models/customers.sql")
)

# Convert with audits
sqlmesh_model = model_config.to_sqlmesh(
    context,
    audit_definitions=audits
)

print(f"Model audits: {sqlmesh_model.audits}")

BigQuery-Specific Features

from pathlib import Path
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.model import ModelConfig
from sqlmesh.dbt.target import TargetConfig

# BigQuery target
target = TargetConfig(
    name="prod",
    dialect="bigquery",
    type="bigquery",
    project="my-gcp-project",
    dataset="analytics"
)

context = DbtContext(project_root=Path("/dbt/project"))
context.project_name = "analytics"
context.target = target

# Model with BigQuery-specific partition config
model_config = ModelConfig(
    name="events_partitioned",
    sql="SELECT * FROM {{ source('raw', 'events') }}",
    model_materialization="table",
    partition_by={
        "field": "event_timestamp",
        "data_type": "timestamp",
        "granularity": "day"
    },
    partition_expiration_days=90,
    require_partition_filter=True,
    cluster_by=["user_id", "event_type"],
    path=Path("/dbt/project/models/events.sql")
)

# Convert with BigQuery features
sqlmesh_model = model_config.to_sqlmesh(context)

print(f"Partitioned by: {sqlmesh_model.partitioned_by}")
print(f"Clustered by: {sqlmesh_model.clustered_by}")
print(f"Physical properties: {sqlmesh_model.physical_properties}")
# Output includes partition_expiration_days and require_partition_filter

Snowflake Dynamic Table

from pathlib import Path
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.model import ModelConfig, Materialization
from sqlmesh.dbt.target import TargetConfig

# Snowflake target
target = TargetConfig(
    name="prod",
    dialect="snowflake",
    type="snowflake",
    account="my_account",
    warehouse="compute_wh",
    database="analytics",
    schema="public"
)

context = DbtContext(project_root=Path("/dbt/project"))
context.target = target

# Snowflake dynamic table configuration
model_config = ModelConfig(
    name="real_time_metrics",
    sql="SELECT * FROM {{ ref('events') }}",
    model_materialization=Materialization.DYNAMIC_TABLE,
    snowflake_warehouse="compute_wh",
    target_lag="1 minute",
    path=Path("/dbt/project/models/metrics.sql")
)

# Convert to SQLMesh model with Snowflake dynamic table properties
sqlmesh_model = model_config.to_sqlmesh(context)

print(f"Physical properties: {sqlmesh_model.physical_properties}")
# Includes warehouse and target_lag for dynamic table

Error Handling

from pathlib import Path
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.model import ModelConfig
from sqlmesh.core.config import ConfigError

context = DbtContext(project_root=Path("/dbt/project"))
context.project_name = "analytics"

# Invalid configuration
model_config = ModelConfig(
    name="invalid_model",
    sql="SELECT * FROM source_table",
    model_materialization="view",
    partition_by=["date_col"],  # Views cannot be partitioned
    path=Path("/dbt/project/models/invalid.sql")
)

try:
    sqlmesh_model = model_config.to_sqlmesh(context)
except ConfigError as e:
    print(f"Configuration error: {e}")
    # Error explains why partition_by is invalid for views

# Model with warnings (logged but not fatal)
model_config = ModelConfig(
    name="warning_model",
    sql="SELECT * FROM source_table",
    model_materialization="table",
    query_settings={"some_setting": "value"},  # Unsupported for some engines
    path=Path("/dbt/project/models/warning.sql")
)

sqlmesh_model = model_config.to_sqlmesh(context)
# Conversion succeeds, but warnings are logged about unsupported features

Related Pages

Implements Principle

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment