Implementation:TobikoData Sqlmesh IncrementalByTimeRangeKind Init
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Incremental_Processing |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Concrete implementation for defining incremental models that process data by time-based intervals provided by SQLMesh.
Description
IncrementalByTimeRangeKind is a model kind class that configures incremental processing based on time ranges. It extends the base _IncrementalBy class to specify which column represents the time dimension, how intervals should be partitioned, and whether automatic restatement should occur for recent intervals.
This implementation handles the core configuration of time-based incremental models, including validation of the time column specification, management of partition strategies, and calculation of fingerprint hashes that determine when model versions change. The class integrates with SQLMesh's snapshot system to track which intervals have been processed and coordinate incremental execution.
Usage
Use IncrementalByTimeRangeKind when defining SQL models that should process data incrementally based on a timestamp or date column. Specify this in model definitions where data grows continuously over time and you want to avoid full table scans on each execution.
Code Reference
Source Location
- Repository: sqlmesh
- File: sqlmesh/core/model/kind.py:L465-507 (IncrementalByTimeRangeKind), L419-444 (_IncrementalBy)
Signature
class IncrementalByTimeRangeKind(_IncrementalBy):
name: t.Literal[ModelKindName.INCREMENTAL_BY_TIME_RANGE] = (
ModelKindName.INCREMENTAL_BY_TIME_RANGE
)
time_column: TimeColumn
auto_restatement_intervals: t.Optional[SQLGlotPositiveInt] = None
partition_by_time_column: SQLGlotBool = True
# Inherited from _IncrementalBy:
# dialect: t.Optional[str] = None
# batch_size: t.Optional[SQLGlotPositiveInt] = None
# batch_concurrency: t.Optional[SQLGlotPositiveInt] = None
# lookback: t.Optional[SQLGlotPositiveInt] = None
# forward_only: SQLGlotBool = False
# disable_restatement: SQLGlotBool = False
Import
from sqlmesh.core.model.kind import IncrementalByTimeRangeKind, TimeColumn
from sqlmesh.core.model import ModelKindName
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| name | ModelKindName.INCREMENTAL_BY_TIME_RANGE | Yes | The model kind identifier (automatically set) |
| time_column | TimeColumn | Yes | The column specification for temporal partitioning (column name and optional format) |
| auto_restatement_intervals | SQLGlotPositiveInt | No | Number of recent intervals to automatically reprocess on each run |
| partition_by_time_column | SQLGlotBool | No | Whether to physically partition the table by the time column (default: True) |
| batch_size | SQLGlotPositiveInt | No | Number of intervals to process in a single batch |
| batch_concurrency | SQLGlotPositiveInt | No | Maximum number of concurrent batch executions |
| lookback | SQLGlotPositiveInt | No | Number of prior intervals to include in the read window |
| forward_only | SQLGlotBool | No | Whether to apply changes only to future intervals (default: False) |
| disable_restatement | SQLGlotBool | No | Whether to disable restatement for this model (default: False) |
| dialect | str | No | SQL dialect override for model-specific rendering |
Outputs
| Name | Type | Description |
|---|---|---|
| instance | IncrementalByTimeRangeKind | Configured model kind instance with validated parameters |
Usage Examples
Basic Usage
# In a SQLMesh SQL model file:
# models/daily_aggregates.sql
MODEL (
name my_schema.daily_aggregates,
kind INCREMENTAL_BY_TIME_RANGE (
time_column event_timestamp
),
cron '@daily',
grains (user_id, ds)
);
SELECT
user_id,
DATE(event_timestamp) as ds,
COUNT(*) as event_count,
SUM(revenue) as total_revenue
FROM source.events
WHERE event_timestamp BETWEEN @start_date AND @end_date
GROUP BY user_id, ds;
Advanced Configuration
# Model with lookback window and batching
MODEL (
name my_schema.rolling_metrics,
kind INCREMENTAL_BY_TIME_RANGE (
time_column event_date,
lookback 7,
batch_size 7,
auto_restatement_intervals 3,
partition_by_time_column true
),
cron '@daily'
);
SELECT
event_date,
user_id,
-- 7-day rolling average (lookback enables this)
AVG(daily_value) OVER (
PARTITION BY user_id
ORDER BY event_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as rolling_avg_7d
FROM source.daily_metrics
WHERE event_date BETWEEN @start_date AND @end_date;
Forward-Only Model
# Model that doesn't reprocess history
MODEL (
name my_schema.immutable_facts,
kind INCREMENTAL_BY_TIME_RANGE (
time_column created_at,
forward_only true,
disable_restatement true
),
cron '@hourly'
);
SELECT
transaction_id,
user_id,
created_at,
amount
FROM source.transactions
WHERE created_at BETWEEN @start_date AND @end_date;
Programmatic Usage
from sqlmesh.core.model.kind import IncrementalByTimeRangeKind, TimeColumn
# Create a model kind configuration programmatically
kind = IncrementalByTimeRangeKind(
time_column=TimeColumn(column="event_timestamp", format=None),
batch_size=7,
lookback=1,
auto_restatement_intervals=2,
partition_by_time_column=True,
forward_only=False,
disable_restatement=False
)
# Access properties
print(f"Model kind: {kind.name}")
print(f"Data hash values: {kind.data_hash_values}")
print(f"Metadata hash values: {kind.metadata_hash_values}")
# Convert to SQLGlot expression for serialization
expression = kind.to_expression(dialect="spark")