Principle: Tobiko Data SQLMesh Ongoing Interval Processing
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Incremental_Processing |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Continuously process new data intervals on a scheduled basis by running the DAG scheduler to evaluate and execute all missing intervals across incremental models.
Description
Ongoing interval processing implements the operational runtime for incremental data pipelines, continuously monitoring for new intervals that need processing and executing them according to dependency order and resource constraints. This is the production operation mode that runs after initial deployment, handling the steady-state flow of new data as it arrives.
The processing loop checks each model's cron schedule to determine when new intervals become available, evaluates upstream dependencies to ensure required data is ready, and executes transformations for intervals that are both scheduled and ready. The system maintains state across runs, ensuring that temporary failures are retried and that long-running pipelines can be safely interrupted and resumed.
The scheduler respects model-specific cron schedules, allowing different models in the same DAG to operate at different cadences. A daily aggregation model might process once per day at midnight, while a real-time alerting model processes every five minutes, with the scheduler coordinating their interactions.
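As a sketch of that cron-awareness, the snippet below computes which interval start times are due for models running at different cadences. The fixed-timedelta cadence is a simplified stand-in for real cron parsing, and `due_intervals` is a hypothetical helper, not SQLMesh's API:

```python
from datetime import datetime, timedelta

def due_intervals(last_processed: datetime, cadence: timedelta,
                  now: datetime) -> list[datetime]:
    """Return start times of intervals whose scheduled time has arrived.

    An interval [t, t + cadence) is due only once `now` reaches its end
    boundary, so partially elapsed intervals are never processed early.
    """
    due = []
    t = last_processed
    while t + cadence <= now:
        due.append(t)
        t += cadence
    return due

now = datetime(2024, 1, 2, 0, 5)
# Daily model: only the fully elapsed Jan 1 interval is due.
daily = due_intervals(datetime(2024, 1, 1), timedelta(days=1), now)
# Five-minute model: every complete 5-minute window since Jan 1 midnight.
five_min = due_intervals(datetime(2024, 1, 1), timedelta(minutes=5), now)
```

Both models share one scheduler loop; the cadence alone determines how many intervals each contributes per run.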
This continuous processing mode integrates with environment finalization checks, ensuring that ongoing runs do not interfere with concurrent plan applications or environment updates. The janitor service runs periodically to clean up orphaned resources from development environments or failed deployments.
Usage
Use ongoing interval processing in production to maintain up-to-date data pipelines without manual intervention. Configure the execution to run on a schedule (typically every few minutes) that balances data freshness requirements against computational costs.
Specify the environment parameter to process intervals in non-production environments, enabling staging or QA pipelines that mirror production behavior. Use select_models to limit processing to specific subsets of the DAG when troubleshooting issues or managing resource contention.
Apply the ignore_cron flag in catch-up scenarios where the processing schedule has been interrupted and you need to process all missing intervals immediately rather than waiting for their scheduled times.
Set execution_time to override the current time reference for testing or to process historical intervals with consistent temporal semantics.
Theoretical Basis
Ongoing interval processing implements a continuous evaluation loop with state management:
RUNTIME_LOOP:
    environment = target_environment OR "prod"
    execution_time = now()
    LOOP (typically invoked every few minutes):
        environment_state = load_environment_state(environment)
        IF NOT environment_state.finalized THEN
            WAIT for plan application to complete
            CONTINUE
        model_snapshots = environment_state.snapshot_versions
        interval_state = load_interval_completion_state(environment)
        missing_intervals = compute_missing_intervals(
            snapshots = model_snapshots,
            completion_state = interval_state,
            current_time = execution_time,
            respect_cron = NOT ignore_cron
        )
        IF missing_intervals is empty THEN
            RETURN success // Nothing to do
        ready_intervals = filter_by_dependencies_met(
            missing_intervals,
            interval_state
        )
        IF select_models specified THEN
            ready_intervals = filter(ready_intervals, model in select_models)
        IF NOT no_auto_upstream THEN
            ready_intervals = ready_intervals + upstream_dependencies
        execution_plan = schedule(ready_intervals):
            dag = build_dependency_graph()
            layers = topological_sort(dag)
            FOR each layer in layers:
                parallelizable_intervals = intervals_in_layer
                batches = group_by_concurrency_limit(parallelizable_intervals)
                FOR each batch in batches:
                    execute_parallel(batch)
                    update_completion_state(batch)
        IF NOT skip_janitor AND environment == "prod" THEN
            run_janitor():
                cleanup_expired_dev_environments()
                vacuum_orphaned_tables()
                archive_old_plan_metadata()
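The schedule step in the loop above can be sketched with Python's standard graphlib: models are grouped into dependency layers, and each layer is split into batches no larger than the concurrency limit. This is a simplified stand-in for the real scheduler, which also tracks per-interval state:

```python
from graphlib import TopologicalSorter

def schedule_layers(dag: dict[str, set[str]]) -> list[list[str]]:
    """Group models into layers: every model in a layer depends only on
    earlier layers, so intervals within a layer can run in parallel."""
    ts = TopologicalSorter(dag)  # maps each node to its predecessors
    ts.prepare()
    layers = []
    while ts.is_active():
        ready = list(ts.get_ready())
        layers.append(sorted(ready))
        ts.done(*ready)
    return layers

def batched(items: list, limit: int) -> list[list]:
    """Split a layer into batches no larger than the concurrency limit."""
    return [items[i:i + limit] for i in range(0, len(items), limit)]

# raw -> {stg_a, stg_b} -> mart
dag = {"raw": set(), "stg_a": {"raw"}, "stg_b": {"raw"}, "mart": {"stg_a", "stg_b"}}
layers = schedule_layers(dag)
```

Here `stg_a` and `stg_b` land in the same layer and could execute concurrently, subject to the batch limit.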
INTERVAL_READINESS:
    FOR each interval:
        upstream_intervals = compute_upstream_requirements(
            interval.model,
            interval.time_range
        )
        ready = ALL(upstream_intervals) in completion_state
        IF NOT ready THEN
            IF upstream_blocked THEN
                LOG warning("Upstream dependency blocked")
                skip_interval
            ELSE
                wait_for_upstream
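A minimal sketch of that readiness decision, with hypothetical helper and state names (a real implementation would read the persisted completion state):

```python
def classify(upstream, completion_state: set, blocked_models: set) -> str:
    """Decide what to do with one interval given its upstream requirements.

    "ready": every required upstream interval is complete; execute now.
    "skip":  an upstream model is blocked (failed); warn and move on.
    "wait":  upstream simply hasn't run yet; retry on a later loop pass.
    """
    if all(u in completion_state for u in upstream):
        return "ready"
    if any(model in blocked_models for model, _ in upstream):
        return "skip"
    return "wait"

completed = {("raw", "2024-01-01"), ("raw", "2024-01-02")}
needs_both = [("raw", "2024-01-01"), ("raw", "2024-01-02")]
needs_more = needs_both + [("raw", "2024-01-03")]
ready = classify(needs_both, completed, set())
wait = classify(needs_more, completed, set())
skip = classify(needs_more, completed, {"raw"})
```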
EXECUTION:
    FOR each ready_interval:
        context = build_execution_context(
            interval = ready_interval,
            upstream_data = fetch_upstream_results(ready_interval),
            execution_time = execution_time
        )
        result = evaluate_model(
            model = ready_interval.model,
            context = context
        )
        physical_location = resolve_interval_location(
            environment,
            ready_interval.model,
            ready_interval.time_range
        )
        materialize(result, physical_location)
        record_completion(
            environment,
            ready_interval.model,
            ready_interval.time_range,
            execution_time
        )
FAILURE_HANDLING:
    IF interval_execution fails THEN
        log_error(interval, error_details)
        IF transient_error THEN
            // Will retry on next run
            mark_as_pending(interval)
        ELSE
            mark_as_failed(interval)
            notify_on_failure(interval)
        // Continue processing other intervals
        CONTINUE to next interval
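The failure-handling branch can be sketched as follows; `TransientError` and the injected failures are illustrative, not SQLMesh types:

```python
class TransientError(Exception):
    """Recoverable failure (e.g. a warehouse timeout); safe to retry."""

def process_intervals(intervals, evaluate, state: dict) -> None:
    """Evaluate each interval independently; one failure never aborts the run."""
    for iv in intervals:
        try:
            evaluate(iv)
            state[iv] = "complete"
        except TransientError:
            state[iv] = "pending"  # stays missing; retried on the next run
        except Exception:
            state[iv] = "failed"   # permanent; surfaced via notification

def evaluate(iv):  # stand-in model evaluation with two injected failures
    if iv == "b":
        raise TransientError("warehouse timeout")
    if iv == "c":
        raise ValueError("schema mismatch")

state: dict = {}
process_intervals(["a", "b", "c", "d"], evaluate, state)
```

Note that interval "d" still completes even though "b" and "c" fail before it, matching the continue-on-error behavior above.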
Key operational characteristics:
Cron-Aware Scheduling: Only processes intervals whose scheduled time has arrived, preventing premature processing of future intervals.
Stateful Resumption: Uses persistent completion state to resume processing after interruptions without reprocessing completed intervals.
Concurrent Safety: Checks environment finalization status to prevent conflicts with plan applications.
Selective Processing: Supports filtering to specific models while maintaining dependency correctness.
Resource Management: Respects batch concurrency limits to prevent overwhelming compute or database resources.
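The stateful-resumption characteristic can be illustrated with a hypothetical single pass over a persisted completion set: a second invocation picks up new work without repeating finished intervals.

```python
def run_once(all_intervals, completed: set, execute) -> None:
    """One scheduler pass: execute only intervals not yet recorded complete,
    so an interrupted run resumes without reprocessing finished work."""
    for iv in all_intervals:
        if iv in completed:
            continue
        execute(iv)
        completed.add(iv)  # durably persisted in the real system

executed: list = []
completed: set = set()
run_once(["i1", "i2"], completed, executed.append)        # first pass
run_once(["i1", "i2", "i3"], completed, executed.append)  # resumed pass
```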
The ongoing processing model enables "set it and forget it" data pipeline operation, with the system automatically adapting to new data arrival, handling transient failures, and maintaining consistency across the dependency graph.