Implementation: TobikoData SQLMesh Context.run
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Incremental_Processing |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Concrete tool for continuously processing new data intervals: it runs the DAG scheduler to evaluate all missing intervals that SQLMesh tracks for each model.
Description
Context.run() is the operational runtime method that executes incremental data pipelines in production. It evaluates the entire DAG of models, identifies missing intervals that need processing based on cron schedules and completion state, and executes transformations in dependency order. This is the primary method for maintaining up-to-date data pipelines after initial deployment.
The method integrates with environment finalization checks to ensure it doesn't interfere with concurrent plan applications. It waits for any in-progress environment updates to complete before processing intervals. The janitor service automatically runs (unless disabled) to clean up expired development environments and orphaned resources.
The implementation handles transient failures gracefully, maintaining completion state across runs so that interrupted pipelines can resume without reprocessing completed intervals. It respects model-specific cron schedules, allowing different models to operate at different cadences while coordinating their interactions through dependency resolution.
Usage
Use Context.run() in production environments as the main execution loop for incremental pipelines. Typically scheduled to run every few minutes via cron or orchestration tools, it processes new intervals as they become available according to each model's schedule. Use select_models to limit processing to specific subsets during troubleshooting or resource management.
Code Reference
Source Location
- Repository: sqlmesh
- File: sqlmesh/core/context.py:L753-812
Signature
@python_api_analytics
def run(
self,
environment: t.Optional[str] = None,
*,
start: t.Optional[TimeLike] = None,
end: t.Optional[TimeLike] = None,
execution_time: t.Optional[TimeLike] = None,
skip_janitor: bool = False,
ignore_cron: bool = False,
select_models: t.Optional[t.Collection[str]] = None,
exit_on_env_update: t.Optional[int] = None,
no_auto_upstream: bool = False,
) -> CompletionStatus:
Import
from sqlmesh import Context
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| environment | str | No | Target environment to process (default: prod) |
| start | TimeLike | No | Start date for interval processing range |
| end | TimeLike | No | End date for interval processing range |
| execution_time | TimeLike | No | Reference time for execution (defaults to now) |
| skip_janitor | bool | No | Skip cleanup of expired resources (default: False) |
| ignore_cron | bool | No | Process all missing intervals regardless of schedule (default: False) |
| select_models | Collection[str] | No | Filter to specific models (default: all models) |
| exit_on_env_update | int | No | Exit code if interrupted by environment update |
| no_auto_upstream | bool | No | Don't force upstream model execution when using select_models (default: False) |
Outputs
| Name | Type | Description |
|---|---|---|
| status | CompletionStatus | Enum indicating the run outcome (e.g. success, failure, nothing to do); inspect its properties such as `is_success`/`is_failure` rather than relying on truthiness, since enum members are always truthy |
Usage Examples
Basic Production Run
from sqlmesh import Context

# Initialize context from the current project directory.
context = Context()

# Run all models in the production environment.
# Processes any intervals that are scheduled and missing.
status = context.run(environment='prod')

# CompletionStatus is an Enum, and enum members are always truthy, so
# `if status:` would never reach the else branch. Check the enum's
# properties instead.
if not status.is_failure:
    print("Run completed successfully")
else:
    print("Run failed or partially completed")
Scheduled Cron Job
#!/usr/bin/env python
"""
Production cron job - run every 5 minutes
Crontab entry: */5 * * * * /path/to/sqlmesh_run.py
"""
from sqlmesh import Context
import sys
import logging

logging.basicConfig(level=logging.INFO)


def main() -> int:
    """Run the prod pipeline once and return a process exit code.

    Returns:
        0 when the run did not fail, 1 on failure or exception.
    """
    context = Context()
    try:
        status = context.run(
            environment='prod',
            skip_janitor=False,  # Clean up expired resources as part of the run
        )
        # CompletionStatus is an Enum — members are always truthy — so
        # `0 if status else 1` would always yield 0. Use the .is_failure
        # property so cron actually sees failures.
        return 1 if status.is_failure else 0
    except Exception as e:
        logging.error(f"Run failed: {e}")
        return 1


if __name__ == '__main__':
    sys.exit(main())
Catch-Up After Downtime
from sqlmesh import Context
from datetime import datetime, timedelta

context = Context()

# System was down for 24 hours: process all missing intervals in the
# last day's window, ignoring per-model cron schedules.
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
today = datetime.now().strftime('%Y-%m-%d')

status = context.run(
    environment='prod',
    start=yesterday,
    end=today,
    ignore_cron=True,  # Process all missing intervals immediately
)

# CompletionStatus is an Enum (always truthy); branch on its properties
# rather than `if status:`.
if not status.is_failure:
    print("Catch-up processing completed")
Selective Model Processing
from sqlmesh import Context

context = Context()

# Restrict the run to the high-priority models during peak hours.
high_priority_models = (
    'my_schema.user_events',
    'my_schema.transaction_summary',
)
status = context.run(environment='prod', select_models=high_priority_models)

# Upstream dependencies of the selected models are pulled in
# automatically unless no_auto_upstream=True is passed.
Development Environment Testing
from sqlmesh import Context

context = Context()

