Implementation:TobikoData Sqlmesh StateSync Promote Finalize
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Environment_Management |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Concrete tool for promoting and finalizing virtual data environments provided by SQLMesh.
Description
The StateSync.promote() and StateSync.finalize() methods work together to transition environments from draft to active state. The promote() method updates an environment's snapshot references to point to the desired model versions, validates data completeness by checking for gaps in processed intervals (if requested), and returns information about which snapshots were added or removed. The finalize() method marks the environment as ready for queries by setting its finalized timestamp, signaling that all setup operations completed successfully.
These methods implement a two-phase commit pattern: promotion updates the logical environment configuration, while finalization provides an atomic "ready" signal. This separation allows the system to update views and perform other setup operations between promotion and finalization, ensuring environments are never queryable in an inconsistent state. The StateSync abstraction is implemented by various backends (database, in-memory, etc.) providing consistent behavior across deployment scenarios.
Usage
Use StateSync.promote() and finalize() when:
- Applying a plan to update an environment with new model versions
- Activating a newly created environment after backfill completes
- Implementing deployment workflows that require explicit activation steps
- Recovering from failed promotions by rolling back to previous state
- Building custom environment management workflows with fine-grained control
- Coordinating multi-step environment updates with external systems
- Implementing approval gates where promotion and finalization are separated
Code Reference
Source Location
- Repository: sqlmesh
- File: sqlmesh/core/state_sync/base.py:L407-435
Signature
@abc.abstractmethod
def promote(
self,
environment: Environment,
no_gaps_snapshot_names: t.Optional[t.Set[str]] = None,
environment_statements: t.Optional[t.List[EnvironmentStatements]] = None,
) -> PromotionResult:
"""Update the environment to reflect the current state.
This method verifies that snapshots have been pushed.
Args:
environment: The environment to promote.
no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None,
all snapshots will be checked. The data gap check ensures that models that are already a
part of the target environment have no data gaps when compared against previous
snapshots for same models.
Returns:
A tuple of (added snapshot table infos, removed snapshot table infos)
"""
@abc.abstractmethod
def finalize(self, environment: Environment) -> None:
"""Finalize the target environment, indicating that this environment has been
fully promoted and is ready for use.
Args:
environment: The target environment to finalize.
"""
Import
from sqlmesh.core.context import Context
# StateSync is accessed through Context
context = Context()
state_sync = context.state_sync
I/O Contract
Inputs
For promote():
| Name | Type | Required | Description |
|---|---|---|---|
| environment | Environment | Yes | Environment object with updated snapshot references |
| no_gaps_snapshot_names | Set[str] | No | Set of model names to validate for data gaps. If None, all snapshots are checked |
| environment_statements | List[EnvironmentStatements] | No | Optional SQL statements to execute during promotion |
For finalize():
| Name | Type | Required | Description |
|---|---|---|---|
| environment | Environment | Yes | Environment to mark as finalized and ready for use |
Outputs
For promote():
| Name | Type | Description |
|---|---|---|
| return | PromotionResult | Tuple containing (added_snapshot_table_infos, removed_snapshot_table_infos) indicating which models were added/removed from environment |
For finalize():
| Name | Type | Description |
|---|---|---|
| return | None | No return value. Raises exception on failure |
Usage Examples
Basic Usage
from sqlmesh.core.context import Context
context = Context()
# Get or create environment with updated snapshots
environment = context.state_reader.get_environment("staging")
# ... environment.snapshots updated with new versions ...
# Promote the environment
result = context.state_sync.promote(environment)
print(f"Added {len(result.added)} snapshots")
print(f"Removed {len(result.removed)} snapshots")
# Finalize after views are created
context.state_sync.finalize(environment)
print("Environment finalized and ready for queries")
Promote with Gap Validation
from sqlmesh.core.context import Context
context = Context()
environment = context.state_reader.get_environment("prod")
# Promote with data gap validation for critical models
try:
result = context.state_sync.promote(
environment,
no_gaps_snapshot_names={"orders", "customers", "transactions"}
)
except Exception as e:
print(f"Promotion failed due to data gaps: {e}")
# Handle gap detection - possibly trigger backfill
# Only finalize if promotion succeeded
context.state_sync.finalize(environment)
Selective Gap Checking
from sqlmesh.core.context import Context
context = Context()
environment = context.state_reader.get_environment("dev")
# Skip gap validation for dev environment (faster promotion)
result = context.state_sync.promote(
environment,
no_gaps_snapshot_names=set() # Empty set = skip all gap checks
)
# Dev environments can be finalized immediately
context.state_sync.finalize(environment)
Inspect Promotion Changes
from sqlmesh.core.context import Context
context = Context()
environment = context.state_reader.get_environment("staging")
# Promote and inspect what changed
result = context.state_sync.promote(environment)
print("Models added to environment:")
for snapshot_info in result.added:
print(f" + {snapshot_info.name} v{snapshot_info.version}")
print("Models removed from environment:")
for snapshot_info in result.removed:
print(f" - {snapshot_info.name} v{snapshot_info.version}")
# Finalize after review
context.state_sync.finalize(environment)
Production Deployment Pattern
from sqlmesh.core.context import Context
context = Context()
environment = context.state_reader.get_environment("prod")
# Production promotion with strict validation
try:
# Validate all models have complete data
all_model_names = {s.name for s in environment.snapshots}
result = context.state_sync.promote(
environment,
no_gaps_snapshot_names=all_model_names
)
# Update views (done by Context.apply typically)
# ... view creation logic ...
# Only finalize after all operations succeed
context.state_sync.finalize(environment)
print("Production deployment successful")
except Exception as e:
print(f"Deployment failed: {e}")
# Rollback logic would go here
raise
Environment Creation Flow
from sqlmesh.core.context import Context
from sqlmesh.core.environment import Environment
context = Context()
# Create new environment
new_env = Environment(
name="feature_123",
snapshots=[...], # Snapshot references
start_at="2026-01-01",
end_at="2026-12-31",
plan_id="plan_abc123"
)
# Promote (creates environment record)
result = context.state_sync.promote(
new_env,
no_gaps_snapshot_names=set() # New env, no gap check needed
)
# Create virtual layer views
# ... view creation ...
# Mark as ready
context.state_sync.finalize(new_env)
print(f"Environment {new_env.name} created and ready")
Rollback on Failure
from sqlmesh.core.context import Context
context = Context()
# Get current environment state
environment = context.state_reader.get_environment("staging")
previous_snapshots = environment.previous_finalized_snapshots
try:
# Attempt promotion
result = context.state_sync.promote(environment)
# If view creation fails, rollback
try:
# ... create views ...
context.state_sync.finalize(environment)
except Exception as view_error:
# Restore previous snapshots
environment.snapshots = previous_snapshots
context.state_sync.promote(environment, no_gaps_snapshot_names=set())
context.state_sync.finalize(environment)
raise
except Exception as e:
print(f"Promotion failed, environment unchanged: {e}")
Two-Phase Deployment
from sqlmesh.core.context import Context
import time
context = Context()
environment = context.state_reader.get_environment("prod")
# Phase 1: Promote (updates references but not finalized)
result = context.state_sync.promote(
environment,
no_gaps_snapshot_names={s.name for s in environment.snapshots}
)
print("Phase 1: Environment promoted, views being updated...")
# Perform potentially slow operations
# ... create/update views ...
# ... warm caches ...
# ... run smoke tests ...
time.sleep(5) # Simulate slow operations
# Phase 2: Finalize (atomic cutover)
context.state_sync.finalize(environment)
print("Phase 2: Environment finalized, now active")