Implementation:TobikoData Sqlmesh Context Plan Apply For Promotion
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Deployment, Model_Development |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
A concrete production deployment workflow in SQLMesh that combines plan generation with atomic application.
Description
Production promotion in SQLMesh uses a two-step workflow: Context.plan (with environment=None or environment='prod') generates a deployment plan by comparing current code with production state, categorizing changes, and computing backfill requirements; Context.apply executes the plan by pushing new snapshots, backfilling data, and atomically updating environment pointers. This separation enables review, approval, and safe rollback of production deployments.
The plan phase identifies all changed models through fingerprint comparison, determines whether changes are breaking or non-breaking, computes minimal backfill intervals, calculates downstream impact, and generates a reviewable artifact. The apply phase pushes new snapshot versions to state storage, executes backfill jobs using the scheduler, atomically updates production environment pointers to new snapshots, and optionally cleans up unreferenced old snapshots. The atomic pointer update means production queries continue using old snapshots until new ones are validated and ready.
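The two phases described above can be illustrated with a toy model. This is a minimal sketch and not SQLMesh's implementation: `fingerprint`, `make_plan`, `apply_plan`, and the dictionary-based "environment" are hypothetical stand-ins, and the hash is a plain SHA-256 of the model text. It shows a plan being computed by fingerprint comparison, and an apply step that backfills first and only then swaps the production pointers in a single assignment.

```python
import hashlib
from typing import Callable, Dict, List


def fingerprint(sql: str) -> str:
    """Hash a model definition; any change to the text changes the hash."""
    return hashlib.sha256(sql.encode("utf-8")).hexdigest()[:12]


def make_plan(code: Dict[str, str], prod: Dict[str, str]) -> Dict[str, object]:
    """Plan phase: diff local fingerprints against what prod points at."""
    pointers = {name: fingerprint(sql) for name, sql in code.items()}
    changed = sorted(n for n, fp in pointers.items() if prod.get(n) != fp)
    return {"changed": changed, "pointers": pointers}


def apply_plan(plan, environments, backfill: Callable[[str], None]) -> None:
    """Apply phase: backfill every changed model, then swap pointers once."""
    for name in plan["changed"]:
        backfill(name)  # if this raises, environments["prod"] is untouched
    environments["prod"] = plan["pointers"]  # single reference swap


# Production currently points at version 1 of both (hypothetical) models.
code_v1 = {"orders": "SELECT 1", "revenue": "SELECT 2"}
environments = {"prod": {n: fingerprint(s) for n, s in code_v1.items()}}

# Only the revenue model is edited locally.
code_v2 = {"orders": "SELECT 1", "revenue": "SELECT 2 AS total"}
plan = make_plan(code_v2, environments["prod"])

backfilled: List[str] = []
apply_plan(plan, environments, backfilled.append)
print(plan["changed"], backfilled)
```

Because the pointer swap is the last statement of the apply step, a failed backfill leaves the old pointers in place, which is the property the description attributes to the atomic update.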
Usage
Use these methods together when promoting validated changes from development to production. Generate a plan first to review what will change, obtain necessary approvals (manual or automated), then apply the plan to execute the deployment. The plan can be generated in CI/CD and applied in a separate step after review. For automated deployments, set auto_apply=True in the plan method to combine both steps.
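An automated approval step between plan and apply can be as simple as a policy function over a plan summary. The sketch below is illustrative only: `PlanSummary`, `review_decision`, and the threshold of 30 intervals are hypothetical and not part of SQLMesh; a real pipeline would fill the summary from the Plan object it generated.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PlanSummary:
    """Hypothetical digest of a plan, filled in by the CI job."""
    new_models: int
    breaking_changes: int
    backfill_intervals: int


def review_decision(summary: PlanSummary, max_auto_backfill: int = 30) -> str:
    """Return 'auto_apply' for low-risk plans, otherwise 'manual_review'."""
    if summary.breaking_changes > 0:
        return "manual_review"  # breaking changes always need a human
    if summary.backfill_intervals > max_auto_backfill:
        return "manual_review"  # large backfills are costly to get wrong
    return "auto_apply"


small = PlanSummary(new_models=1, breaking_changes=0, backfill_intervals=7)
risky = PlanSummary(new_models=0, breaking_changes=2, backfill_intervals=7)
print(review_decision(small), review_decision(risky))
```

Keeping the policy separate from the deployment code lets the same rule be applied whether the plan is applied immediately or in a later job.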
Code Reference
Source Location
- Repository: sqlmesh
- File: sqlmesh/core/context.py
  - plan: Lines 1311-1443
  - apply: Lines 1751-1800
Signature
# Method 1: plan
def plan(
    self,
    environment: t.Optional[str] = None,
    *,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    create_from: t.Optional[str] = None,
    skip_tests: t.Optional[bool] = None,
    restate_models: t.Optional[t.Iterable[str]] = None,
    no_gaps: t.Optional[bool] = None,
    skip_backfill: t.Optional[bool] = None,
    forward_only: t.Optional[bool] = None,
    no_prompts: t.Optional[bool] = None,
    auto_apply: t.Optional[bool] = None,
    # ... (additional parameters)
) -> Plan
# Method 2: apply
def apply(
    self,
    plan: Plan,
    circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
) -> None
Import
from sqlmesh import Context
context = Context()
I/O Contract
Inputs
plan method (production mode):
| Name | Type | Required | Description |
|---|---|---|---|
| environment | str | No | Target environment (None or 'prod' for production) |
| start | TimeLike | No | Start date for backfill |
| end | TimeLike | No | End date for backfill |
| execution_time | TimeLike | No | Execution timestamp (default: now) |
| skip_tests | bool | No | Skip unit tests (default: False) |
| restate_models | Iterable[str] | No | Models to force recomputation |
| no_gaps | bool | No | Ensure no data gaps in production |
| forward_only | bool | No | Apply forward-only changes |
| no_prompts | bool | No | Disable interactive prompts (for automation) |
| auto_apply | bool | No | Automatically apply after plan creation |
apply method:
| Name | Type | Required | Description |
|---|---|---|---|
| plan | Plan | Yes | Plan object generated by the plan method |
| circuit_breaker | Callable[[], bool] | No | Callback checked during apply; the apply is aborted when it returns True |
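The circuit breaker contract can be sketched without SQLMesh. The example below assumes the convention that a truthy return value means "abort" (confirm the polarity against the SQLMesh version you run); `run_batches` and `CircuitBreakerTripped` are hypothetical names, not library APIs. The callback is polled before each unit of work, so work already completed stays completed.

```python
from typing import Callable, Iterable, List, Optional


class CircuitBreakerTripped(Exception):
    """Raised when the breaker asks for the run to stop."""


def run_batches(
    batches: Iterable[str],
    on_batch: Callable[[str], None],
    circuit_breaker: Optional[Callable[[], bool]] = None,
) -> None:
    """Process batches in order, polling the breaker before each one."""
    for batch in batches:
        if circuit_breaker is not None and circuit_breaker():
            raise CircuitBreakerTripped(f"aborted before batch {batch!r}")
        on_batch(batch)


checks = {"count": 0}


def trip_after_two_checks() -> bool:
    """Toy breaker: allow the first two checks, then request an abort."""
    checks["count"] += 1
    return checks["count"] > 2


completed: List[str] = []
aborted = False
try:
    run_batches(["a", "b", "c", "d"], completed.append, trip_after_two_checks)
except CircuitBreakerTripped:
    aborted = True
print(completed, aborted)
```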
Outputs
| Method | Return Type | Description |
|---|---|---|
| plan | Plan | Plan object with snapshot changes and backfill requirements |
| apply | None | Executes deployment but returns nothing |
Usage Examples
Basic Production Deployment
from sqlmesh import Context
context = Context()
# Step 1: Generate production deployment plan
plan = context.plan(
    environment="prod",
    start="2024-01-01",
    end="2024-01-31"
)
# Step 2: Review the plan (interactive)
# SQLMesh shows what will change, breaking vs non-breaking, backfill requirements
# Step 3: Apply the plan
context.apply(plan)
print("Production deployment completed!")
Automated CI/CD Deployment
# Deploy to production without prompts
context.plan(
    environment="prod",
    start="2024-01-01",
    end="2024-01-31",
    no_prompts=True,  # No interactive questions
    auto_apply=True,  # Automatically apply after plan
    skip_tests=False  # Run tests before deploying
)
# Both plan and apply happen automatically
print("Automated production deployment completed")
Safe Deployment with Gap Prevention
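# Ensure no data gaps when deploying
import sys
from sqlmesh.core.snapshot import SnapshotChangeCategory
plan = context.plan(
    environment="prod",
    start="2024-01-01",
    end="2024-01-31",
    no_gaps=True,  # Fail if gaps detected
    skip_tests=False
)
# Review breaking changes before applying.
# Assumption: breaking changes are detected from each new snapshot's
# change_category; verify these names against your SQLMesh version.
has_breaking = any(
    s.change_category == SnapshotChangeCategory.BREAKING
    for s in plan.new_snapshots
)
if has_breaking:
    print("WARNING: Breaking changes detected")
    response = input("Proceed with deployment? (yes/no): ")
    if response.lower() != "yes":
        print("Deployment cancelled")
        sys.exit(1)
context.apply(plan)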
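What "gap" means here can be shown with a small interval check. This is a simplified sketch at daily granularity, not SQLMesh's algorithm (which compares the intervals of new snapshots against those of the snapshots they replace); `find_gaps` is a hypothetical helper.

```python
from datetime import date, timedelta
from typing import Iterable, List, Tuple


def find_gaps(
    covered: Iterable[Tuple[date, date]], start: date, end: date
) -> List[date]:
    """Return the days in [start, end] not covered by any (first, last) range."""
    covered_days = set()
    for first, last in covered:
        day = first
        while day <= last:
            covered_days.add(day)
            day += timedelta(days=1)

    gaps = []
    day = start
    while day <= end:
        if day not in covered_days:
            gaps.append(day)
        day += timedelta(days=1)
    return gaps


# The new version has data for 1-10 Jan and 15-31 Jan only.
covered = [
    (date(2024, 1, 1), date(2024, 1, 10)),
    (date(2024, 1, 15), date(2024, 1, 31)),
]
gaps = find_gaps(covered, date(2024, 1, 1), date(2024, 1, 31))
print([d.isoformat() for d in gaps])
```

A deployment guarded this way would refuse to promote while `gaps` is non-empty, so consumers of production never see a window with missing data.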
Forward-Only Production Changes
# Deploy changes that only affect future data
plan = context.plan(
    environment="prod",
    forward_only=True,  # Only impact future intervals
    effective_from="2024-02-01",  # Changes effective from this date
    skip_backfill=True  # No historical backfill
)
context.apply(plan)
print("Forward-only changes deployed")
Deployment with Restated Models
# Force recomputation of specific models
plan = context.plan(
    environment="prod",
    start="2024-01-01",
    end="2024-01-31",
    restate_models=["my_schema.revenue_model"],  # Force recompute
    no_prompts=True
)
context.apply(plan)
# Revenue model and all downstream models recomputed
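Why a restatement also touches downstream models follows from the dependency graph. The sketch below is a generic breadth-first traversal, not SQLMesh code; `downstream_of` is a hypothetical helper and every model name other than `my_schema.revenue_model` is invented for illustration.

```python
from collections import deque
from typing import Dict, Iterable, List


def downstream_of(children: Dict[str, List[str]], roots: Iterable[str]) -> List[str]:
    """Return the roots plus every model reachable from them, sorted by name."""
    seen = set(roots)
    queue = deque(seen)
    while queue:
        node = queue.popleft()
        for child in children.get(node, ()):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return sorted(seen)


# Edges point from a model to the models that select from it.
children = {
    "raw.orders": ["my_schema.revenue_model"],
    "my_schema.revenue_model": ["my_schema.revenue_daily", "my_schema.kpi"],
    "my_schema.revenue_daily": ["my_schema.kpi"],
    "raw.users": ["my_schema.users"],
}
affected = downstream_of(children, ["my_schema.revenue_model"])
print(affected)
```

Upstream models (`raw.orders`) and unrelated branches (`raw.users`) are not in the result, which is why a restatement stays limited to the named models and their dependents.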
Deployment with Circuit Breaker
# Abort deployment if an external condition fails
from datetime import datetime
def outside_maintenance_window():
    """Return True to abort the deployment."""
    hour = datetime.now().hour
    # Only deploy between 2 AM and 4 AM
    return not (2 <= hour < 4)
plan = context.plan(
    environment="prod",
    start="2024-01-01",
    end="2024-01-31",
    no_prompts=True
)
# Apply with circuit breaker
context.apply(
    plan,
    circuit_breaker=outside_maintenance_window
)
# Deployment aborts if the circuit breaker returns True
Multi-Step Review Workflow
# Generate plan for review, apply later
# Step 1: Generate plan (CI job)
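plan = context.plan(
    environment="prod",
    start="2024-01-01",
    end="2024-01-31",
    no_prompts=True,
    auto_apply=False  # Don't apply yet
)
# Step 2: Serialize plan for review
import json
from sqlmesh.core.snapshot import SnapshotChangeCategory
plan_summary = {
    "new_snapshots": len(plan.new_snapshots),
    "modified_snapshots": len(plan.modified_snapshots),
    "requires_backfill": plan.requires_backfill,
    # Assumption: breaking changes are read from each new snapshot's
    # change_category; verify these names against your SQLMesh version.
    "has_breaking_changes": any(
        s.change_category == SnapshotChangeCategory.BREAKING
        for s in plan.new_snapshots
    ),
}
print(json.dumps(plan_summary, indent=2))
# Step 3: After approval, apply in a separate job
# (if the apply job runs in a separate process, regenerate the plan there with
# the same arguments; this sketch assumes a Plan cannot be reloaded from state)
context.apply(plan)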
Complete Deployment Pipeline
from sqlmesh import Context
def deploy_to_production(start_date, end_date):
    """Complete deployment pipeline with validation"""
    context = Context()
    # 1. Run unit tests
    # (context.test() is assumed to return a unittest-style result object)
    test_results = context.test()
    if not test_results.wasSuccessful():
        raise Exception("Unit tests failed")
    # 2. Generate deployment plan
    plan = context.plan(
        environment="prod",
        start=start_date,
        end=end_date,
        no_prompts=True,
        auto_apply=False
    )
    # 3. Validate plan
    if plan.requires_backfill:
        print(f"Backfill required: {plan.missing_intervals}")
    # 4. Apply plan
    context.apply(plan)
    # 5. Run audits on deployed data
    success = context.audit(
        start=start_date,
        end=end_date
    )
    if not success:
        raise Exception("Post-deployment audits failed")
    print("Production deployment completed and validated!")
    return True
# Execute deployment
deploy_to_production("2024-01-01", "2024-01-31")