Principle:TobikoData Sqlmesh Cross Environment Data Comparison
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Environment_Management |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Cross-environment data comparison checks the same models in two environments at both the schema and row levels, so that model changes can be validated before promotion.
Description
Cross-environment data comparison is a validation mechanism that identifies differences between the same logical models across different environments. It operates at three levels: schema structure (column names, types, ordering), aggregate statistics (row counts, null counts, distinct values), and row-level data (specific rows that differ). The comparison uses the grain (primary key) of each model to match rows across environments, which is what makes the row-level diffs meaningful.
This solves a critical problem in data transformation development: ensuring that model changes produce expected results before deploying to production. Traditional approaches require manually querying tables and comparing results, which is error-prone and doesn't scale. Automated comparison enables:
- Catching unintended data changes before they affect downstream consumers
- Validating that business logic changes produce expected results
- Identifying schema drift or breaking changes
- Building confidence in changes through concrete evidence
- Supporting test-driven development workflows for data transformations
The comparison is bidirectional and can identify rows that exist only in source, only in target, or that differ in their column values, providing complete visibility into data divergence.
Usage
Use cross-environment data comparison when:
- Validating changes in a development or PR environment before merging to production
- Regression testing to ensure refactoring doesn't change results
- Debugging unexpected behavior by comparing working and broken environments
- Verifying forward-only changes produce identical results in preview mode
- Conducting exploratory analysis to understand impact of model modifications
- Building automated data quality checks in CI/CD pipelines
- Documenting expected changes as part of code review process
- Troubleshooting discrepancies reported by downstream data consumers
- Comparing staging environment to production to verify deployment correctness
Theoretical Basis
The cross-environment comparison algorithm operates as follows:
Table Resolution:
FUNCTION resolve_tables_for_comparison(source_env, target_env, model_selection):
    IF model_selection provided:
        # Environment-to-environment comparison for specific models
        source_snapshots = GET snapshots for model_selection IN source_env
        target_snapshots = GET snapshots for model_selection IN target_env
        table_pairs = []
        FOR each model IN model_selection:
            source_table = source_snapshots[model].physical_table
            target_table = target_snapshots[model].physical_table
            ADD (source_table, target_table, model) to table_pairs
    ELSE:
        # Direct table-to-table comparison
        table_pairs = [(source_env, target_env, "direct")]
    RETURN table_pairs
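The resolution step above can be sketched in Python. This is a minimal illustration, not SQLMesh's implementation: the `ENVIRONMENTS` registry, the physical table names, and the function name `resolve_table_pairs` are all hypothetical stand-ins for the snapshot lookup.

```python
from typing import Optional

# Hypothetical registry: environment name -> {model name -> physical table}.
# The real lookup goes through snapshots; plain dicts stand in for it here.
ENVIRONMENTS = {
    "prod": {"sales.orders": "sqlmesh__sales.orders__1111"},
    "dev": {"sales.orders": "sqlmesh__sales.orders__2222"},
}


def resolve_table_pairs(source: str, target: str, models: Optional[list] = None):
    """Return (source_table, target_table, label) tuples to compare."""
    if not models:
        # No model selection: treat both arguments as table names.
        return [(source, target, "direct")]
    pairs = []
    for model in models:
        # With a model selection, the arguments are environment names.
        pairs.append((ENVIRONMENTS[source][model], ENVIRONMENTS[target][model], model))
    return pairs


env_pairs = resolve_table_pairs("prod", "dev", ["sales.orders"])
direct_pairs = resolve_table_pairs("db.orders_v1", "db.orders_v2")
```

The same two positional arguments mean "environment" in one branch and "table" in the other, which mirrors the pseudocode; a stricter design might use separate parameters for the two modes.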
Schema Comparison:
FUNCTION compare_schemas(source_table, target_table):
    source_schema = GET column definitions FROM source_table
    target_schema = GET column definitions FROM target_table
    differences = {
        added_columns: [],
        removed_columns: [],
        type_changes: []
    }
    FOR each column IN source_schema:
        IF column NOT IN target_schema:
            ADD column to differences.removed_columns
        ELSE IF source_schema[column].type != target_schema[column].type:
            ADD (column, source_type, target_type) to differences.type_changes
    FOR each column IN target_schema:
        IF column NOT IN source_schema:
            ADD column to differences.added_columns
    RETURN differences
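As a concrete illustration of the schema comparison, the sketch below diffs two column-to-type mappings. It assumes schemas are available as plain `{column name: type string}` dicts; the sample column names and types are invented for the example.

```python
def compare_schemas(source: dict, target: dict) -> dict:
    """Diff two {column name: type} mappings, using source as the baseline."""
    removed = [c for c in source if c not in target]
    added = [c for c in target if c not in source]
    type_changes = [
        (c, source[c], target[c])
        for c in source
        if c in target and source[c] != target[c]
    ]
    return {
        "added_columns": added,
        "removed_columns": removed,
        "type_changes": type_changes,
    }


# Invented sample schemas for illustration only.
prod_schema = {"id": "INT", "amount": "DOUBLE", "legacy_flag": "BOOLEAN"}
dev_schema = {"id": "BIGINT", "amount": "DOUBLE", "region": "TEXT"}
schema_diff = compare_schemas(prod_schema, dev_schema)
```

Here `region` is reported as added, `legacy_flag` as removed, and `id` as a type change from `INT` to `BIGINT`.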
Row-Level Comparison:
FUNCTION compare_rows(source_table, target_table, grain_columns, skip_columns, decimals):
    # Resolve the grain first; it drives both the join and the column split
    IF grain_columns NOT provided:
        grain_columns = INFER from model metadata
    # Compare only columns present on both sides, minus skipped ones;
    # grain columns are selected separately so they are not duplicated
    common_columns = INTERSECT(
        source_table.columns,
        target_table.columns
    ) MINUS skip_columns MINUS grain_columns
    # Round floating point columns to specified decimals
    FOR each column IN common_columns:
        IF column.type IS FLOAT:
            column = ROUND(column, decimals)
    # Compute row-level differences using grain
    diff_query = """
        WITH source_data AS (
            SELECT {grain_columns}, {common_columns}
            FROM {source_table}
        ),
        target_data AS (
            SELECT {grain_columns}, {common_columns}
            FROM {target_table}
        ),
        joined AS (
            SELECT
                s.*,
                t.*,
                CASE
                    WHEN s.{grain} IS NULL THEN 'target_only'
                    WHEN t.{grain} IS NULL THEN 'source_only'
                    ELSE 'both'
                END as presence
            FROM source_data s
            FULL OUTER JOIN target_data t
                ON {grain_join_condition}
        )
        SELECT *
        FROM joined
        WHERE presence != 'both'
            OR {any_column_differs}
        LIMIT {limit}
    """
    differences = EXECUTE diff_query
    RETURN differences
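The classification that the full outer join performs can be illustrated without a database. The sketch below is an in-memory analogue, not the SQL path itself: it assumes rows are dicts, that the grain is unique and non-null on both sides, and that grain values are sortable. The status label `modified` and the sample data are invented for the example.

```python
def compare_rows(source_rows, target_rows, grain, skip_columns=(), decimals=3, limit=20):
    """Classify rows by grain as source_only, target_only, or modified."""

    def normalize(row):
        # Drop skipped columns and round floats so tiny float noise is ignored.
        return {
            k: round(v, decimals) if isinstance(v, float) else v
            for k, v in row.items()
            if k not in skip_columns
        }

    def index(rows):
        # Key each row by its grain values; assumes the grain is unique.
        return {tuple(r[g] for g in grain): normalize(r) for r in rows}

    src, tgt = index(source_rows), index(target_rows)
    diffs = []
    for key in sorted(src.keys() | tgt.keys()):
        if key not in tgt:
            diffs.append((key, "source_only", {}))
        elif key not in src:
            diffs.append((key, "target_only", {}))
        else:
            common = sorted(src[key].keys() & tgt[key].keys())
            changed = {
                c: (src[key][c], tgt[key][c])
                for c in common
                if src[key][c] != tgt[key][c]
            }
            if changed:
                diffs.append((key, "modified", changed))
    return diffs[:limit]


# Invented sample rows for illustration only.
prod_rows = [
    {"id": 1, "amount": 10.0001, "updated_at": "2024-01-01"},
    {"id": 2, "amount": 5.0, "updated_at": "2024-01-01"},
    {"id": 3, "amount": 7.5, "updated_at": "2024-01-01"},
]
dev_rows = [
    {"id": 1, "amount": 10.0002, "updated_at": "2024-02-01"},
    {"id": 2, "amount": 6.0, "updated_at": "2024-02-01"},
    {"id": 4, "amount": 1.0, "updated_at": "2024-02-01"},
]
row_diff = compare_rows(prod_rows, dev_rows, grain=["id"], skip_columns={"updated_at"})
```

Row 1 drops out because both amounts round to the same value at three decimals and `updated_at` is skipped; row 2 is reported as modified, row 3 as source-only, and row 4 as target-only.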
Aggregate Statistics:
FUNCTION compute_statistics(source_table, target_table, columns):
    stats = {}
    FOR each column IN columns:
        source_stats = QUERY:
            COUNT(*) as row_count,
            COUNT(column) as non_null_count,
            COUNT(DISTINCT column) as distinct_count,
            MIN(column) as min_value,
            MAX(column) as max_value
            FROM source_table
        target_stats = QUERY: [same as above] FROM target_table
        stats[column] = {
            source: source_stats,
            target: target_stats,
            row_count_diff: target_stats.row_count - source_stats.row_count,
            null_count_diff: (target_stats.row_count - target_stats.non_null_count) -
                             (source_stats.row_count - source_stats.non_null_count)
        }
    RETURN stats
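The aggregate step can be tried end to end with Python's built-in `sqlite3` module. This is a sketch under stated assumptions: the table names `prod_orders` and `dev_orders` and their contents are invented, and identifiers are interpolated into the SQL directly, which is only acceptable for trusted names.

```python
import sqlite3

STAT_NAMES = ("row_count", "non_null_count", "distinct_count", "min_value", "max_value")


def column_stats(conn, table, column):
    """Run the five aggregates for one column of one table."""
    # Identifiers are interpolated directly; only pass trusted names.
    row = conn.execute(
        f"SELECT COUNT(*), COUNT({column}), COUNT(DISTINCT {column}), "
        f"MIN({column}), MAX({column}) FROM {table}"
    ).fetchone()
    return dict(zip(STAT_NAMES, row))


def compute_statistics(conn, source_table, target_table, columns):
    stats = {}
    for column in columns:
        s = column_stats(conn, source_table, column)
        t = column_stats(conn, target_table, column)
        stats[column] = {
            "source": s,
            "target": t,
            "row_count_diff": t["row_count"] - s["row_count"],
            # Null count is derived as total rows minus non-null rows.
            "null_count_diff": (t["row_count"] - t["non_null_count"])
            - (s["row_count"] - s["non_null_count"]),
        }
    return stats


# Invented sample tables for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prod_orders (id INTEGER, amount REAL)")
conn.execute("CREATE TABLE dev_orders (id INTEGER, amount REAL)")
conn.executemany("INSERT INTO prod_orders VALUES (?, ?)", [(1, 10.0), (2, 5.0), (3, None)])
conn.executemany(
    "INSERT INTO dev_orders VALUES (?, ?)", [(1, 10.0), (2, None), (3, None), (4, 8.0)]
)
stats = compute_statistics(conn, "prod_orders", "dev_orders", ["amount"])
```

With this data the target has one more row and one more null in `amount` than the source, so both `row_count_diff` and `null_count_diff` are 1.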
Complete Diff Result:
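FUNCTION table_diff(source, target, options):
    schema_diff = compare_schemas(source, target)
    IF schema_diff has breaking changes:
        # Skip the row-level comparison and report the schema diff only
        RETURN {schema_diff: schema_diff, row_diff: null, statistics: null, is_equal: false}
    row_diff = compare_rows(
        source,
        target,
        options.grain,
        options.skip_columns,
        options.decimals
    )
    # Statistics cover the columns present on both sides, minus skipped ones
    common_columns = INTERSECT(source.columns, target.columns) MINUS options.skip_columns
    statistics = compute_statistics(source, target, common_columns)
    RETURN {
        schema_diff: schema_diff,
        row_diff: row_diff,
        statistics: statistics,
        is_equal: schema_diff.is_empty AND row_diff.is_empty
    }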
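The shape of the combined result and the early exit on breaking schema changes can be sketched as follows. The class names `SchemaDiff` and `TableDiffResult` are hypothetical, and the rule that removals and type changes count as breaking while additions do not is an assumption; the text above does not define "breaking".

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class SchemaDiff:
    added_columns: list = field(default_factory=list)
    removed_columns: list = field(default_factory=list)
    type_changes: list = field(default_factory=list)

    @property
    def is_empty(self) -> bool:
        return not (self.added_columns or self.removed_columns or self.type_changes)

    @property
    def is_breaking(self) -> bool:
        # Assumption: removals and type changes are breaking, additions are not.
        return bool(self.removed_columns or self.type_changes)


@dataclass
class TableDiffResult:
    schema_diff: SchemaDiff
    row_diff: Optional[list] = None  # None means the row comparison was skipped

    @property
    def is_equal(self) -> bool:
        return self.schema_diff.is_empty and self.row_diff == []


def table_diff(schema_diff: SchemaDiff, compute_row_diff: Callable[[], list]) -> TableDiffResult:
    """Skip the row comparison entirely when the schema diff is breaking."""
    if schema_diff.is_breaking:
        return TableDiffResult(schema_diff)
    return TableDiffResult(schema_diff, compute_row_diff())


identical = table_diff(SchemaDiff(), lambda: [])
additive = table_diff(SchemaDiff(added_columns=["region"]), lambda: [])
breaking = table_diff(SchemaDiff(type_changes=[("id", "INT", "BIGINT")]), lambda: [])
```

Passing the row comparison as a callable keeps the expensive query from running at all in the breaking case, and a `row_diff` of `None` lets callers tell "skipped" apart from "no differences found".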
The limit clause caps the number of differing rows returned, and the grain-based join and aggregate statistics summarize divergence inside the query engine, so very large tables can be compared without transferring full row sets to the client.