Implementation:TobikoData Sqlmesh Context Apply
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Deployment |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Concrete plan application method for executing deployment plans by pushing snapshots, backfilling data, and promoting views, provided by the Context class.
Description
The Context.apply method is SQLMesh's implementation of the "apply" phase in plan/apply workflows. Given a validated Plan object, it orchestrates the complete deployment: persisting snapshot metadata to the state backend, executing backfill operations to materialize data for the specified time ranges, and promoting views to make changes visible in the target environment.
The method handles the full lifecycle of plan execution: validation that the plan is applicable, notification of apply start events, coordination of backfill operations respecting model dependencies, monitoring for circuit breaker signals that abort execution, promotion of snapshots to update the virtual layer, state synchronization to record completion, and notification of success or failure. If the plan requires no action (no changes, no backfills needed), the method returns immediately.
The apply operation is designed to be idempotent and resumable—interrupted applications can be restarted and will skip already-completed work. The method supports circuit breakers for safe abort of long-running operations and provides comprehensive error handling with automatic rollback on failure.
Usage
Call this method after reviewing and approving a plan created by context.plan(). Use it in production deployments after careful plan review, in staging/dev environments for automated deployments, and in CI/CD pipelines with circuit breakers for emergency stops. The method should only be invoked on plans that have been validated and contain no uncategorized changes. For high-risk deployments, provide a circuit_breaker callable that can monitor system health and abort if issues arise.
Code Reference
Source Location
- Repository: sqlmesh
- File: sqlmesh/core/context.py
- Class: Context
- Method: apply (lines 1751-1803)
Signature
def apply(
self,
plan: Plan,
circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
) -> None:
Import
from sqlmesh.core.context import Context
context = Context(paths="project")
plan = context.plan("prod")
context.apply(plan)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| plan | Plan | Yes | The plan object to apply, created by context.plan() |
| circuit_breaker | Callable[[], bool] | No | Optional function returning True to abort execution |
Outputs
| Name | Type | Description |
|---|---|---|
| None | None | Method returns None; side effects are state changes and database operations |
Usage Examples
Basic Usage
from sqlmesh.core.context import Context
context = Context(paths="examples/sushi")
# Create and apply plan
plan = context.plan("prod")
context.apply(plan)
print(f"Applied plan to {plan.environment_naming_info.name}")
Production Deployment
from sqlmesh.core.context import Context
context = Context(paths="project")
# Create plan with careful review
plan = context.plan(
environment="prod",
forward_only=True,
no_gaps=True
)
# Review plan details
print(f"Plan changes: {len(plan.context_diff.modified_snapshots)} models")
print(f"Backfill intervals: {sum(len(s.intervals) for s in plan.snapshots)}")
# Apply after approval
approval = input("Apply plan? (yes/no): ")
if approval.lower() == "yes":
context.apply(plan)
print("Deployment successful")
else:
print("Deployment cancelled")
Circuit Breaker Protection
from sqlmesh.core.context import Context
import time
context = Context(paths="project")
plan = context.plan("prod")
# Define circuit breaker for emergency stop
abort_flag = False
def should_abort():
# Check external signal (file, API, metric)
global abort_flag
return abort_flag
# Start apply with circuit breaker
try:
context.apply(plan, circuit_breaker=should_abort)
except Exception as e:
print(f"Apply failed or aborted: {e}")
Error Handling
from sqlmesh.core.context import Context
from sqlmesh.utils.errors import UncategorizedPlanError
context = Context(paths="project")
try:
plan = context.plan("prod", no_auto_categorization=True)
context.apply(plan)
except UncategorizedPlanError:
print("Plan has uncategorized changes, review and categorize first")
except Exception as e:
print(f"Apply failed: {e}")
# Plan state is preserved, can retry after fixing issues
Automated CI/CD
from sqlmesh.core.context import Context
import sys
context = Context(paths="/workspace/project")
# Create plan with auto-apply
plan = context.plan(
environment="staging",
skip_tests=False,
no_prompts=True,
auto_apply=True # Applies automatically if this is True
)
# If not auto-applied, apply manually
if not plan.context_diff.has_changes:
print("No changes to apply")
sys.exit(0)
try:
context.apply(plan)
print(f"Successfully deployed to {plan.environment_naming_info.name}")
sys.exit(0)
except Exception as e:
print(f"Deployment failed: {e}")
sys.exit(1)
Resumable Apply
from sqlmesh.core.context import Context
context = Context(paths="project")
plan = context.plan("prod")
# First attempt
try:
context.apply(plan)
except Exception as e:
print(f"Apply interrupted: {e}")
print("Progress is saved, can retry")
# Retry (will skip already-completed work)
try:
# Create new plan (will detect what's already done)
retry_plan = context.plan("prod")
context.apply(retry_plan)
print("Apply completed successfully")
except Exception as e:
print(f"Apply failed again: {e}")