
Principle:TobikoData Sqlmesh Ongoing Interval Processing

From Leeroopedia


Knowledge Sources
Domains Data_Engineering, Incremental_Processing
Last Updated 2026-02-07 00:00 GMT

Overview

Continuously process new data intervals on a scheduled basis by running the DAG scheduler to evaluate and execute all missing intervals across incremental models.

Description

Ongoing interval processing implements the operational runtime for incremental data pipelines, continuously monitoring for new intervals that need processing and executing them according to dependency order and resource constraints. This is the production operation mode that runs after initial deployment, handling the steady-state flow of new data as it arrives.

The processing loop checks each model's cron schedule to determine when new intervals become available, evaluates upstream dependencies to ensure required data is ready, and executes transformations for intervals that are both scheduled and ready. The system maintains state across runs, ensuring that temporary failures are retried and that long-running pipelines can be safely interrupted and resumed.

The scheduler respects model-specific cron schedules, allowing different models in the same DAG to operate at different cadences. A daily aggregation model might process once per day at midnight, while a real-time alerting model processes every five minutes, with the scheduler coordinating their interactions.
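As a rough illustration of cadence-aware interval computation (plain stdlib datetime with a fixed timedelta standing in for a real cron expression, not SQLMesh's actual scheduling machinery):

```python
from datetime import datetime, timedelta

def due_intervals(last_processed, now, cadence):
    """Yield [start, end) intervals whose end time has arrived.

    A fixed timedelta stands in for a cron expression here; an interval
    is due only once its full period has elapsed.
    """
    start = last_processed
    while start + cadence <= now:
        yield (start, start + cadence)
        start += cadence

now = datetime(2026, 2, 7, 0, 12)
# Daily model: the 2026-02-06 interval became due at midnight.
daily = list(due_intervals(datetime(2026, 2, 6), now, timedelta(days=1)))
# Five-minute model: 00:00-00:05 and 00:05-00:10 are due; 00:10-00:15 is not yet.
fast = list(due_intervals(datetime(2026, 2, 7), now, timedelta(minutes=5)))
```

Because each model tracks its own cadence, the same scheduler pass can find one due interval for the daily model and two for the five-minute model, and nothing for models whose period has not yet elapsed.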

This continuous processing mode integrates with environment finalization checks, ensuring that ongoing runs do not interfere with concurrent plan applications or environment updates. The janitor service runs periodically to clean up orphaned resources from development environments or failed deployments.

Usage

Use ongoing interval processing in production to maintain up-to-date data pipelines without manual intervention. Configure the execution to run on a schedule (typically every few minutes) that balances data freshness requirements against computational costs.

Specify the environment parameter to process intervals in non-production environments, enabling staging or QA pipelines that mirror production behavior. Use select_models to limit processing to specific subsets of the DAG when troubleshooting issues or managing resource contention.

Apply the ignore_cron flag during catch-up scenarios where the processing schedule has been interrupted and you need to process all missing intervals immediately rather than waiting for their scheduled times.

Set execution_time to override the current time reference for testing or to process historical intervals with consistent temporal semantics.
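The options above can be collected into a single normalized record before a run starts. This is a hypothetical helper whose parameter names mirror this page's description, not necessarily the real SQLMesh call signature:

```python
from datetime import datetime, timezone

def resolve_run_options(environment=None, select_models=None,
                        ignore_cron=False, execution_time=None,
                        no_auto_upstream=False, skip_janitor=False):
    """Normalize the run options described above into one record.

    Hypothetical sketch: names follow this page, not SQLMesh's API.
    """
    env = environment or "prod"
    return {
        "environment": env,
        # An empty selection means "process the whole DAG".
        "select_models": set(select_models or ()),
        "ignore_cron": ignore_cron,
        # execution_time overrides "now" for testing or historical runs.
        "execution_time": execution_time or datetime.now(timezone.utc),
        "auto_upstream": not no_auto_upstream,
        # The janitor only runs for ongoing production executions.
        "run_janitor": not skip_janitor and env == "prod",
    }
```

Note how the janitor gating falls out of the defaults: a run against a non-production environment skips cleanup even without passing skip_janitor.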

Theoretical Basis

Ongoing interval processing implements a continuous evaluation loop with state management:

RUNTIME_LOOP:
  environment = target_environment OR "prod"
  execution_time = now()

  LOOP (typically invoked every few minutes):
    environment_state = load_environment_state(environment)

    IF NOT environment_state.finalized THEN
      WAIT for plan application to complete
      CONTINUE

    model_snapshots = environment_state.snapshot_versions
    interval_state = load_interval_completion_state(environment)

    missing_intervals = compute_missing_intervals(
      snapshots = model_snapshots,
      completion_state = interval_state,
      current_time = execution_time,
      respect_cron = NOT ignore_cron
    )

    IF missing_intervals is empty THEN
      RETURN success  // Nothing to do

    ready_intervals = filter_by_dependencies_met(
      missing_intervals,
      interval_state
    )

    IF select_models specified THEN
      ready_intervals = filter(ready_intervals,
                               model in select_models)

      IF NOT no_auto_upstream THEN
        ready_intervals = ready_intervals + upstream_dependencies

    execution_plan = schedule(ready_intervals):
      dag = build_dependency_graph()
      layers = topological_sort(dag)

      FOR each layer in layers:
        parallelizable_intervals = intervals_in_layer
        batches = group_by_concurrency_limit(parallelizable_intervals)

        FOR each batch in batches:
          execute_parallel(batch)
          update_completion_state(batch)

    IF NOT skip_janitor AND environment == "prod" THEN
      run_janitor():
        cleanup_expired_dev_environments()
        vacuum_orphaned_tables()
        archive_old_plan_metadata()

INTERVAL_READINESS:
  FOR each interval:
    upstream_intervals = compute_upstream_requirements(
      interval.model,
      interval.time_range
    )

    ready = ALL(upstream_intervals) in completion_state

    IF NOT ready THEN
      IF upstream_blocked THEN
        LOG warning("Upstream dependency blocked")
        skip_interval
      ELSE
        wait_for_upstream

EXECUTION:
  FOR each ready_interval:
    context = build_execution_context(
      interval = ready_interval,
      upstream_data = fetch_upstream_results(ready_interval),
      execution_time = execution_time
    )

    result = evaluate_model(
      model = interval.model,
      context = context
    )

    physical_location = resolve_interval_location(
      environment,
      interval.model,
      interval.time_range
    )

    materialize(result, physical_location)

    record_completion(
      environment,
      interval.model,
      interval.time_range,
      execution_time
    )
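Location resolution might look like the following under a simple schema-per-environment layout. This is a hypothetical naming scheme for illustration only; SQLMesh's real physical layout uses versioned snapshot tables rather than environment-suffixed schemas:

```python
def resolve_interval_location(environment, model, time_range):
    """Hypothetical layout: prod writes to the base schema, other
    environments get an environment-suffixed schema, and the interval's
    start date selects the target partition."""
    schema = "analytics" if environment == "prod" else f"analytics__{environment}"
    start, _end = time_range
    return f"{schema}.{model}", f"dt={start}"

table, partition = resolve_interval_location(
    "prod", "daily_agg", ("2026-02-06", "2026-02-07")
)
```

The key property is that the same (model, time_range) pair deterministically maps to one physical location, so recording completion against that location keeps materialization idempotent.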

FAILURE_HANDLING:
  IF interval_execution fails THEN
    log_error(interval, error_details)

    IF transient_error THEN
      // Will retry on next run
      mark_as_pending(interval)
    ELSE
      mark_as_failed(interval)
      notify_on_failure(interval)

    // Continue processing other intervals
    CONTINUE to next interval
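The transient-versus-permanent split can be sketched with two exception classes; the error taxonomy and state labels here are illustrative, not SQLMesh's:

```python
class TransientError(Exception):
    """Recoverable failure (e.g. a warehouse timeout): retry next run."""

class PermanentError(Exception):
    """Unrecoverable failure (e.g. a SQL error): needs operator attention."""

def process_interval(interval, execute, state, notify):
    try:
        execute(interval)
        state[interval] = "complete"
    except TransientError:
        state[interval] = "pending"   # picked up again by the next run
    except PermanentError as err:
        state[interval] = "failed"
        notify(interval, err)
    # Either way the caller continues with the remaining intervals.

state, alerts = {}, []

def flaky(interval):
    if interval == "b":
        raise TransientError("warehouse timeout")
    if interval == "c":
        raise PermanentError("bad SQL")

for iv in ["a", "b", "c"]:
    process_interval(iv, flaky, state, lambda i, e: alerts.append(i))
```

Because a failed interval only marks its own state, one bad model does not halt the run: unrelated branches of the DAG keep processing, and only permanent failures page an operator.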

Key operational characteristics:

Cron-Aware Scheduling: Only processes intervals whose scheduled time has arrived, preventing premature processing of future intervals.

Stateful Resumption: Uses persistent completion state to resume processing after interruptions without reprocessing completed intervals.

Concurrent Safety: Checks environment finalization status to prevent conflicts with plan applications.

Selective Processing: Supports filtering to specific models while maintaining dependency correctness.

Resource Management: Respects batch concurrency limits to prevent overwhelming compute or database resources.
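Stateful resumption hinges on checkpointing completion state after every interval. A minimal sketch using a JSON file as the persistent store (real deployments would use the state database, and the interval names here are placeholders):

```python
import json
import os
import tempfile

def run_once(state_path, intervals, execute):
    """Process intervals, persisting completion state after each one so an
    interrupted run resumes without redoing finished work."""
    if os.path.exists(state_path):
        with open(state_path) as f:
            completed = set(json.load(f))
    else:
        completed = set()
    for interval in intervals:
        if interval in completed:
            continue                      # already done: resume past it
        execute(interval)
        completed.add(interval)
        with open(state_path, "w") as f:  # checkpoint after every interval
            json.dump(sorted(completed), f)

executed = []
path = os.path.join(tempfile.mkdtemp(), "state.json")
run_once(path, ["h0", "h1"], executed.append)
# A later invocation sees h0/h1 in the checkpoint and only runs h2.
run_once(path, ["h0", "h1", "h2"], executed.append)
```

Checkpointing per interval (rather than per run) is what makes interruption safe: a crash between intervals loses at most the interval in flight.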

The ongoing processing model enables "set it and forget it" data pipeline operation, with the system automatically adapting to new data arrival, handling transient failures, and maintaining consistency across the dependency graph.

Related Pages

Implemented By
