Implementation:TobikoData Sqlmesh CroniterCache IncrementalBy

From Leeroopedia


Knowledge Sources
Domains: Data_Engineering, Incremental_Processing
Last Updated: 2026-02-07 00:00 GMT

Overview

Concrete tools provided by SQLMesh for configuring time-based interval generation and batch processing parameters.

Description

CroniterCache wraps the croniter library to compute time boundaries from a cron expression. It keeps the current position (curr) as state, so repeated get_next/get_prev calls walk the schedule sequentially from the last boundary returned rather than recomputing from the initial time.

The _IncrementalBy class defines the configuration parameters that control how incremental models process intervals: batch sizing for performance, lookback windows that re-include prior intervals (typically to pick up late-arriving data), concurrent execution limits, and forward-only change controls.

Together, these components enable fine-grained control over temporal partitioning and execution strategy for incremental data processing.
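
The stateful traversal described above can be sketched without croniter. The snippet below is a minimal stand-in, not SQLMesh's implementation: FixedIntervalCursor is a hypothetical class that only handles fixed-length schedules (such as hourly) and omits the estimate flag. It illustrates how keeping the current position lets each get_next or get_prev call continue from the last boundary returned.

```python
from datetime import datetime, timedelta, timezone


class FixedIntervalCursor:
    """Hypothetical stand-in for a cron cursor; fixed-length schedules only."""

    def __init__(self, interval: timedelta, start: datetime) -> None:
        self.interval = interval
        self.curr = start  # current position, updated by every call

    def _floor(self, ts: datetime) -> datetime:
        # Snap a timestamp down to the nearest boundary, counted from the Unix epoch.
        epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
        steps = (ts - epoch) // self.interval
        return epoch + steps * self.interval

    def get_next(self) -> datetime:
        # First boundary strictly after the current position.
        self.curr = self._floor(self.curr) + self.interval
        return self.curr

    def get_prev(self) -> datetime:
        # Last boundary strictly before the current position.
        floored = self._floor(self.curr)
        self.curr = floored if floored < self.curr else floored - self.interval
        return self.curr


cursor = FixedIntervalCursor(
    interval=timedelta(hours=1),
    start=datetime(2024, 1, 1, 12, 30, tzinfo=timezone.utc),
)
first = cursor.get_next()   # moves the cursor from 12:30 to 13:00
second = cursor.get_next()  # continues from 13:00 to 14:00
back = cursor.get_prev()    # steps back from 14:00 to 13:00
```

Because the cursor is mutable, the result of get_prev depends on the calls made before it; code that needs an independent starting point should create a new cursor.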

Usage

Use CroniterCache when implementing scheduling logic that needs to efficiently generate multiple consecutive intervals based on a cron expression. Use _IncrementalBy parameters in model definitions to control batch processing behavior, lookback requirements, and change management policies.

Code Reference

Source Location

  • Repository: sqlmesh
  • File: sqlmesh/utils/cron.py:L36-55 (CroniterCache), sqlmesh/core/model/kind.py:L419-444 (_IncrementalBy)

Signature

# CroniterCache
class CroniterCache:
    def __init__(
        self,
        cron: str,
        time: t.Optional[TimeLike] = None,
        tz: t.Optional[tzinfo] = None
    ):
        self.cron = cron
        self.tz = tz
        self.curr: datetime = to_datetime(
            now() if time is None else time, tz=self.tz
        )
        self.interval_seconds = interval_seconds(self.cron)

    def get_next(self, estimate: bool = False) -> datetime:
        ...

    def get_prev(self, estimate: bool = False) -> datetime:
        ...

# _IncrementalBy
class _IncrementalBy(_Incremental):
    dialect: t.Optional[str] = Field(None, validate_default=True)
    batch_size: t.Optional[SQLGlotPositiveInt] = None
    batch_concurrency: t.Optional[SQLGlotPositiveInt] = None
    lookback: t.Optional[SQLGlotPositiveInt] = None
    forward_only: SQLGlotBool = False
    disable_restatement: SQLGlotBool = False

Import

from sqlmesh.utils.cron import CroniterCache
from sqlmesh.core.model.kind import _IncrementalBy

I/O Contract

Inputs

CroniterCache:

Name | Type | Required | Description
cron | str | Yes | Cron expression defining interval schedule (e.g., '@daily', '0 * * * *')
time | TimeLike | No | Starting time for interval generation (defaults to now)
tz | tzinfo | No | Timezone for interpreting cron schedule

_IncrementalBy:

Name | Type | Required | Description
dialect | str | No | SQL dialect override for model-specific rendering
batch_size | SQLGlotPositiveInt | No | Number of consecutive intervals to process as a single batch
batch_concurrency | SQLGlotPositiveInt | No | Maximum number of concurrent batch executions
lookback | SQLGlotPositiveInt | No | Number of prior intervals to include in each run (e.g., to capture late-arriving data)
forward_only | SQLGlotBool | No | Whether changes apply only to future intervals (default: False)
disable_restatement | SQLGlotBool | No | Whether data restatement is disabled for the model (default: False)
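
To make the batch_size parameter concrete, here is a minimal sketch of grouping consecutive intervals into batches. The function group_into_batches is hypothetical and is not part of SQLMesh; it also assumes that leaving batch_size unset means all pending intervals are processed as one batch.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

Interval = Tuple[datetime, datetime]  # (inclusive start, exclusive end)


def group_into_batches(intervals: List[Interval], batch_size: Optional[int]) -> List[Interval]:
    """Merge consecutive intervals into batches of at most `batch_size` intervals."""
    if not intervals:
        return []
    if batch_size is None:
        # Assumption: with no batch_size, all pending intervals form one batch.
        return [(intervals[0][0], intervals[-1][1])]
    batches: List[Interval] = []
    for i in range(0, len(intervals), batch_size):
        chunk = intervals[i : i + batch_size]
        # A batch spans from the first chunk start to the last chunk end.
        batches.append((chunk[0][0], chunk[-1][1]))
    return batches


start = datetime(2024, 1, 1)
# Ten pending daily intervals: Jan 1 through Jan 10.
daily = [(start + timedelta(days=d), start + timedelta(days=d + 1)) for d in range(10)]
batches = group_into_batches(daily, batch_size=7)
```

With ten daily intervals and batch_size=7, the sketch yields one seven-day batch followed by a three-day remainder.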

Outputs

CroniterCache:

Name | Type | Description
instance | CroniterCache | Stateful iterator for generating cron-based intervals
get_next() | datetime | Returns the next interval boundary after current position
get_prev() | datetime | Returns the previous interval boundary before current position

_IncrementalBy:

Name | Type | Description
instance | _IncrementalBy | Base configuration for incremental processing behavior
data_hash_values | List[Optional[str]] | Values that affect data output (triggers reprocessing on change)
metadata_hash_values | List[Optional[str]] | Values that affect only metadata (no reprocessing needed)
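
The split between data_hash_values and metadata_hash_values can be illustrated with a small fingerprinting sketch. Everything below is hypothetical: KindSketch and fingerprint are not SQLMesh names, and the assignment of lookback to the data hash and batch_size to the metadata hash is an assumption made for illustration, not a statement of what the library does.

```python
import hashlib
from dataclasses import dataclass
from typing import List, Optional


def fingerprint(values: List[Optional[str]]) -> str:
    """Hash a list of optional strings; None is encoded differently from ''."""
    digest = hashlib.sha256()
    for value in values:
        digest.update(b"\x00" if value is None else b"\x01" + value.encode("utf-8"))
        digest.update(b"|")  # separator so ["ab"] and ["a", "b"] differ
    return digest.hexdigest()


@dataclass(frozen=True)
class KindSketch:
    """Hypothetical stand-in for an incremental kind, not the SQLMesh class."""

    lookback: Optional[int] = None
    batch_size: Optional[int] = None

    @property
    def data_hash_values(self) -> List[Optional[str]]:
        # Assumed here: lookback changes which rows are produced.
        return [None if self.lookback is None else str(self.lookback)]

    @property
    def metadata_hash_values(self) -> List[Optional[str]]:
        # Assumed here: batch_size only changes how the work is scheduled.
        return [None if self.batch_size is None else str(self.batch_size)]


old = KindSketch(lookback=1, batch_size=7)
new = KindSketch(lookback=1, batch_size=30)

data_changed = fingerprint(old.data_hash_values) != fingerprint(new.data_hash_values)
metadata_changed = fingerprint(old.metadata_hash_values) != fingerprint(new.metadata_hash_values)
```

In this sketch, changing only batch_size alters the metadata fingerprint and leaves the data fingerprint untouched, which is the situation the table describes as needing no reprocessing.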

Usage Examples

CroniterCache Basic Usage

from sqlmesh.utils.cron import CroniterCache
from datetime import datetime

# Create a daily cron iterator starting at a specific time
cron_cache = CroniterCache(
    cron='@daily',
    time=datetime(2024, 1, 1, 0, 0, 0)
)

# Generate next 5 daily intervals
intervals = []
for _ in range(5):
    next_time = cron_cache.get_next()
    intervals.append(next_time)
    print(f"Interval boundary: {next_time}")

# Output (SQLMesh normalizes naive datetimes to UTC, so printed values may also carry a +00:00 offset):
# Interval boundary: 2024-01-02 00:00:00
# Interval boundary: 2024-01-03 00:00:00
# Interval boundary: 2024-01-04 00:00:00
# Interval boundary: 2024-01-05 00:00:00
# Interval boundary: 2024-01-06 00:00:00

Hourly Intervals with Timezone

from sqlmesh.utils.cron import CroniterCache
from datetime import datetime
import pytz

# Create hourly iterator with UTC timezone
cron_cache = CroniterCache(
    cron='@hourly',
    time=datetime(2024, 1, 1, 12, 30, 0),
    tz=pytz.UTC
)

# Get next interval (rounds up to next hour boundary)
next_interval = cron_cache.get_next()
print(f"Next interval: {next_interval}")  # 2024-01-01 13:00:00+00:00

# Get previous interval (relative to the cursor's new position at 13:00, since get_next() advanced it)
prev_interval = cron_cache.get_prev()
print(f"Previous interval: {prev_interval}")  # 2024-01-01 12:00:00+00:00

_IncrementalBy Configuration in Model

-- In SQL model definition:
MODEL (
  name my_schema.sales_aggregates,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column sale_date,
    -- Process 7 days at a time for efficiency
    batch_size 7,
    -- Allow up to 3 weekly batches to run concurrently
    batch_concurrency 3,
    -- Also include the prior day in each run (e.g., to pick up late-arriving rows)
    lookback 1
  ),
  cron '@daily'
);

SELECT
  sale_date,
  product_id,
  SUM(quantity) as total_quantity,
  SUM(revenue) as total_revenue,
  -- Previous day's revenue; NULL for the first sale_date in each processed window
  LAG(SUM(revenue)) OVER (
    PARTITION BY product_id
    ORDER BY sale_date
  ) as prev_day_revenue
FROM source.sales
WHERE sale_date BETWEEN @start_date AND @end_date
GROUP BY sale_date, product_id;
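
As a rough illustration of the lookback setting in the model above, the sketch below extends a run's time window backwards by a number of intervals. The helper window_with_lookback is hypothetical and not a SQLMesh function; it assumes lookback simply moves the window start earlier by lookback intervals, which the SQLMesh documentation describes as a way to capture late-arriving data.

```python
from datetime import datetime, timedelta
from typing import Tuple


def window_with_lookback(
    start: datetime,
    end: datetime,
    lookback: int,
    interval_unit: timedelta,
) -> Tuple[datetime, datetime]:
    """Extend [start, end) backwards by `lookback` intervals of length `interval_unit`."""
    return start - lookback * interval_unit, end


# A daily run for Jan 10 with lookback 1 also covers Jan 9.
run_start = datetime(2024, 1, 10)
run_end = datetime(2024, 1, 11)
window = window_with_lookback(run_start, run_end, lookback=1, interval_unit=timedelta(days=1))
```

Rows dated before the extended window start are still outside the query's WHERE filter, so the window function only sees dates within the processed range.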

Forward-Only Configuration

-- Model that doesn't reprocess history when logic changes
MODEL (
  name my_schema.transaction_log,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column transaction_timestamp,
    forward_only true,
    disable_restatement true,
    batch_size 24  -- Process 24 hours at a time
  ),
  cron '@hourly'
);

SELECT
  transaction_id,
  user_id,
  transaction_timestamp,
  amount,
  status
FROM source.transactions
WHERE transaction_timestamp BETWEEN @start_ts AND @end_ts;

Custom Cron Schedule

from sqlmesh.utils.cron import CroniterCache
from datetime import datetime

# Run at 2 AM every Monday
cron_cache = CroniterCache(
    cron='0 2 * * 1',  # minute hour day month weekday
    time=datetime(2024, 1, 1, 0, 0, 0)
)

# Generate weekly intervals
for _ in range(4):
    next_monday = cron_cache.get_next()
    print(f"Weekly processing at: {next_monday}")

# Model using this schedule:
# MODEL (
#   name my_schema.weekly_report,
#   kind INCREMENTAL_BY_TIME_RANGE (
#     time_column report_week
#   ),
#   cron '0 2 * * 1'
# );

Batch Size Optimization

-- Small intervals, large batches for efficiency
MODEL (
  name my_schema.minute_level_metrics,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column event_minute,
    batch_size 12,  -- Process 1 hour (12 five-minute intervals) at a time
    batch_concurrency 4
  ),
  cron '*/5 * * * *'  -- Every 5 minutes; five_minute is the smallest interval unit SQLMesh documents
);

-- vs. Large intervals, small batches for freshness
MODEL (
  name my_schema.daily_summary,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date,
    batch_size 1,  -- Process each day individually
    batch_concurrency 10  -- But process multiple days in parallel
  ),
  cron '@daily'
);
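
The trade-off between batch_size and batch_concurrency comes down to simple arithmetic. The sketch below uses a hypothetical helper, backfill_plan, and the simplifying assumption that every batch takes about the same time, so batches run in evenly sized waves; real scheduling may differ.

```python
import math
from typing import Tuple


def backfill_plan(num_intervals: int, batch_size: int, batch_concurrency: int) -> Tuple[int, int]:
    """Return (number of batches, number of sequential waves) for a backfill."""
    batches = math.ceil(num_intervals / batch_size)
    # Assumption: batches of equal duration run `batch_concurrency` at a time.
    waves = math.ceil(batches / batch_concurrency)
    return batches, waves


# Backfilling 30 daily intervals under two configurations:
grouped = backfill_plan(30, batch_size=7, batch_concurrency=3)      # fewer, larger queries
individual = backfill_plan(30, batch_size=1, batch_concurrency=10)  # many small queries
```

Under these assumptions, batch_size 7 with concurrency 3 needs 5 batches in 2 waves, while batch_size 1 with concurrency 10 needs 30 batches in 3 waves.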