# Exercise incremental processing in the dev environment over a fixed
# one-week window, bypassing cron schedules.
status = context.run(
    environment='dev',
    start='2024-01-01',
    end='2024-01-07',
    ignore_cron=True,
    skip_janitor=True,  # Keep dev resources around while testing
)
print(f"Dev run status: {status}")
Production Run with Environment Update Handling
from sqlmesh import Context
import sys

context = Context()

# exit_on_env_update=2 makes run() terminate the process with exit code 2
# if a concurrent plan application updates the target environment.
# NOTE(review): run() exits the process itself in that case — it does not
# return an int — so the condition is observed via SystemExit, not via the
# return value. Confirm against the installed SQLMesh version.
try:
    status = context.run(
        environment='prod',
        exit_on_env_update=2,  # Exit with code 2 if env update in progress
    )
except SystemExit as exc:
    if exc.code == 2:
        print("Environment update in progress, will retry later")
    raise  # Re-raise so the process still exits with the original code.

# CompletionStatus is an Enum (always truthy); use its properties.
if status.is_failure:
    print("Run failed")
    sys.exit(1)
print("Run completed successfully")
sys.exit(0)
Isolated Model Processing Without Dependencies
from sqlmesh import Context

context = Context()

# Evaluate a single model without triggering its upstream DAG.
# Appropriate when the upstream models are known to be current.
status = context.run(
    environment='prod',
    no_auto_upstream=True,  # Skip upstream dependency processing
    select_models=['my_schema.final_report'],
)
# Only final_report is processed; upstream data is assumed ready.
Continuous Processing Loop
from sqlmesh import Context
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def continuous_processing(interval_seconds: int = 300) -> None:
    """
    Run SQLMesh processing continuously.
    Alternative to cron for containerized deployments.

    Args:
        interval_seconds: Pause between successive runs (default 5 minutes).
    """
    context = Context()
    while True:
        try:
            logger.info("Starting processing run")
            status = context.run(environment='prod')
            # CompletionStatus is an Enum — members are always truthy —
            # so test .is_failure explicitly; otherwise the warning
            # branch below is unreachable.
            if not status.is_failure:
                logger.info("Run completed successfully")
            else:
                logger.warning("Run failed or partially completed")
        except Exception as e:
            logger.error(f"Run error: {e}", exc_info=True)
        logger.info(f"Sleeping for {interval_seconds} seconds")
        time.sleep(interval_seconds)


# Run every 5 minutes
continuous_processing(interval_seconds=300)
Override Execution Time for Testing
from sqlmesh import Context
from datetime import datetime

context = Context()

# Replay a historical window with a pinned execution time so that
# temporal logic can be validated deterministically.
frozen_clock = datetime(2024, 1, 15, 12, 0, 0)
status = context.run(
    environment='test',
    execution_time=frozen_clock,
    start='2024-01-01',
    end='2024-01-15',
    ignore_cron=True,
)
# Temporal macros such as @execution_time resolve to the override above.
Kubernetes CronJob Integration
#!/usr/bin/env python
"""
Kubernetes CronJob for SQLMesh incremental processing
Deploy as: kubectl create -f sqlmesh-cronjob.yaml
"""
from sqlmesh import Context
import sys
import os
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def main() -> int:
    """Run one processing pass.

    Returns:
        0 when the run did not fail, 1 on failure or exception.
    """
    # Configuration from environment variables
    environment = os.getenv('SQLMESH_ENVIRONMENT', 'prod')
    skip_janitor = os.getenv('SKIP_JANITOR', 'false').lower() == 'true'
    logger.info(f"Starting SQLMesh run for environment: {environment}")
    context = Context()
    try:
        status = context.run(
            environment=environment,
            skip_janitor=skip_janitor,
        )
        # CompletionStatus is an Enum whose members are all truthy; with
        # `if status:` the failure branch below could never run.
        if not status.is_failure:
            logger.info("Run completed successfully")
            return 0
        logger.error("Run failed")
        return 1
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        return 1


if __name__ == '__main__':
    sys.exit(main())
Monitoring and Alerting
from sqlmesh import Context
import logging
import time

logger = logging.getLogger(__name__)


def monitored_run():
    """
    Production run with monitoring and alerting.

    Returns:
        The CompletionStatus from the run, or False when the run raised
        an unexpected exception.
    """
    context = Context()
    start_time = time.time()
    try:
        status = context.run(environment='prod')
        duration = time.time() - start_time
        # CompletionStatus is an Enum — members are always truthy — so
        # branch on .is_failure; otherwise the alerting path below is
        # unreachable.
        if not status.is_failure:
            logger.info(f"Run succeeded in {duration:.2f} seconds")
            # Send success metric to monitoring system
            # metrics.gauge('sqlmesh.run.duration', duration)
            # metrics.increment('sqlmesh.run.success')
        else:
            logger.error(f"Run failed after {duration:.2f} seconds")
            # Send failure alert
            # alerts.send("SQLMesh run failed", severity='high')
            # metrics.increment('sqlmesh.run.failure')
        return status
    except Exception as e:
        duration = time.time() - start_time
        logger.error(f"Run exception after {duration:.2f} seconds: {e}")
        # Send critical alert
        # alerts.send(f"SQLMesh run exception: {e}", severity='critical')
        # metrics.increment('sqlmesh.run.exception')
        return False


if __name__ == '__main__':
    monitored_run()