Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Principle:TobikoData Sqlmesh Cross Environment Data Comparison

From Leeroopedia
Revision as of 17:44, 16 February 2026 by Admin (talk | contribs) (Auto-imported from principles/TobikoData_Sqlmesh_Cross_Environment_Data_Comparison.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)


Knowledge Sources
Domains Data_Engineering, Environment_Management
Last Updated 2026-02-07 00:00 GMT

Overview

Comparing data between environments at both schema and row levels to validate model changes before promotion.

Description

Cross-environment data comparison is a validation mechanism that identifies differences between the same logical models across different environments. This operates at multiple levels: schema structure (column names, types, ordering), aggregate statistics (row counts, null counts, distinct values), and row-level data (specific rows that differ). The comparison accounts for the grain (primary key) of each model to provide meaningful row-level diffs.

This solves a critical problem in data transformation development: ensuring that model changes produce expected results before deploying to production. Traditional approaches require manually querying tables and comparing results, which is error-prone and doesn't scale. Automated comparison enables:

  • Catching unintended data changes before they affect downstream consumers
  • Validating that business logic changes produce expected results
  • Identifying schema drift or breaking changes
  • Building confidence in changes through concrete evidence
  • Supporting test-driven development workflows for data transformations

The comparison is bidirectional and can identify rows that exist only in source, only in target, or that differ in their column values, providing complete visibility into data divergence.

Usage

Use cross-environment data comparison when:

  • Validating changes in a development or PR environment before merging to production
  • Regression testing to ensure refactoring doesn't change results
  • Debugging unexpected behavior by comparing working and broken environments
  • Verifying forward-only changes produce identical results in preview mode
  • Conducting exploratory analysis to understand impact of model modifications
  • Building automated data quality checks in CI/CD pipelines
  • Documenting expected changes as part of code review process
  • Troubleshooting discrepancies reported by downstream data consumers
  • Comparing staging environment to production to verify deployment correctness

Theoretical Basis

The cross-environment comparison algorithm operates as follows:

Table Resolution:

FUNCTION resolve_tables_for_comparison(source_env, target_env, model_selection):
    IF model_selection provided:
        # Environment-to-environment comparison for specific models
        source_snapshots = GET snapshots for model_selection IN source_env
        target_snapshots = GET snapshots for model_selection IN target_env

        table_pairs = []
        FOR each model IN model_selection:
            source_table = source_snapshots[model].physical_table
            target_table = target_snapshots[model].physical_table
            ADD (source_table, target_table, model) to table_pairs
    ELSE:
        # Direct table-to-table comparison
        table_pairs = [(source_env, target_env, "direct")]

    RETURN table_pairs

Schema Comparison:

FUNCTION compare_schemas(source_table, target_table):
    source_schema = GET column definitions FROM source_table
    target_schema = GET column definitions FROM target_table

    differences = {
        added_columns: [],
        removed_columns: [],
        type_changes: []
    }

    FOR each column IN source_schema:
        IF column NOT IN target_schema:
            ADD column to differences.removed_columns
        ELSE IF source_schema[column].type != target_schema[column].type:
            ADD (column, source_type, target_type) to differences.type_changes

    FOR each column IN target_schema:
        IF column NOT IN source_schema:
            ADD column to differences.added_columns

    RETURN differences

Row-Level Comparison:

FUNCTION compare_rows(source_table, target_table, grain_columns, skip_columns, decimals):
    # Standardize column names and types
    common_columns = INTERSECT(
        source_table.columns,
        target_table.columns
    ) MINUS skip_columns

    # Round floating point columns to specified decimals
    FOR each column IN common_columns:
        IF column.type IS FLOAT:
            ROUND(column, decimals)

    # Compute row-level differences using grain
    IF grain_columns NOT provided:
        grain_columns = INFER from model metadata

    diff_query = """
        WITH source_data AS (
            SELECT {grain_columns}, {common_columns}
            FROM {source_table}
        ),
        target_data AS (
            SELECT {grain_columns}, {common_columns}
            FROM {target_table}
        ),
        joined AS (
            SELECT
                s.*,
                t.*,
                CASE
                    WHEN s.{grain} IS NULL THEN 'target_only'
                    WHEN t.{grain} IS NULL THEN 'source_only'
                    ELSE 'both'
                END as presence
            FROM source_data s
            FULL OUTER JOIN target_data t
                ON {grain_join_condition}
        )
        SELECT *
        FROM joined
        WHERE presence != 'both'
           OR {any_column_differs}
        LIMIT {limit}
    """

    differences = EXECUTE diff_query

    RETURN differences

Aggregate Statistics:

FUNCTION compute_statistics(source_table, target_table, columns):
    stats = {}

    FOR each column IN columns:
        source_stats = QUERY:
            COUNT(*) as row_count,
            COUNT(column) as non_null_count,
            COUNT(DISTINCT column) as distinct_count,
            MIN(column) as min_value,
            MAX(column) as max_value
        FROM source_table

        target_stats = QUERY: [same as above] FROM target_table

        stats[column] = {
            source: source_stats,
            target: target_stats,
            row_count_diff: target_stats.row_count - source_stats.row_count,
            null_count_diff: (target_stats.row_count - target_stats.non_null_count) -
                           (source_stats.row_count - source_stats.non_null_count)
        }

    RETURN stats

Complete Diff Result:

FUNCTION table_diff(source, target, options):
    schema_diff = compare_schemas(source, target)

    IF schema_diff has breaking changes:
        RETURN {schema_diff: schema_diff, row_diff: null}

    row_diff = compare_rows(
        source,
        target,
        options.grain,
        options.skip_columns,
        options.decimals
    )

    statistics = compute_statistics(source, target, common_columns)

    RETURN {
        schema_diff: schema_diff,
        row_diff: row_diff,
        statistics: statistics,
        is_equal: schema_diff.is_empty AND row_diff.is_empty
    }

The comparison is optimized to minimize data scanning through limit clauses and can operate on very large tables by focusing on grain-based joins and aggregate statistics rather than full table scans.

Related Pages

Implemented By

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment