Implementation:TobikoData Sqlmesh CroniterCache IncrementalBy
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Incremental_Processing |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
SQLMesh components for generating cron-based time intervals and for configuring how incremental models process those intervals in batches.
Description
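CroniterCache wraps the croniter library to compute the time boundaries defined by a cron expression. It stores its current position (curr), and each call to get_next() or get_prev() moves that position. Repeated calls therefore walk consecutive boundaries without the caller tracking state. It also precomputes interval_seconds for the expression. The estimate flag on both methods suggests this fixed interval can be used to step forward or back without re-evaluating the cron expression.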
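The stateful traversal can be sketched with only the standard library. `FixedIntervalCursor` is hypothetical and not part of SQLMesh. It handles only fixed-cadence schedules, given as a number of seconds (for example 3600 for `@hourly`), evaluated in UTC. Arbitrary cron expressions still need croniter. As with CroniterCache, every call moves the stored position, so `get_prev()` after `get_next()` steps back from the new position.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


class FixedIntervalCursor:
    """Hypothetical, stdlib-only stand-in for CroniterCache.

    Handles only fixed-cadence schedules (e.g. 3600 s for '@hourly',
    86400 s for '@daily') whose boundaries are whole multiples of the
    interval since the Unix epoch, evaluated in UTC.
    """

    def __init__(self, interval_seconds: int, time: datetime) -> None:
        self.interval = timedelta(seconds=interval_seconds)
        # Assumption for this sketch: naive datetimes are treated as UTC.
        self.curr = time if time.tzinfo else time.replace(tzinfo=timezone.utc)

    def _offset(self) -> timedelta:
        # Distance from the current position back to the boundary at or before it.
        return (self.curr - EPOCH) % self.interval

    def get_next(self) -> datetime:
        # Move to the first boundary strictly after the current position.
        self.curr = self.curr - self._offset() + self.interval
        return self.curr

    def get_prev(self) -> datetime:
        # Move to the last boundary strictly before the current position.
        offset = self._offset()
        self.curr = self.curr - (offset if offset else self.interval)
        return self.curr


cursor = FixedIntervalCursor(3600, datetime(2024, 1, 1, 12, 30))
print(cursor.get_next())  # 2024-01-01 13:00:00+00:00
print(cursor.get_prev())  # 2024-01-01 12:00:00+00:00 (steps back from 13:00)
```

Keeping the position inside the object means a caller can generate a run of boundaries with a simple loop. The trade-off is that every call has a side effect the caller must account for.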
The _IncrementalBy class defines the parameters that control how incremental models process intervals:
- batch_size groups intervals into backfill tasks.
- batch_concurrency limits how many of those tasks run at once.
- lookback reprocesses recent intervals to capture late-arriving data.
- forward_only and disable_restatement govern how changes and restatements affect existing data.
Together, these components determine which time intervals an incremental model covers and how those intervals are grouped and executed.
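A small, hypothetical helper illustrates the effect of `batch_size`. `split_into_batches` is not a SQLMesh function. It assumes the pending intervals are sorted and contiguous, and it treats `batch_size=None` as a single batch covering everything. SQLMesh's scheduler does its own batching, so this sketch shows only the arithmetic.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

Interval = Tuple[datetime, datetime]  # [start, end)


def split_into_batches(intervals: List[Interval], batch_size: Optional[int]) -> List[Interval]:
    """Hypothetical helper: group sorted, contiguous intervals into batches.

    Each batch spans from the start of its first interval to the end of its
    last one. batch_size=None is treated as "one batch for everything".
    """
    if not intervals:
        return []
    size = batch_size or len(intervals)
    return [
        (intervals[i][0], intervals[min(i + size, len(intervals)) - 1][1])
        for i in range(0, len(intervals), size)
    ]


start = datetime(2024, 1, 1)
daily = [(start + timedelta(days=d), start + timedelta(days=d + 1)) for d in range(10)]
for batch_start, batch_end in split_into_batches(daily, batch_size=7):
    print(batch_start.date(), "->", batch_end.date())
# 2024-01-01 -> 2024-01-08
# 2024-01-08 -> 2024-01-11
```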
Usage
Use CroniterCache in scheduling code that needs to walk several consecutive boundaries of a cron expression. Models do not reference _IncrementalBy directly, because it is a base class. Instead, its fields are set as properties of an incremental kind such as INCREMENTAL_BY_TIME_RANGE. These properties control batching, concurrency, lookback, and change-management behavior.
Code Reference
Source Location
- Repository: sqlmesh
- File: sqlmesh/utils/cron.py:L36-55 (CroniterCache), sqlmesh/core/model/kind.py:L419-444 (_IncrementalBy)
Signature
```python
# CroniterCache (sqlmesh/utils/cron.py); method bodies elided
class CroniterCache:
    def __init__(
        self,
        cron: str,
        time: t.Optional[TimeLike] = None,
        tz: t.Optional[tzinfo] = None
    ):
        self.cron = cron
        self.tz = tz
        self.curr: datetime = to_datetime(
            now() if time is None else time, tz=self.tz
        )
        self.interval_seconds = interval_seconds(self.cron)

    def get_next(self, estimate: bool = False) -> datetime:
        ...

    def get_prev(self, estimate: bool = False) -> datetime:
        ...


# _IncrementalBy (sqlmesh/core/model/kind.py); field excerpt
class _IncrementalBy(_Incremental):
    dialect: t.Optional[str] = Field(None, validate_default=True)
    batch_size: t.Optional[SQLGlotPositiveInt] = None
    batch_concurrency: t.Optional[SQLGlotPositiveInt] = None
    lookback: t.Optional[SQLGlotPositiveInt] = None
    forward_only: SQLGlotBool = False
    disable_restatement: SQLGlotBool = False
```
Import
```python
from sqlmesh.utils.cron import CroniterCache
from sqlmesh.core.model.kind import _IncrementalBy  # private base class
```
I/O Contract
Inputs
CroniterCache:
| Name | Type | Required | Description |
|---|---|---|---|
| cron | str | Yes | Cron expression defining interval schedule (e.g., '@daily', '0 * * * *') |
| time | TimeLike | No | Starting time for interval generation (defaults to now) |
| tz | tzinfo | No | Timezone for interpreting cron schedule |
_IncrementalBy:
| Name | Type | Required | Description |
|---|---|---|---|
| dialect | str | No | SQL dialect override for model-specific rendering |
| batch_size | SQLGlotPositiveInt | No | Maximum number of intervals evaluated in a single backfill task; if unset, all pending intervals run as one task |
| batch_concurrency | SQLGlotPositiveInt | No | Maximum number of this model's batches that may run concurrently |
| lookback | SQLGlotPositiveInt | No | Number of intervals before the current one to reprocess on each run, to capture late-arriving data |
| forward_only | SQLGlotBool | No | Whether changes to the model are always treated as forward-only: they apply to new intervals without rebuilding existing data (default: False) |
| disable_restatement | SQLGlotBool | No | Whether restating (reprocessing) the model's existing intervals is disabled (default: False) |
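A plain-Python mirror of these fields shows their defaults and the positive-integer constraint. `IncrementalBySettings` is a hypothetical dataclass, not the real pydantic model. The real model also parses values written in MODEL DDL through its SQLGlot-aware types, which this sketch does not attempt.

```python
from dataclasses import dataclass
from typing import Optional


def _require_positive(name: str, value: Optional[int]) -> None:
    # Unset (None) is allowed; otherwise the value must be an int greater than zero.
    # bool is rejected explicitly because it is a subclass of int in Python.
    if value is None:
        return
    if isinstance(value, bool) or not isinstance(value, int) or value <= 0:
        raise ValueError(f"{name} must be a positive integer, got {value!r}")


@dataclass(frozen=True)
class IncrementalBySettings:
    """Hypothetical plain-Python mirror of the _IncrementalBy fields and defaults."""

    dialect: Optional[str] = None
    batch_size: Optional[int] = None
    batch_concurrency: Optional[int] = None
    lookback: Optional[int] = None
    forward_only: bool = False
    disable_restatement: bool = False

    def __post_init__(self) -> None:
        # Validate the three optional positive-integer fields.
        for name in ("batch_size", "batch_concurrency", "lookback"):
            _require_positive(name, getattr(self, name))


settings = IncrementalBySettings(batch_size=7, batch_concurrency=3, lookback=1)
print(settings.forward_only)  # False
try:
    IncrementalBySettings(batch_size=0)
except ValueError as err:
    print(err)  # batch_size must be a positive integer, got 0
```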
Outputs
CroniterCache:
| Name | Type | Description |
|---|---|---|
| instance | CroniterCache | Stateful iterator for generating cron-based intervals |
| get_next() | datetime | Returns the first boundary after the current position and moves the position to it |
| get_prev() | datetime | Returns the last boundary before the current position and moves the position to it |
_IncrementalBy:
| Name | Type | Description |
|---|---|---|
| instance | _IncrementalBy | Base configuration for incremental processing behavior |
| data_hash_values | List[Optional[str]] | Values that affect the model's data; a change produces a new model version whose data may need to be recomputed |
| metadata_hash_values | List[Optional[str]] | Values that affect only metadata; a change does not require recomputing data |
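The split between the two lists matters because data and metadata changes are treated differently. The sketch below hashes each list into its own SHA-256 fingerprint. Assigning `dialect` and `lookback` to data values, and `batch_size` and `batch_concurrency` to metadata values, is an assumption for illustration only. The real lists are defined in `kind.py`.

```python
import hashlib
from typing import Dict, List, Optional


def fingerprint(values: List[Optional[str]]) -> str:
    # repr() keeps None distinct from the string "None".
    return hashlib.sha256(repr(values).encode("utf-8")).hexdigest()


def _opt_str(value: object) -> Optional[str]:
    return None if value is None else str(value)


# Hypothetical field split for illustration only; the real assignment lives in
# sqlmesh/core/model/kind.py.
def data_hash_values(cfg: Dict[str, object]) -> List[Optional[str]]:
    return [_opt_str(cfg.get("dialect")), _opt_str(cfg.get("lookback"))]


def metadata_hash_values(cfg: Dict[str, object]) -> List[Optional[str]]:
    return [_opt_str(cfg.get("batch_size")), _opt_str(cfg.get("batch_concurrency"))]


old = {"dialect": "duckdb", "lookback": 1, "batch_size": 7}
new = {**old, "batch_size": 14}
print(fingerprint(data_hash_values(old)) == fingerprint(data_hash_values(new)))  # True
print(fingerprint(metadata_hash_values(old)) == fingerprint(metadata_hash_values(new)))  # False
```

Under this assumed split, resizing batches changes only the metadata fingerprint. Changing lookback changes the data fingerprint.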
Usage Examples
CroniterCache Basic Usage
```python
from sqlmesh.utils.cron import CroniterCache
from datetime import datetime

# Create a daily cron iterator starting at a specific time
cron_cache = CroniterCache(
    cron='@daily',
    time=datetime(2024, 1, 1, 0, 0, 0)
)

# Generate the next 5 daily boundaries
intervals = []
for _ in range(5):
    next_time = cron_cache.get_next()
    intervals.append(next_time)
    print(f"Interval boundary: {next_time}")

# Output:
# Interval boundary: 2024-01-02 00:00:00
# Interval boundary: 2024-01-03 00:00:00
# Interval boundary: 2024-01-04 00:00:00
# Interval boundary: 2024-01-05 00:00:00
# Interval boundary: 2024-01-06 00:00:00
```
Hourly Intervals with Timezone
```python
from sqlmesh.utils.cron import CroniterCache
from datetime import datetime
import pytz

# Create an hourly iterator in UTC, starting mid-hour
cron_cache = CroniterCache(
    cron='@hourly',
    time=datetime(2024, 1, 1, 12, 30, 0),
    tz=pytz.UTC
)

# The next boundary after 12:30 is 13:00; the cache's position moves to 13:00
next_interval = cron_cache.get_next()
print(f"Next interval: {next_interval}")  # 2024-01-01 13:00:00+00:00

# get_prev() steps back from the updated position (13:00), not from 12:30
prev_interval = cron_cache.get_prev()
print(f"Previous interval: {prev_interval}")  # 2024-01-01 12:00:00+00:00
```
_IncrementalBy Configuration in Model
```sql
-- In a SQL model file:
MODEL (
  name my_schema.sales_aggregates,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column sale_date,
    -- Process 7 days per backfill task
    batch_size 7,
    -- Allow up to 3 of these weekly batches to run concurrently
    batch_concurrency 3,
    -- Also reprocess the previous day on each run to capture late-arriving sales
    lookback 1
  ),
  cron '@daily'
);

SELECT
  sale_date,
  product_id,
  SUM(quantity) AS total_quantity,
  SUM(revenue) AS total_revenue,
  -- The window sees only rows in this run's date range, so the earliest
  -- day of each run gets NULL here
  LAG(SUM(revenue)) OVER (
    PARTITION BY product_id
    ORDER BY sale_date
  ) AS prev_day_revenue
FROM source.sales
WHERE sale_date BETWEEN @start_date AND @end_date
GROUP BY sale_date, product_id;
```
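Lookback widens the range each run processes. It does not supply read-only context. Its effect is therefore easiest to see as date arithmetic. `expand_with_lookback` is a hypothetical helper that shifts the start of a run's `[start, end)` range back by `lookback` intervals. It ignores details such as clamping to the model's start date and how the end bound is rendered into SQL.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple


def expand_with_lookback(
    start: datetime, end: datetime, interval: timedelta, lookback: Optional[int]
) -> Tuple[datetime, datetime]:
    """Hypothetical helper: widen a run's [start, end) range by `lookback` prior intervals."""
    if not lookback:
        return start, end
    return start - lookback * interval, end


# A daily run for 2024-01-05 with lookback 1 also recomputes 2024-01-04.
run_start, run_end = expand_with_lookback(
    datetime(2024, 1, 5), datetime(2024, 1, 6), timedelta(days=1), lookback=1
)
print(run_start.date(), run_end.date())  # 2024-01-04 2024-01-06
```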
Forward-Only Configuration
```sql
-- Changes to this model apply only to new intervals, and its existing
-- intervals cannot be restated
MODEL (
  name my_schema.transaction_log,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column transaction_timestamp,
    forward_only true,
    disable_restatement true,
    batch_size 24 -- Process 24 hourly intervals per backfill task
  ),
  cron '@hourly'
);

SELECT
  transaction_id,
  user_id,
  transaction_timestamp,
  amount,
  status
FROM source.transactions
-- Timestamp column on an hourly model: filter with the datetime (_dt) macros
WHERE transaction_timestamp BETWEEN @start_dt AND @end_dt;
```
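A small decision function summarizes how the two flags interact. `intervals_to_recompute` is hypothetical and deliberately simplified. It ignores change categorization, downstream models, and partial restatement ranges. It shows only that `forward_only` gates rebuilds caused by logic changes, while `disable_restatement` gates explicit restatement requests.

```python
from typing import List


def intervals_to_recompute(
    processed: List[str],
    *,
    logic_changed: bool,
    forward_only: bool,
    restate_requested: bool,
    disable_restatement: bool,
) -> List[str]:
    """Hypothetical, simplified model of how the two flags gate reprocessing.

    - A non-forward-only logic change rebuilds the existing history.
    - A forward-only change keeps existing data; only new intervals use the new logic.
    - A restatement request reprocesses history unless restatement is disabled.
    """
    rebuild = logic_changed and not forward_only
    restate = restate_requested and not disable_restatement
    return list(processed) if (rebuild or restate) else []


history = ["2024-01-01T00", "2024-01-01T01", "2024-01-01T02"]
print(intervals_to_recompute(history, logic_changed=True, forward_only=True,
                             restate_requested=True, disable_restatement=True))
# []
print(intervals_to_recompute(history, logic_changed=True, forward_only=False,
                             restate_requested=False, disable_restatement=True))
# ['2024-01-01T00', '2024-01-01T01', '2024-01-01T02']
```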
Custom Cron Schedule
```python
from sqlmesh.utils.cron import CroniterCache
from datetime import datetime

# Run at 02:00 every Monday
cron_cache = CroniterCache(
    cron='0 2 * * 1',  # minute hour day-of-month month day-of-week (1 = Monday)
    time=datetime(2024, 1, 1, 0, 0, 0)  # 2024-01-01 is a Monday
)

# Step through the next four Monday 02:00 boundaries
for _ in range(4):
    next_monday = cron_cache.get_next()
    print(f"Weekly processing at: {next_monday}")

# Model using this schedule:
# MODEL (
#   name my_schema.weekly_report,
#   kind INCREMENTAL_BY_TIME_RANGE (
#     time_column report_week
#   ),
#   cron '0 2 * * 1'
# );
```
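The weekly case can be computed with the standard library alone, which shows which boundaries `'0 2 * * 1'` produces without installing croniter. `weekly_occurrences` is a hypothetical helper. Python numbers Monday as 0, while cron uses 1. DST is ignored because the datetimes are naive.

```python
from datetime import datetime, timedelta
from typing import List


def weekly_occurrences(after: datetime, weekday: int, hour: int, minute: int, count: int) -> List[datetime]:
    """Hypothetical helper: first `count` weekly run times strictly after `after`.

    weekday follows Python's datetime.weekday() (Monday=0), whereas cron uses Monday=1.
    """
    # Jump to the target weekday in the current week (0 days if it is today).
    days_ahead = (weekday - after.weekday()) % 7
    candidate = (after + timedelta(days=days_ahead)).replace(
        hour=hour, minute=minute, second=0, microsecond=0
    )
    # If that time is not strictly after `after`, use the following week.
    if candidate <= after:
        candidate += timedelta(weeks=1)
    return [candidate + timedelta(weeks=i) for i in range(count)]


# Equivalent of stepping '0 2 * * 1' four times from 2024-01-01 00:00 (a Monday).
for run in weekly_occurrences(datetime(2024, 1, 1), weekday=0, hour=2, minute=0, count=4):
    print(run)
# 2024-01-01 02:00:00
# 2024-01-08 02:00:00
# 2024-01-15 02:00:00
# 2024-01-22 02:00:00
```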
Batch Size Optimization
```sql
-- Short intervals, larger batches to reduce per-task overhead during backfills
MODEL (
  name my_schema.minute_level_metrics,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column event_minute,
    batch_size 12, -- 12 five-minute intervals = 1 hour per task
    batch_concurrency 4
  ),
  cron '*/5 * * * *' -- Every 5 minutes, the shortest cadence SQLMesh accepts
);
```

By contrast, a daily model can give each interval its own batch and rely on concurrency to speed up multi-day backfills:

```sql
-- Long intervals, one interval per batch
MODEL (
  name my_schema.daily_summary,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date,
    batch_size 1, -- Process each day as its own task
    batch_concurrency 10 -- Run up to 10 of those tasks in parallel
  ),
  cron '@daily'
);
```
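The trade-off between these two configurations comes down to simple arithmetic on task counts. `backfill_shape` is a hypothetical estimator. It counts batches as `ceil(intervals / batch_size)` and rounds of concurrent execution as `ceil(tasks / batch_concurrency)`. It ignores task durations and any connection-level concurrency limit.

```python
import math
from typing import Optional, Tuple


def backfill_shape(
    num_intervals: int, batch_size: Optional[int], batch_concurrency: Optional[int]
) -> Tuple[int, int]:
    """Hypothetical estimate of (tasks, waves) for backfilling one model.

    tasks: batches of up to batch_size intervals (None = a single batch).
    waves: rounds needed if at most batch_concurrency batches run at once
           (None is treated as unlimited here; in practice other limits apply).
    """
    if num_intervals <= 0:
        return 0, 0
    tasks = math.ceil(num_intervals / (batch_size or num_intervals))
    waves = math.ceil(tasks / batch_concurrency) if batch_concurrency else 1
    return tasks, waves


# One day of a five-minute model (288 intervals), batch_size 12, batch_concurrency 4.
print(backfill_shape(288, 12, 4))  # (24, 6)
# A 30-day backfill of a daily model, batch_size 1, batch_concurrency 10.
print(backfill_shape(30, 1, 10))  # (30, 3)
```

Larger batches cut the number of tasks. Smaller batches with higher concurrency trade more tasks for more parallelism.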