Implementation:TobikoData Sqlmesh ModelConfig To Sqlmesh
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Dbt_Migration |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Concrete tool for converting dbt model configurations into native SQLMesh model objects with complete feature mapping provided by SQLMesh.
Description
The ModelConfig.to_sqlmesh method performs the comprehensive transformation of a dbt model configuration into a fully functional SQLMesh model. This method handles the complexities of mapping dbt's materialization strategies to SQLMesh model kinds, converting partition and clustering configurations, translating physical properties, and integrating audit definitions. It ensures that the resulting SQLMesh model preserves the semantics of the original dbt model while adapting to SQLMesh's execution framework.
The conversion process is database-aware, handling engine-specific features like BigQuery partition expiration, Snowflake dynamic tables, and ClickHouse table engines. It validates configurations against the target dialect, warns about unsupported features, and raises errors for invalid settings.
Usage
Use ModelConfig.to_sqlmesh when converting individual dbt models to SQLMesh format during project loading. This method is typically called by DbtLoader for each discovered model, but can also be used directly for custom conversion workflows or testing. The resulting SQLMesh model can be executed through SQLMesh's plan/apply system.
Code Reference
Source Location
- Repository: sqlmesh
- File: sqlmesh/dbt/model.py
Signature
class ModelConfig:
def to_sqlmesh(
self,
context: DbtContext,
audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
) -> Model:
...
Import
from sqlmesh.dbt.model import ModelConfig
from sqlmesh.dbt.context import DbtContext
from sqlmesh.core.model import Model
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| self | ModelConfig | Yes | The dbt model configuration to convert |
| context | DbtContext | Yes | DbtContext with project configuration, variables, and macro registry |
| audit_definitions | Dict[str, ModelAudit] | No | Audit definitions to attach to model (converted from dbt tests) |
| virtual_environment_mode | VirtualEnvironmentMode | No | Virtual environment mode configuration; affects model execution |
Outputs
| Name | Type | Description |
|---|---|---|
| model | Model | Fully converted SQLMesh model ready for execution |
Usage Examples
Basic Model Conversion
from pathlib import Path
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.model import ModelConfig
# Create DbtContext
context = DbtContext(
project_root=Path("/dbt/project"),
profile_name="my_profile",
target_name="dev"
)
context.project_name = "my_project"
# Example dbt model configuration
model_config = ModelConfig(
name="customers",
sql="SELECT * FROM {{ ref('stg_customers') }}",
model_materialization="table",
path=Path("/dbt/project/models/customers.sql"),
alias=None,
schema="analytics"
)
# Convert to SQLMesh model
sqlmesh_model = model_config.to_sqlmesh(context)
print(f"Model name: {sqlmesh_model.name}")
print(f"Model kind: {sqlmesh_model.kind.name}")
print(f"Model dialect: {sqlmesh_model.dialect}")
Incremental Model Conversion
from pathlib import Path
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.model import ModelConfig, Materialization
context = DbtContext(project_root=Path("/dbt/project"))
context.project_name = "analytics"
# dbt incremental model with merge strategy
model_config = ModelConfig(
name="orders_incremental",
sql="""
SELECT
order_id,
customer_id,
order_date,
total_amount
FROM {{ source('raw', 'orders') }}
{% if is_incremental() %}
WHERE order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
""",
model_materialization=Materialization.INCREMENTAL,
incremental_strategy="merge",
unique_key=["order_id"],
partition_by=["DATE(order_date)"],
path=Path("/dbt/project/models/orders_incremental.sql")
)
# Convert to SQLMesh incremental model
sqlmesh_model = model_config.to_sqlmesh(context)
print(f"Model kind: {sqlmesh_model.kind.name}") # INCREMENTAL_BY_TIME_RANGE
print(f"Partitioned by: {sqlmesh_model.partitioned_by}")
print(f"Time column: {sqlmesh_model.time_column}")
Model with Audits
from pathlib import Path
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.model import ModelConfig
from sqlmesh.core.audit import ModelAudit
context = DbtContext(project_root=Path("/dbt/project"))
context.project_name = "analytics"
# Define audits (converted from dbt tests)
audits = {
"assert_customer_id_not_null": ModelAudit(
name="assert_customer_id_not_null",
query="SELECT * FROM @this_model WHERE customer_id IS NULL"
),
"assert_customer_id_unique": ModelAudit(
name="assert_customer_id_unique",
query="SELECT customer_id, COUNT(*) as cnt FROM @this_model GROUP BY customer_id HAVING cnt > 1"
)
}
# Model configuration
model_config = ModelConfig(
name="customers",
sql="SELECT * FROM {{ ref('stg_customers') }}",
model_materialization="table",
path=Path("/dbt/project/models/customers.sql")
)
# Convert with audits
sqlmesh_model = model_config.to_sqlmesh(
context,
audit_definitions=audits
)
print(f"Model audits: {sqlmesh_model.audits}")
BigQuery-Specific Features
from pathlib import Path
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.model import ModelConfig
from sqlmesh.dbt.target import TargetConfig
# BigQuery target
target = TargetConfig(
name="prod",
dialect="bigquery",
type="bigquery",
project="my-gcp-project",
dataset="analytics"
)
context = DbtContext(project_root=Path("/dbt/project"))
context.project_name = "analytics"
context.target = target
# Model with BigQuery-specific partition config
model_config = ModelConfig(
name="events_partitioned",
sql="SELECT * FROM {{ source('raw', 'events') }}",
model_materialization="table",
partition_by={
"field": "event_timestamp",
"data_type": "timestamp",
"granularity": "day"
},
partition_expiration_days=90,
require_partition_filter=True,
cluster_by=["user_id", "event_type"],
path=Path("/dbt/project/models/events.sql")
)
# Convert with BigQuery features
sqlmesh_model = model_config.to_sqlmesh(context)
print(f"Partitioned by: {sqlmesh_model.partitioned_by}")
print(f"Clustered by: {sqlmesh_model.clustered_by}")
print(f"Physical properties: {sqlmesh_model.physical_properties}")
# Output includes partition_expiration_days and require_partition_filter
Snowflake Dynamic Table
from pathlib import Path
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.model import ModelConfig, Materialization
from sqlmesh.dbt.target import TargetConfig
# Snowflake target
target = TargetConfig(
name="prod",
dialect="snowflake",
type="snowflake",
account="my_account",
warehouse="compute_wh",
database="analytics",
schema="public"
)
context = DbtContext(project_root=Path("/dbt/project"))
context.target = target
# Snowflake dynamic table configuration
model_config = ModelConfig(
name="real_time_metrics",
sql="SELECT * FROM {{ ref('events') }}",
model_materialization=Materialization.DYNAMIC_TABLE,
snowflake_warehouse="compute_wh",
target_lag="1 minute",
path=Path("/dbt/project/models/metrics.sql")
)
# Convert to SQLMesh model with Snowflake dynamic table properties
sqlmesh_model = model_config.to_sqlmesh(context)
print(f"Physical properties: {sqlmesh_model.physical_properties}")
# Includes warehouse and target_lag for dynamic table
Error Handling
from pathlib import Path
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.model import ModelConfig
from sqlmesh.core.config import ConfigError
context = DbtContext(project_root=Path("/dbt/project"))
context.project_name = "analytics"
# Invalid configuration
model_config = ModelConfig(
name="invalid_model",
sql="SELECT * FROM source_table",
model_materialization="view",
partition_by=["date_col"], # Views cannot be partitioned
path=Path("/dbt/project/models/invalid.sql")
)
try:
sqlmesh_model = model_config.to_sqlmesh(context)
except ConfigError as e:
print(f"Configuration error: {e}")
# Error explains why partition_by is invalid for views
# Model with warnings (logged but not fatal)
model_config = ModelConfig(
name="warning_model",
sql="SELECT * FROM source_table",
model_materialization="table",
query_settings={"some_setting": "value"}, # Unsupported for some engines
path=Path("/dbt/project/models/warning.sql")
)
sqlmesh_model = model_config.to_sqlmesh(context)
# Conversion succeeds, but warnings are logged about unsupported features