Implementation:TobikoData Sqlmesh Scheduler Merged Missing Intervals
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Incremental_Processing |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Concrete tool, provided by SQLMesh, for computing the missing and invalidated time intervals across all incremental models ahead of backfill execution.
Description
Scheduler.merged_missing_intervals() is the core method responsible for analyzing the current state of incremental models and determining which time intervals need processing. It compares the expected intervals (based on model cron schedules and date ranges) against the completion state stored in the state sync system to identify gaps that require backfilling.
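The comparison of expected against completed intervals can be illustrated with a minimal, self-contained sketch. It does not call SQLMesh: `expected_intervals` and `missing_intervals` are hypothetical helpers, and a fixed daily step stands in for a model's cron schedule.

```python
from datetime import datetime, timedelta

def expected_intervals(start, end, step):
    """Return consecutive (lo, hi) windows of width `step` that fit inside [start, end)."""
    windows = []
    lo = start
    while lo + step <= end:
        windows.append((lo, lo + step))
        lo += step
    return windows

def missing_intervals(expected, completed):
    """Return the expected windows that are absent from the completed set."""
    done = set(completed)
    return [window for window in expected if window not in done]

# Hypothetical state: five daily windows expected, three already processed.
expected = expected_intervals(datetime(2024, 1, 1), datetime(2024, 1, 6), timedelta(days=1))
completed = [expected[0], expected[1], expected[3]]
missing = missing_intervals(expected, completed)

for lo, hi in missing:
    print(f"{lo:%Y-%m-%d} to {hi:%Y-%m-%d}")
```

The real method reads completion state from the state sync system and derives the expected windows from each model's cron schedule; the sketch only shows the set-difference idea.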
The method handles multiple scenarios: initial deployments where no intervals have been processed, ongoing operations where recent intervals are missing, and restatements where previously completed intervals must be recalculated due to upstream changes or data corrections. It respects model dependencies to ensure that downstream intervals are only included when their upstream requirements can be satisfied.
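As a rough illustration of the restatement scenario, the sketch below treats a restatement as removing the overlapping windows from the completed set, so they show up as missing again. This is an assumption made for illustration, not SQLMesh's implementation; `drop_restated` is a hypothetical helper, and integers stand in for timestamps.

```python
def drop_restated(completed, restated):
    """Remove completed windows that overlap the half-open restated range [lo, hi)."""
    lo, hi = restated
    return [(s, e) for (s, e) in completed if e <= lo or s >= hi]

# Five expected unit-width windows; the first four were already processed.
expected = [(d, d + 1) for d in range(5)]
completed = expected[:4]

# Restating the range [1, 3) invalidates the two windows it overlaps.
remaining = drop_restated(completed, (1, 3))
missing = [window for window in expected if window not in remaining]

print(missing)
```

The window that was never processed and the two restated windows all end up in the same missing list, which mirrors how the method reports both kinds of work together.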
The implementation supports sophisticated interval computation including lookback windows, cron schedule awareness, per-model start/end date overrides, and boundary conditions. The end_bounded parameter controls whether intervals can extend beyond the target end date due to lookback or partial interval handling, enabling predictable resource consumption during backfills.
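The effect of an end bound can be sketched as follows. This is a simplified model under stated assumptions: `bound_by_end` is a hypothetical helper, integers stand in for timestamps, and clipping an overhanging window at the end date is an assumption made here (the real method may handle such windows differently).

```python
def bound_by_end(intervals, end):
    """Drop windows starting at or after `end` and clip any window that overhangs it."""
    bounded = []
    for start, stop in intervals:
        if start >= end:
            continue  # entirely beyond the target end
        bounded.append((start, min(stop, end)))
    return bounded

# The last window overhangs the target end of 25.
candidate = [(0, 10), (10, 20), (20, 30)]

unbounded = candidate                      # analogous to end_bounded=False
bounded = bound_by_end(candidate, end=25)  # analogous to end_bounded=True

print(unbounded)
print(bounded)
```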
Usage
Use merged_missing_intervals() when generating deployment plans to determine backfill requirements, computing catch-up work after pipeline interruptions, or analyzing the scope of restatement operations. The method is typically called internally by plan generation and execution workflows but can be used directly for custom orchestration scenarios.
Code Reference
Source Location
- Repository: sqlmesh
- File: sqlmesh/core/scheduler.py:L138-189
Signature
def merged_missing_intervals(
    self,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
    start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
    ignore_cron: bool = False,
    end_bounded: bool = False,
    selected_snapshots: t.Optional[t.Set[str]] = None,
) -> SnapshotToIntervals:
Import
from sqlmesh.core.scheduler import Scheduler
from sqlmesh.core.snapshot import SnapshotToIntervals
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| start | TimeLike | No | Start of the interval range (defaults to min model start date) |
| end | TimeLike | No | End of the interval range (defaults to now) |
| execution_time | TimeLike | No | Reference time for execution (defaults to now) |
| deployability_index | DeployabilityIndex | No | Determines which snapshots are deployable |
| restatements | Dict[SnapshotId, Interval] | No | Models and intervals to restate |
| start_override_per_model | Dict[str, datetime] | No | Per-model start date overrides |
| end_override_per_model | Dict[str, datetime] | No | Per-model end date overrides |
| ignore_cron | bool | No | Ignore cron schedules and process all intervals (default: False) |
| end_bounded | bool | No | Bound intervals strictly by end date (default: False) |
| selected_snapshots | Set[str] | No | Filter to specific snapshot names (default: all) |
Outputs
| Name | Type | Description |
|---|---|---|
| snapshot_to_intervals | SnapshotToIntervals | Mapping of snapshots to their missing interval ranges |
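When reporting on the returned ranges, it can help to collapse adjacent windows into larger contiguous ranges. The sketch below shows one way to do that in plain Python; `merge_contiguous` is a hypothetical helper written for this page, not a SQLMesh function, and integers stand in for timestamps.

```python
def merge_contiguous(intervals):
    """Merge overlapping or touching (start, end) windows into contiguous ranges."""
    merged = []
    for start, end in sorted(intervals):
        if merged and start <= merged[-1][1]:
            # Extend the previous range when this window touches or overlaps it.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged

# Two touching windows followed by a gap and one more window.
ranges = merge_contiguous([(3, 4), (0, 1), (1, 2)])
print(ranges)
```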
Usage Examples
Basic Missing Intervals Computation
from sqlmesh import Context
from sqlmesh.utils.date import to_datetime
context = Context()
# Build a scheduler from the context
scheduler = context.scheduler()
# Compute missing intervals for January 2024
missing = scheduler.merged_missing_intervals(
    start='2024-01-01',
    end='2024-01-31',
)
# Display results; each interval is a (start, end) pair of epoch-millisecond timestamps
for snapshot, intervals in missing.items():
    print(f"Model: {snapshot.name}")
    print(f"  Missing intervals: {len(intervals)}")
    for interval_start, interval_end in intervals:
        print(f"    {to_datetime(interval_start)} to {to_datetime(interval_end)}")
Compute Backfill with Restatements
from sqlmesh import Context
from sqlmesh.core.snapshot import SnapshotId
from sqlmesh.utils.date import to_timestamp
context = Context()
scheduler = context.scheduler()
# Define restatements for specific models.
# An Interval is a (start, end) tuple of epoch-millisecond timestamps.
restatements = {
    SnapshotId(name='my_schema.raw_events', identifier='abc123'): (
        to_timestamp('2024-01-01'),
        to_timestamp('2024-01-15'),
    )
}
# Compute intervals including restatements
missing = scheduler.merged_missing_intervals(
    start='2024-01-01',
    end='2024-01-31',
    restatements=restatements,
)
# Shows both missing intervals and intervals requiring restatement
for snapshot, intervals in missing.items():
    if snapshot.name == 'my_schema.raw_events':
        print(f"Intervals to restate: {intervals}")
    else:
        print(f"Dependent model {snapshot.name} has {len(intervals)} affected intervals")
Ignore Cron for Catch-Up Processing
from sqlmesh import Context
from datetime import datetime, timedelta
context = Context()
scheduler = context.scheduler()
# System was down for 48 hours, process all missing intervals
# regardless of cron schedule
end_time = datetime.now()
start_time = end_time - timedelta(hours=48)
missing = scheduler.merged_missing_intervals(
    start=start_time,
    end=end_time,
    ignore_cron=True,  # Process all intervals, not just scheduled ones
)
total_intervals = sum(len(intervals) for intervals in missing.values())
print(f"Catch-up required for {total_intervals} intervals across all models")
Per-Model Date Overrides
from sqlmesh import Context
from datetime import datetime
context = Context()
scheduler = context.scheduler()
# Different date ranges for different models
missing = scheduler.merged_missing_intervals(
    start='2024-01-01',  # Default start
    end='2024-01-31',  # Default end
    start_override_per_model={
        'my_schema.recent_model': datetime(2024, 1, 25),  # Only last week
    },
    end_override_per_model={
        'my_schema.high_priority': datetime(2024, 1, 31, 23, 59, 59),  # Through the end of Jan 31
    },
)
# Each model uses its own override where one is given and falls back to the defaults otherwise
End Bounded Processing
from sqlmesh import Context
context = Context()
scheduler = context.scheduler()
# Strictly limit intervals to not exceed end date
# Useful for cost control and predictable resource usage
missing = scheduler.merged_missing_intervals(
    start='2024-01-01',
    end='2024-01-31',
    end_bounded=True,  # Don't extend beyond Jan 31 even for lookback
)
# Without end_bounded, lookback or partial-interval handling can cause
# returned intervals to extend past the requested end date
# With end_bounded, intervals stay within the requested range
Filter to Selected Models
from sqlmesh import Context
context = Context()
scheduler = context.scheduler()
# Compute missing intervals for specific models only
missing = scheduler.merged_missing_intervals(
    start='2024-01-01',
    end='2024-01-31',
    selected_snapshots={
        'my_schema.user_events',
        'my_schema.transaction_log',
    },
)
# Only returns intervals for selected models
# Note: upstream dependencies must still be satisfied
Estimate Backfill Scope
from sqlmesh import Context
context = Context()
scheduler = context.scheduler()
# Before running expensive backfill, estimate scope
missing = scheduler.merged_missing_intervals(
    start='2024-01-01',
    end='2024-12-31',  # Full year
)
# Calculate estimates
total_intervals = 0
models_affected = 0
for snapshot, intervals in missing.items():
    models_affected += 1
    interval_count = len(intervals)
    total_intervals += interval_count
    print(f"{snapshot.name}: {interval_count} intervals")
print(f"\nTotal: {total_intervals} intervals across {models_affected} models")
print(f"Estimated time: {total_intervals * 2} minutes (assuming 2 min/interval)")
Production Deployment Analysis
from sqlmesh import Context
from datetime import datetime
def analyze_production_backfill(context: Context) -> dict:
    """
    Analyze backfill requirements before production deployment.
    """
    scheduler = context.scheduler()
    # Compute missing intervals
    missing = scheduler.merged_missing_intervals(
        start='2024-01-01',
        end=datetime.now(),
    )
    analysis = {
        'total_models': len(missing),
        'total_intervals': sum(len(intervals) for intervals in missing.values()),
        'models_by_interval_count': {},
    }
    # Categorize models by workload
    for snapshot, intervals in missing.items():
        count = len(intervals)
        if count > 100:
            category = 'high'
        elif count > 10:
            category = 'medium'
        else:
            category = 'low'
        analysis['models_by_interval_count'].setdefault(category, []).append({
            'name': snapshot.name,
            'interval_count': count,
        })
    return analysis
context = Context()
analysis = analyze_production_backfill(context)
print(f"High workload models: {len(analysis['models_by_interval_count'].get('high', []))}")
print(f"Total intervals: {analysis['total_intervals']}")
Integration with Plan Generation
from sqlmesh import Context
context = Context()
# Context.plan internally calls merged_missing_intervals
# This example shows the relationship
# When you create a plan:
plan = context.plan(
    environment='prod',
    start='2024-01-01',
    end='2024-01-31',
)
# Behind the scenes, it calls:
# scheduler.merged_missing_intervals(
#     start='2024-01-01',
#     end='2024-01-31',
#     ...other params from plan...
# )
# Access the computed intervals through the plan.
# plan.missing_intervals is a list of entries, each carrying a snapshot_id and its intervals
print(f"Plan will process intervals for {len(plan.missing_intervals)} snapshots")
for missing in plan.missing_intervals:
    print(f"  {missing.snapshot_id.name}: {len(missing.intervals)} intervals")