Implementation:TobikoData Sqlmesh IncrementalByTimeRangeKind Init

From Leeroopedia


Knowledge Sources
Domains: Data_Engineering, Incremental_Processing
Last Updated: 2026-02-07 00:00 GMT

Overview

Concrete SQLMesh implementation for defining incremental models that process data in time-based intervals.

Description

IncrementalByTimeRangeKind is a model kind class that configures incremental processing based on time ranges. It extends the base _IncrementalBy class to specify which column represents the time dimension, how intervals should be partitioned, and whether automatic restatement should occur for recent intervals.

This implementation handles the core configuration of time-based incremental models, including validation of the time column specification, management of partition strategies, and calculation of fingerprint hashes that determine when model versions change. The class integrates with SQLMesh's snapshot system to track which intervals have been processed and coordinate incremental execution.
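
To make the fingerprinting idea concrete, here is a minimal sketch of how a stable hash over configuration values can signal that a model version changed. It is not SQLMesh's actual hashing code: the `fingerprint` helper and the split of fields into "data" and "metadata" groups are assumptions made for illustration only.

```python
import hashlib
import json


def fingerprint(values: dict) -> str:
    """Return a stable SHA-256 hex digest for a dict of configuration values."""
    # sort_keys makes the digest independent of dict insertion order.
    payload = json.dumps(values, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Hypothetical grouping: which fields belong to which group is an assumption.
data_fields = {"time_column": "event_date", "partition_by_time_column": True}
metadata_fields = {"batch_size": 7, "lookback": 1}

base = fingerprint(data_fields)
# Changing the time column produces a different fingerprint.
changed_column = fingerprint({**data_fields, "time_column": "created_at"})
# The same values in a different order produce the same fingerprint.
same_again = fingerprint(dict(reversed(list(data_fields.items()))))
```

The point of hashing a canonical serialization is that two configurations compare equal exactly when their values match, regardless of how they were constructed.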

Usage

Use IncrementalByTimeRangeKind when defining SQL models that should process data incrementally based on a timestamp or date column. It suits models whose data grows continuously over time, where each run should process only new intervals rather than recompute the whole table.
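
The benefit of interval-based processing can be sketched in plain Python: split the requested range into intervals and run only those not yet processed. This is an illustration under simplifying assumptions (daily intervals, an in-memory set of processed intervals); `daily_intervals` and `missing_intervals` are hypothetical helpers, not SQLMesh functions.

```python
from __future__ import annotations

from datetime import date, timedelta


def daily_intervals(start: date, end: date) -> list[tuple[date, date]]:
    """Split the half-open range [start, end) into one-day intervals."""
    intervals = []
    current = start
    while current < end:
        nxt = current + timedelta(days=1)
        intervals.append((current, nxt))
        current = nxt
    return intervals


def missing_intervals(
    start: date, end: date, processed: set[tuple[date, date]]
) -> list[tuple[date, date]]:
    """Return the intervals in [start, end) that have not been processed yet."""
    return [iv for iv in daily_intervals(start, end) if iv not in processed]


# Jan 1 through Jan 3 were processed by earlier runs.
processed = set(daily_intervals(date(2024, 1, 1), date(2024, 1, 4)))
# A run covering Jan 1 through Jan 5 only needs the two newest days.
todo = missing_intervals(date(2024, 1, 1), date(2024, 1, 6), processed)
```

Here `todo` contains only the intervals for January 4 and January 5, so the earlier days are not read again.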

Code Reference

Source Location

  • Repository: sqlmesh
  • File: sqlmesh/core/model/kind.py:L465-507 (IncrementalByTimeRangeKind), L419-444 (_IncrementalBy)

Signature

class IncrementalByTimeRangeKind(_IncrementalBy):
    name: t.Literal[ModelKindName.INCREMENTAL_BY_TIME_RANGE] = (
        ModelKindName.INCREMENTAL_BY_TIME_RANGE
    )
    time_column: TimeColumn
    auto_restatement_intervals: t.Optional[SQLGlotPositiveInt] = None
    partition_by_time_column: SQLGlotBool = True

    # Inherited from _IncrementalBy:
    # dialect: t.Optional[str] = None
    # batch_size: t.Optional[SQLGlotPositiveInt] = None
    # batch_concurrency: t.Optional[SQLGlotPositiveInt] = None
    # lookback: t.Optional[SQLGlotPositiveInt] = None
    # forward_only: SQLGlotBool = False
    # disable_restatement: SQLGlotBool = False

Import

from sqlmesh.core.model.kind import IncrementalByTimeRangeKind, TimeColumn
from sqlmesh.core.model import ModelKindName

I/O Contract

Inputs

Name | Type | Required | Description
name | ModelKindName.INCREMENTAL_BY_TIME_RANGE | No | The model kind identifier (set automatically by its default value)
time_column | TimeColumn | Yes | The column specification for temporal partitioning (column name and optional format)
auto_restatement_intervals | SQLGlotPositiveInt | No | Number of recent intervals to automatically reprocess on each run (default: None)
partition_by_time_column | SQLGlotBool | No | Whether to physically partition the table by the time column (default: True)
batch_size | SQLGlotPositiveInt | No | Number of intervals to process in a single batch (default: None)
batch_concurrency | SQLGlotPositiveInt | No | Maximum number of concurrent batch executions (default: None)
lookback | SQLGlotPositiveInt | No | Number of prior intervals to include in the read window (default: None)
forward_only | SQLGlotBool | No | Whether to apply changes only to future intervals (default: False)
disable_restatement | SQLGlotBool | No | Whether to disable restatement for this model (default: False)
dialect | str | No | SQL dialect override for model-specific rendering (default: None)
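
The interaction between lookback and batch_size can be sketched as follows. This is a simplified model, not SQLMesh's scheduler: intervals are represented as integer indices, and `with_lookback` and `plan_batches` are hypothetical helpers that only mirror the descriptions of these parameters.

```python
from __future__ import annotations


def with_lookback(missing: list[int], lookback: int) -> list[int]:
    """Extend the missing interval indices with `lookback` prior intervals."""
    if not missing:
        return []
    earliest = min(missing)
    first = max(0, earliest - lookback)  # never go before interval 0
    return sorted(set(range(first, earliest)) | set(missing))


def plan_batches(intervals: list[int], batch_size: int | None) -> list[list[int]]:
    """Group intervals into batches of at most `batch_size` intervals."""
    if not intervals:
        return []
    if batch_size is None:
        return [list(intervals)]  # no limit: a single batch
    return [
        intervals[i : i + batch_size]
        for i in range(0, len(intervals), batch_size)
    ]


# Intervals 10 and 11 are new; a lookback of 2 also revisits 8 and 9.
window = with_lookback([10, 11], lookback=2)
# With batch_size 3 the four intervals run as two batches.
batches = plan_batches(window, batch_size=3)
```

In this sketch `window` is `[8, 9, 10, 11]` and `batches` is `[[8, 9, 10], [11]]`. A batch_concurrency setting would then bound how many of those batches run at the same time.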

Outputs

Name | Type | Description
instance | IncrementalByTimeRangeKind | Configured model kind instance with validated parameters

Usage Examples

Basic Usage

-- In a SQLMesh SQL model file:
-- models/daily_aggregates.sql

MODEL (
  name my_schema.daily_aggregates,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column ds  -- the time column must be one of the model's output columns
  ),
  cron '@daily',
  grains (user_id, ds)
);

SELECT
  user_id,
  DATE(event_timestamp) as ds,
  COUNT(*) as event_count,
  SUM(revenue) as total_revenue
FROM source.events
WHERE event_timestamp BETWEEN @start_date AND @end_date
GROUP BY user_id, ds;

Advanced Configuration

-- Model with lookback window and batching
MODEL (
  name my_schema.rolling_metrics,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column event_date,
    lookback 7,
    batch_size 7,
    auto_restatement_intervals 3,
    partition_by_time_column true
  ),
  cron '@daily'
);

SELECT
  event_date,
  user_id,
  -- 7-day rolling average (lookback enables this)
  AVG(daily_value) OVER (
    PARTITION BY user_id
    ORDER BY event_date
    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
  ) as rolling_avg_7d
FROM source.daily_metrics
WHERE event_date BETWEEN @start_date AND @end_date;

Forward-Only Model

-- Model that doesn't reprocess history
MODEL (
  name my_schema.immutable_facts,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column created_at,
    forward_only true,
    disable_restatement true
  ),
  cron '@hourly'
);

SELECT
  transaction_id,
  user_id,
  created_at,
  amount
FROM source.transactions
WHERE created_at BETWEEN @start_date AND @end_date;

Programmatic Usage

from sqlmesh.core.model.kind import IncrementalByTimeRangeKind, TimeColumn

# Create a model kind configuration programmatically
kind = IncrementalByTimeRangeKind(
    time_column=TimeColumn(column="event_timestamp", format=None),
    batch_size=7,
    lookback=1,
    auto_restatement_intervals=2,
    partition_by_time_column=True,
    forward_only=False,
    disable_restatement=False
)

# Access properties
print(f"Model kind: {kind.name}")
print(f"Data hash values: {kind.data_hash_values}")
print(f"Metadata hash values: {kind.metadata_hash_values}")

# Convert to SQLGlot expression for serialization
expression = kind.to_expression(dialect="spark")

Related Pages

Implements Principle
