Implementation:TobikoData Sqlmesh PlanBuilder Forward Only
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Incremental_Processing |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Concrete tool, provided by SQLMesh, for constructing execution plans that apply incremental model changes only to future intervals.
Description
PlanBuilder is the core orchestration class responsible for constructing deployment plans that determine which models to process, which intervals to execute, and how to handle schema and logic changes. When configured with forward_only mode, it enforces policies that prevent historical data reprocessing while allowing evolution of data pipeline logic for future intervals.
The builder analyzes changes between the current environment state and the proposed model definitions, categorizes each change (for example as breaking or non-breaking), detects destructive and additive schema changes, and validates that forward-only constraints are satisfied. The allow_destructive_models and allow_additive_models parameters give explicit control over which models may undergo each kind of schema change.
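The allow-list idea described above can be sketched in plain Python. This is a minimal illustration, not SQLMesh's implementation: the `SchemaChange` record and the `find_disallowed_changes` helper are hypothetical names, and the sketch assumes every destructive or additive change is blocked unless its model appears in the matching allow list, ignoring any per-model settings.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class SchemaChange:
    """Hypothetical record of a schema change detected for one model."""

    model: str
    kind: str  # "destructive" (e.g. dropped column) or "additive" (e.g. new column)


def find_disallowed_changes(
    changes: Iterable[SchemaChange],
    allow_destructive_models: Optional[Iterable[str]] = None,
    allow_additive_models: Optional[Iterable[str]] = None,
) -> List[SchemaChange]:
    """Return the changes whose model is missing from the matching allow list."""
    allowed = {
        "destructive": set(allow_destructive_models or ()),
        "additive": set(allow_additive_models or ()),
    }
    return [change for change in changes if change.model not in allowed[change.kind]]


changes = [
    SchemaChange("my_schema.user_events", "additive"),
    SchemaChange("my_schema.transaction_log", "destructive"),
]
blocked = find_disallowed_changes(
    changes, allow_additive_models=["my_schema.user_events"]
)
print([change.model for change in blocked])  # ['my_schema.transaction_log']
```

Keeping one set per change kind mirrors the two separate parameters: permitting a model to add columns does not implicitly permit it to drop them.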
The effective_from parameter establishes a temporal boundary, ensuring that intervals before this date continue using previous model versions while new logic applies only to subsequent intervals. This enables gradual migration strategies and supports scenarios with strict historical data preservation requirements.
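To make the temporal boundary concrete, the following sketch splits a range of daily intervals by which model version would compute them. It is an illustration only: `assign_versions` is a hypothetical helper, daily granularity is assumed, and the boundary is treated as inclusive on the effective_from date.

```python
from datetime import date, timedelta
from typing import Dict, List


def assign_versions(start: date, end: date, effective_from: date) -> Dict[str, List[date]]:
    """Split daily intervals in [start, end] by the model version that applies."""
    assignment: Dict[str, List[date]] = {"previous": [], "new": []}
    day = start
    while day <= end:
        # Intervals on or after effective_from use the new logic.
        key = "new" if day >= effective_from else "previous"
        assignment[key].append(day)
        day += timedelta(days=1)
    return assignment


result = assign_versions(
    date(2024, 1, 30), date(2024, 2, 2), effective_from=date(2024, 2, 1)
)
print([d.isoformat() for d in result["previous"]])  # ['2024-01-30', '2024-01-31']
print([d.isoformat() for d in result["new"]])  # ['2024-02-01', '2024-02-02']
```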
Usage
Use PlanBuilder with forward_only=True when deploying incremental model changes to production environments where historical data must remain unchanged. Configure allow lists to explicitly permit specific models to have schema changes when necessary. Set effective_from to control the exact date when new logic begins applying.
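One way to keep these settings consistent across deployments is to assemble them in a single place. The sketch below builds a keyword-argument dictionary using the parameter names listed in this document; `forward_only_plan_kwargs` is a hypothetical helper, and passing the result to `context.plan(**kwargs)` is assumed rather than shown, since that requires a SQLMesh project.

```python
from typing import Any, Dict, Iterable, Optional


def forward_only_plan_kwargs(
    effective_from: str,
    environment: str = "prod",
    allow_additive_models: Optional[Iterable[str]] = None,
    allow_destructive_models: Optional[Iterable[str]] = None,
) -> Dict[str, Any]:
    """Assemble keyword arguments for a forward-only plan."""
    kwargs: Dict[str, Any] = {
        "environment": environment,
        "forward_only": True,
        "effective_from": effective_from,
    }
    # Only include an allow list when it actually names at least one model.
    additive = sorted(set(allow_additive_models or ()))
    destructive = sorted(set(allow_destructive_models or ()))
    if additive:
        kwargs["allow_additive_models"] = additive
    if destructive:
        kwargs["allow_destructive_models"] = destructive
    return kwargs


kwargs = forward_only_plan_kwargs(
    "2024-02-01", allow_additive_models=["my_schema.user_events"]
)
print(kwargs)
```

Omitting empty allow lists keeps the call equivalent to the defaults, so a model is only exempted when it is named explicitly.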
Code Reference
Source Location
- Repository: sqlmesh
- File: sqlmesh/core/plan/builder.py:L55-137
Signature
class PlanBuilder:
    def __init__(
        self,
        context_diff: ContextDiff,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        apply: t.Optional[t.Callable[[Plan], None]] = None,
        restate_models: t.Optional[t.Iterable[str]] = None,
        restate_all_snapshots: bool = False,
        backfill_models: t.Optional[t.Iterable[str]] = None,
        no_gaps: bool = False,
        skip_backfill: bool = False,
        empty_backfill: bool = False,
        is_dev: bool = False,
        forward_only: bool = False,
        allow_destructive_models: t.Optional[t.Iterable[str]] = None,
        allow_additive_models: t.Optional[t.Iterable[str]] = None,
        environment_ttl: t.Optional[str] = None,
        environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.default,
        environment_catalog_mapping: t.Optional[t.Dict[re.Pattern, str]] = None,
        categorizer_config: t.Optional[CategorizerConfig] = None,
        auto_categorization_enabled: bool = True,
        effective_from: t.Optional[TimeLike] = None,
        include_unmodified: bool = False,
        default_start: t.Optional[TimeLike] = None,
        default_end: t.Optional[TimeLike] = None,
        enable_preview: bool = False,
        end_bounded: bool = False,
        ensure_finalized_snapshots: bool = False,
        explain: bool = False,
        ignore_cron: bool = False,
        start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
        end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
        console: t.Optional[PlanBuilderConsole] = None,
        user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None,
        selected_models: t.Optional[t.Set[str]] = None,
    ):
        ...
Import
from sqlmesh.core.plan.builder import PlanBuilder
from sqlmesh.core.context_diff import ContextDiff
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| context_diff | ContextDiff | Yes | Difference between current and proposed model states |
| forward_only | bool | No | Enable forward-only change mode (default: False) |
| allow_destructive_models | Iterable[str] | No | Model names permitted to have destructive schema changes |
| allow_additive_models | Iterable[str] | No | Model names permitted to have additive schema changes |
| effective_from | TimeLike | No | Date when forward-only changes begin applying |
| start | TimeLike | No | Plan start date for interval processing |
| end | TimeLike | No | Plan end date for interval processing |
| execution_time | TimeLike | No | Reference time for plan execution (defaults to now) |
| is_dev | bool | No | Whether this is a development environment plan (default: False) |
| no_gaps | bool | No | Ensure no data gaps in processed intervals (default: False) |
Outputs
| Name | Type | Description |
|---|---|---|
| plan_builder | PlanBuilder | Configured builder ready to generate execution plans |
| build() | Plan | Generated plan with validated forward-only constraints |
Usage Examples
Basic Forward-Only Plan
from sqlmesh import Context
context = Context()
# Deploy changes that only affect future data
plan = context.plan(
    environment='prod',
    forward_only=True,
    effective_from='2024-02-01'
)
# Historical data before 2024-02-01 remains unchanged
# New logic applies to data from 2024-02-01 onward
Allow Specific Models to Have Schema Changes
from sqlmesh import Context
context = Context()
# Allow specific models to add new columns in forward-only mode
plan = context.plan(
    environment='prod',
    forward_only=True,
    allow_additive_models=[
        'my_schema.user_events',
        'my_schema.transaction_log'
    ],
    effective_from='2024-02-01'
)
# These models can add columns going forward
# Other models are restricted to non-breaking changes only
Controlled Destructive Changes
from sqlmesh import Context
context = Context()
# Explicitly allow destructive change for specific model
# Use with caution - this can break downstream consumers
plan = context.plan(
    environment='prod',
    forward_only=True,
    allow_destructive_models=[
        'my_schema.experimental_features'
    ],
    effective_from='2024-02-01',
    no_prompts=False  # Require manual confirmation
)
# Only 'experimental_features' can have breaking schema changes
# All other models must be non-breaking
Validate Forward-Only Plan Without Applying
from sqlmesh import Context
context = Context()
# Preview forward-only changes without applying
plan = context.plan(
    environment='prod',
    forward_only=True,
    effective_from='2024-02-01',
    skip_backfill=True,  # Don't process intervals yet
    explain=True  # Show detailed explanation
)
# Review plan details
print(f"Forward-only mode: {plan.forward_only}")
print(f"Effective from: {plan.effective_from}")
print(f"Models with changes: {len(plan.context_diff.modified_snapshots)}")
# Check for breaking changes
# modified_snapshots maps each name to a (current, previous) snapshot pair
for current, _previous in plan.context_diff.modified_snapshots.values():
    # change_category is None until the change has been categorized
    if current.change_category and current.change_category.is_breaking:
        print(f"Breaking change detected in: {current.name}")
Production Deployment Workflow
from sqlmesh import Context
from datetime import datetime
context = Context()
# Step 1: Test in dev environment first
dev_plan = context.plan(
    environment='dev',
    start='2024-01-15',
    end='2024-01-31',
    ignore_cron=True
)
# Step 2: Plan prod in forward-only mode
prod_plan = context.plan(
    environment='prod',
    forward_only=True,
    effective_from=datetime.now().strftime('%Y-%m-%d'),
    no_prompts=False,  # Require confirmation for prod
    auto_apply=False  # Manual review before applying
)
# Review the returned plan
if prod_plan.requires_backfill:
    # missing_intervals holds one entry per snapshot that still has intervals to backfill
    print(f"Backfill required for {len(prod_plan.missing_intervals)} snapshots")
else:
    print("No backfill required for this forward-only plan")
Gradual Rollout Strategy
from sqlmesh import Context
from datetime import datetime, timedelta
context = Context()
# Week 1: Apply changes effective from next week
week_1_date = (datetime.now() + timedelta(days=7)).strftime('%Y-%m-%d')
plan_week1 = context.plan(
    environment='prod',
    forward_only=True,
    effective_from=week_1_date,
    auto_apply=True
)
# Monitor for issues during Week 1
# Week 2: If successful, optionally reprocess history
# (requires separate plan without forward_only)
if input("Reprocess historical data? (y/n): ") == 'y':
    plan_backfill = context.plan(
        environment='prod',
        start='2024-01-01',
        end=week_1_date,
        forward_only=False  # Allow historical reprocessing
    )
CI/CD with Forward-Only Protection
from sqlmesh import Context
import sys
def deploy_to_production():
    """
    Production deployment with forward-only safety checks.
    """
    context = Context()
    try:
        plan = context.plan(
            environment='prod',
            forward_only=True,
            effective_from='today',
            no_prompts=True,  # Non-interactive for CI/CD
            auto_apply=False  # Return plan for validation
        )
        # Validation checks
        # modified_snapshots maps each name to a (current, previous) snapshot pair;
        # change_category is None until the change has been categorized
        has_breaking_changes = any(
            current.change_category is not None and current.change_category.is_breaking
            for current, _previous in plan.context_diff.modified_snapshots.values()
        )
        if has_breaking_changes:
            print("ERROR: Breaking changes detected in forward-only mode")
            print("Add models to allow_destructive_models if intentional")
            return 1
        # Apply plan
        context.apply(plan)
        print("Successfully deployed forward-only changes")
        return 0
    except Exception as e:
        print(f"Deployment failed: {e}")
        return 1
if __name__ == '__main__':
    sys.exit(deploy_to_production())
Programmatic PlanBuilder Usage
from sqlmesh import Context
from sqlmesh.core.plan.builder import PlanBuilder
from datetime import datetime
context = Context()
# Get context diff
# Context.diff() prints a diff and returns a bool, so it cannot supply a ContextDiff.
# Assumption: the private helper Context._context_diff() builds one; as a private API
# it may change between releases. Context.plan_builder() is the public route to a PlanBuilder.
context_diff = context._context_diff('prod')
# Manually construct PlanBuilder with forward-only settings
plan_builder = PlanBuilder(
    context_diff=context_diff,
    forward_only=True,
    effective_from=datetime(2024, 2, 1),
    allow_additive_models={'my_schema.events'},
    allow_destructive_models=set(),  # No destructive changes allowed
    start=datetime(2024, 1, 1),
    end=datetime(2024, 12, 31),
    is_dev=False,
    no_gaps=True
)
# Build and validate plan
plan = plan_builder.build()
# Inspect forward-only configuration through the built plan's public fields
print(f"Forward-only: {plan.forward_only}")
print(f"Effective from: {plan.effective_from}")
print(f"Allowed additive: {plan.allow_additive_models}")
print(f"Allowed destructive: {plan.allow_destructive_models}")