Implementation:TobikoData Sqlmesh PlanBuilder Forward Only


Knowledge Sources
Domains Data_Engineering, Incremental_Processing
Last Updated 2026-02-07 00:00 GMT

Overview

Concrete tool provided by SQLMesh for constructing execution plans that apply incremental model changes only to future intervals.

Description

PlanBuilder is the core orchestration class responsible for constructing deployment plans that determine which models to process, which intervals to execute, and how to handle schema and logic changes. When configured with forward_only mode, it enforces policies that prevent historical data reprocessing while allowing evolution of data pipeline logic for future intervals.

The builder compares the current environment state with the proposed model definitions, classifies each change as breaking (destructive), additive, or non-breaking, and validates that forward-only constraints are satisfied. The allow_destructive_models and allow_additive_models parameters list the models that are explicitly permitted to undergo destructive or additive schema changes, respectively.
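The classification and allow-list check described above can be pictured with a small standalone sketch. It does not use SQLMesh and is not its implementation: the SchemaChange enum, classify, and find_violations are hypothetical names, and the policy (every schema change is rejected unless the model is on the matching allow list) is an assumption made for illustration.

```python
from enum import Enum
from typing import Dict, Iterable, List, Set


class SchemaChange(Enum):
    """Hypothetical labels for the kinds of schema change described above."""
    NONE = "none"
    ADDITIVE = "additive"        # e.g. a column was added
    DESTRUCTIVE = "destructive"  # e.g. a column was dropped


def classify(old_columns: Set[str], new_columns: Set[str]) -> SchemaChange:
    """Classify a change between two column sets; any removal is destructive."""
    if old_columns - new_columns:
        return SchemaChange.DESTRUCTIVE
    if new_columns - old_columns:
        return SchemaChange.ADDITIVE
    return SchemaChange.NONE


def find_violations(
    changes: Dict[str, SchemaChange],
    allow_destructive: Iterable[str] = (),
    allow_additive: Iterable[str] = (),
) -> List[str]:
    """Return, in sorted order, the models whose change is not allow-listed."""
    destructive_ok = set(allow_destructive)
    additive_ok = set(allow_additive)
    violations = []
    for model, change in sorted(changes.items()):
        if change is SchemaChange.DESTRUCTIVE and model not in destructive_ok:
            violations.append(model)
        elif change is SchemaChange.ADDITIVE and model not in additive_ok:
            violations.append(model)
    return violations
```

In this sketch a model with no schema change never produces a violation, and adding a model name to the matching allow list removes it from the result.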

The effective_from parameter establishes a temporal boundary, ensuring that intervals before this date continue using previous model versions while new logic applies only to subsequent intervals. This enables gradual migration strategies and supports scenarios with strict historical data preservation requirements.
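As a rough illustration of this temporal boundary, the sketch below labels daily intervals with the model version that would produce them. It is plain Python rather than SQLMesh code: assign_versions and the "previous"/"new" labels are hypothetical, and the boundary is assumed to be inclusive, so the new logic applies on the effective_from date itself.

```python
from datetime import date, timedelta
from typing import List, Tuple


def assign_versions(
    start: date, end: date, effective_from: date
) -> List[Tuple[date, str]]:
    """Label each daily interval in [start, end] with the version that applies."""
    labelled = []
    day = start
    while day <= end:
        # Intervals strictly before the boundary keep the previous version.
        version = "new" if day >= effective_from else "previous"
        labelled.append((day, version))
        day += timedelta(days=1)
    return labelled
```

With start 2024-01-30, end 2024-02-02, and effective_from 2024-02-01, the first two days are labelled "previous" and the last two "new".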

Usage

Use PlanBuilder with forward_only=True when deploying incremental model changes to production environments where historical data must remain unchanged. Configure allow lists to explicitly permit specific models to have schema changes when necessary. Set effective_from to control the exact date when new logic begins applying.
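One way to keep these settings consistent across deployments is to assemble the keyword arguments in a single place. The helper below is a hypothetical convenience, not part of SQLMesh; it only builds a dictionary using the parameter names documented on this page.

```python
from typing import Any, Dict, Iterable, Optional


def forward_only_plan_kwargs(
    environment: str,
    effective_from: str,
    allow_additive_models: Optional[Iterable[str]] = None,
    allow_destructive_models: Optional[Iterable[str]] = None,
) -> Dict[str, Any]:
    """Build keyword arguments for a forward-only plan (hypothetical helper)."""
    kwargs: Dict[str, Any] = {
        "environment": environment,
        "forward_only": True,
        "effective_from": effective_from,
    }
    # Only include an allow list when it names at least one model.
    if allow_additive_models:
        kwargs["allow_additive_models"] = sorted(set(allow_additive_models))
    if allow_destructive_models:
        kwargs["allow_destructive_models"] = sorted(set(allow_destructive_models))
    return kwargs
```

The result could then be passed along as context.plan(**forward_only_plan_kwargs('prod', '2024-02-01')), assuming a Context configured as in the usage examples.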

Code Reference

Source Location

  • Repository: sqlmesh
  • File: sqlmesh/core/plan/builder.py:L55-137

Signature

class PlanBuilder:
    def __init__(
        self,
        context_diff: ContextDiff,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        apply: t.Optional[t.Callable[[Plan], None]] = None,
        restate_models: t.Optional[t.Iterable[str]] = None,
        restate_all_snapshots: bool = False,
        backfill_models: t.Optional[t.Iterable[str]] = None,
        no_gaps: bool = False,
        skip_backfill: bool = False,
        empty_backfill: bool = False,
        is_dev: bool = False,
        forward_only: bool = False,
        allow_destructive_models: t.Optional[t.Iterable[str]] = None,
        allow_additive_models: t.Optional[t.Iterable[str]] = None,
        environment_ttl: t.Optional[str] = None,
        environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.default,
        environment_catalog_mapping: t.Optional[t.Dict[re.Pattern, str]] = None,
        categorizer_config: t.Optional[CategorizerConfig] = None,
        auto_categorization_enabled: bool = True,
        effective_from: t.Optional[TimeLike] = None,
        include_unmodified: bool = False,
        default_start: t.Optional[TimeLike] = None,
        default_end: t.Optional[TimeLike] = None,
        enable_preview: bool = False,
        end_bounded: bool = False,
        ensure_finalized_snapshots: bool = False,
        explain: bool = False,
        ignore_cron: bool = False,
        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
        console: t.Optional[PlanBuilderConsole] = None,
        user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None,
        selected_models: t.Optional[t.Set[str]] = None,
    ):
        ...

Import

from sqlmesh.core.plan.builder import PlanBuilder
from sqlmesh.core.context_diff import ContextDiff

I/O Contract

Inputs

Name | Type | Required | Description
context_diff | ContextDiff | Yes | Difference between current and proposed model states
forward_only | bool | No | Enable forward-only change mode (default: False)
allow_destructive_models | Iterable[str] | No | Model names permitted to have destructive schema changes
allow_additive_models | Iterable[str] | No | Model names permitted to have additive schema changes
effective_from | TimeLike | No | Date when forward-only changes begin applying
start | TimeLike | No | Plan start date for interval processing
end | TimeLike | No | Plan end date for interval processing
execution_time | TimeLike | No | Reference time for plan execution (defaults to now)
is_dev | bool | No | Whether this is a development environment plan (default: False)
no_gaps | bool | No | Ensure no data gaps in processed intervals (default: False)
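To make the idea behind no_gaps concrete, the following sketch finds missing days in a range of daily intervals. It is an illustration only and not how SQLMesh implements the check: find_gaps is a hypothetical function, and daily granularity is an assumption.

```python
from datetime import date, timedelta
from typing import Iterable, List


def find_gaps(processed_days: Iterable[date], start: date, end: date) -> List[date]:
    """Return every day in [start, end] that is absent from processed_days."""
    done = set(processed_days)
    gaps = []
    day = start
    while day <= end:
        if day not in done:
            gaps.append(day)
        day += timedelta(days=1)
    return gaps
```

An empty result means the range is fully covered; a non-empty result lists the days a no-gaps policy would object to.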

Outputs

Name | Type | Description
plan_builder | PlanBuilder | Configured builder ready to generate execution plans
build() | Plan | Generated plan with validated forward-only constraints

Usage Examples

Basic Forward-Only Plan

from sqlmesh import Context

context = Context()

# Deploy changes that only affect future data
plan = context.plan(
    environment='prod',
    forward_only=True,
    effective_from='2024-02-01'
)

# Historical data before 2024-02-01 remains unchanged
# New logic applies to data from 2024-02-01 onward

Allow Specific Models to Have Schema Changes

from sqlmesh import Context

context = Context()

# Allow specific model to add new columns in forward-only mode
plan = context.plan(
    environment='prod',
    forward_only=True,
    allow_additive_models=[
        'my_schema.user_events',
        'my_schema.transaction_log'
    ],
    effective_from='2024-02-01'
)

# These models can add columns going forward
# Other models are restricted to non-breaking changes only

Controlled Destructive Changes

from sqlmesh import Context

context = Context()

# Explicitly allow destructive change for specific model
# Use with caution - this can break downstream consumers
plan = context.plan(
    environment='prod',
    forward_only=True,
    allow_destructive_models=[
        'my_schema.experimental_features'
    ],
    effective_from='2024-02-01',
    no_prompts=False  # Require manual confirmation
)

# Only 'experimental_features' can have breaking schema changes
# All other models must be non-breaking

Validate Forward-Only Plan Without Applying

from sqlmesh import Context

context = Context()

# Preview forward-only changes without applying
plan = context.plan(
    environment='prod',
    forward_only=True,
    effective_from='2024-02-01',
    skip_backfill=True,  # Don't process intervals yet
    explain=True  # Show detailed explanation
)

# Review plan details
print(f"Forward-only mode: {plan.forward_only}")
print(f"Effective from: {plan.effective_from}")
print(f"Models with changes: {len(plan.context_diff.modified_snapshots)}")

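# Check for breaking changes
# Each value in modified_snapshots is a (new, previous) pair of snapshots
for new_snapshot, _previous in plan.context_diff.modified_snapshots.values():
    category = new_snapshot.change_category  # May be None if not yet categorized
    if category is not None and category.is_breaking:
        print(f"Breaking change detected in: {new_snapshot.name}")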

Production Deployment Workflow

from sqlmesh import Context
from datetime import datetime

context = Context()

# Step 1: Test in dev environment first
dev_plan = context.plan(
    environment='dev',
    start='2024-01-15',
    end='2024-01-31',
    ignore_cron=True
)

# Step 2: Apply to prod with forward-only mode
prod_plan = context.plan(
    environment='prod',
    forward_only=True,
    effective_from=datetime.now().strftime('%Y-%m-%d'),
    no_prompts=False,  # Require confirmation for prod
    auto_apply=False   # Manual review before applying
)

# Review the plan before applying it manually
if prod_plan.requires_backfill:
    # missing_intervals holds one entry per snapshot with unprocessed intervals
    print(f"Backfill required for {len(prod_plan.missing_intervals)} models")
else:
    print("No backfill required for the forward-only changes")

Gradual Rollout Strategy

from sqlmesh import Context
from datetime import datetime, timedelta

context = Context()

# Week 1: Apply changes effective from next week
week_1_date = (datetime.now() + timedelta(days=7)).strftime('%Y-%m-%d')

plan_week1 = context.plan(
    environment='prod',
    forward_only=True,
    effective_from=week_1_date,
    auto_apply=True
)

# Monitor for issues during Week 1

# Week 2: If successful, optionally reprocess history
# (requires separate plan without forward_only)
if input("Reprocess historical data? (y/n): ") == 'y':
    plan_backfill = context.plan(
        environment='prod',
        start='2024-01-01',
        end=week_1_date,
        forward_only=False  # Allow historical reprocessing
    )

CI/CD with Forward-Only Protection

from sqlmesh import Context
import sys

def deploy_to_production():
    """
    Production deployment with forward-only safety checks.
    """
    context = Context()

    try:
        plan = context.plan(
            environment='prod',
            forward_only=True,
            effective_from='today',
            no_prompts=True,  # Non-interactive for CI/CD
            auto_apply=False  # Return plan for validation
        )

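        # Validation checks
        # Each value in modified_snapshots is a (new, previous) snapshot pair
        has_breaking_changes = any(
            new.change_category is not None and new.change_category.is_breaking
            for new, _previous in plan.context_diff.modified_snapshots.values()
        )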

        if has_breaking_changes:
            print("ERROR: Breaking changes detected in forward-only mode")
            print("Add models to allow_destructive_models if intentional")
            return 1

        # Apply plan
        context.apply(plan)
        print("Successfully deployed forward-only changes")
        return 0

    except Exception as e:
        print(f"Deployment failed: {e}")
        return 1

if __name__ == '__main__':
    sys.exit(deploy_to_production())

Programmatic PlanBuilder Usage

from sqlmesh import Context
from sqlmesh.core.plan.builder import PlanBuilder
from datetime import datetime

context = Context()

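# Get context diff
# Depending on the SQLMesh version, Context.diff() may only display the diff
# instead of returning a ContextDiff; in that case obtain a configured builder
# from context.plan_builder('prod', forward_only=True, ...) instead.
context_diff = context.diff(environment='prod')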

# Manually construct PlanBuilder with forward-only settings
plan_builder = PlanBuilder(
    context_diff=context_diff,
    forward_only=True,
    effective_from=datetime(2024, 2, 1),
    allow_additive_models={'my_schema.events'},
    allow_destructive_models=set(),  # No destructive changes allowed
    start=datetime(2024, 1, 1),
    end=datetime(2024, 12, 31),
    is_dev=False,
    no_gaps=True
)

# Build and validate plan
plan = plan_builder.build()

# Inspect forward-only configuration
# (underscore-prefixed attributes are private and may change between versions)
print(f"Forward-only: {plan_builder._forward_only}")
print(f"Effective from: {plan_builder._effective_from}")
print(f"Allowed additive: {plan_builder._allow_additive_models}")
print(f"Allowed destructive: {plan_builder._allow_destructive_models}")

Related Pages

Implements Principle
