Implementation:TobikoData Sqlmesh Context Invalidate Environment
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Environment_Management |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Concrete tool for invalidating and cleaning up virtual data environments provided by SQLMesh.
Description
The Context.invalidate_environment() method marks an environment for deletion by setting its expiration timestamp to the current time, and optionally performs immediate synchronous cleanup. When sync=False (default), the environment is marked as expired but physical deletion is deferred to a background janitor process. When sync=True, the method blocks until all database objects (views, schemas, or catalogs) and metadata are removed.
The cleanup process implemented by the janitor (via cleanup_expired_views() in sqlmesh/core/janitor.py) handles different environment configurations: dropping individual views for table-level environments, dropping entire schemas for schema-level environments, or dropping catalogs for catalog-level environments (on engines that support it). The cleanup is resilient with configurable error handling that can warn rather than fail on individual deletion failures.
Usage
Use Context.invalidate_environment() when you need to:
- Delete development environments that are no longer needed
- Clean up PR environments after branches are merged or closed
- Remove temporary test environments created during CI/CD runs
- Free up warehouse resources when approaching storage limits
- Implement environment lifecycle policies with automatic expiration
- Remove stale environments as part of regular housekeeping
- Decommission environments when projects are completed
- Test environment creation/deletion workflows during development
Code Reference
Source Location
- Repository: sqlmesh
- File: sqlmesh/core/context.py:L1806-1820
- Cleanup Implementation: sqlmesh/core/janitor.py:L22-123
Signature
def invalidate_environment(self, name: str, sync: bool = False) -> None:
"""Invalidates the target environment by setting its expiration timestamp to now.
Args:
name: The name of the environment to invalidate.
sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
be deleted asynchronously by the janitor process.
"""
Related cleanup function:
def cleanup_expired_views(
default_adapter: EngineAdapter,
engine_adapters: t.Dict[str, EngineAdapter],
environments: t.List[Environment],
warn_on_delete_failure: bool = False,
console: t.Optional[Console] = None,
) -> None:
"""Cleanup views, schemas, and catalogs for expired environments."""
Import
from sqlmesh.core.context import Context
context = Context(paths="path/to/project")
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Name of the environment to invalidate |
| sync | bool | No | Whether to perform synchronous cleanup. Default: False (async cleanup via janitor) |
Outputs
| Name | Type | Description |
|---|---|---|
| return | None | No return value. Raises exception on failure. Console log message indicates success |
Usage Examples
Basic Usage
from sqlmesh.core.context import Context
context = Context()
# Asynchronous invalidation (default)
context.invalidate_environment("old_dev")
# Environment marked for deletion, janitor will clean up later
# Console output: "Environment 'old_dev' invalidated."
Synchronous Deletion
from sqlmesh.core.context import Context
context = Context()
# Immediate synchronous deletion
context.invalidate_environment("temp_test", sync=True)
# Blocks until all views/schemas/catalogs dropped and metadata removed
# Console output: "Environment 'temp_test' deleted."
Clean Up PR Environments
from sqlmesh.core.context import Context
context = Context()
# Clean up PR environment after merge
pr_env_name = "pr_123"
try:
context.invalidate_environment(pr_env_name, sync=True)
print(f"PR environment {pr_env_name} successfully deleted")
except Exception as e:
print(f"Failed to delete PR environment: {e}")
Batch Cleanup
from sqlmesh.core.context import Context
context = Context()
# Clean up multiple old environments
old_environments = ["dev_alice_old", "feature_branch_merged", "test_20260101"]
for env_name in old_environments:
try:
context.invalidate_environment(env_name, sync=False)
print(f"Invalidated {env_name}")
except Exception as e:
print(f"Failed to invalidate {env_name}: {e}")
# Janitor will clean up asynchronously
CI/CD Cleanup Hook
from sqlmesh.core.context import Context
import os
context = Context()
# Clean up CI environment after test run
ci_env_name = f"ci_{os.environ.get('CI_JOB_ID')}"
# Use sync=True in CI to ensure cleanup completes before job ends
context.invalidate_environment(ci_env_name, sync=True)
Conditional Cleanup
from sqlmesh.core.context import Context
from datetime import datetime, timedelta
context = Context()
# Get all environments
environments = context.state_reader.get_environments()
cutoff_time = datetime.now() - timedelta(days=7)
# Clean up dev environments older than 7 days
for env in environments:
if env.name.startswith("dev_") and env.finalized_ts:
finalized_dt = datetime.fromtimestamp(env.finalized_ts / 1000)
if finalized_dt < cutoff_time:
print(f"Cleaning up old dev environment: {env.name}")
context.invalidate_environment(env.name, sync=False)
Safe Cleanup with Error Handling
from sqlmesh.core.context import Context
context = Context()
def safe_cleanup_environment(env_name: str, force: bool = False):
"""Safely clean up an environment with error handling."""
# Check if environment exists
env = context.state_reader.get_environment(env_name)
if not env:
print(f"Environment {env_name} does not exist")
return
# Prevent accidental deletion of production
if env.name.lower() == "prod" and not force:
raise ValueError("Cannot delete production environment without force=True")
try:
# Use async cleanup for non-critical environments
is_critical = env.name.lower() in ["prod", "staging"]
context.invalidate_environment(env_name, sync=is_critical)
status = "deleted" if is_critical else "invalidated"
print(f"Environment {env_name} {status} successfully")
except Exception as e:
print(f"Failed to invalidate environment {env_name}: {e}")
raise
# Usage
safe_cleanup_environment("old_feature_branch")
Cleanup with Progress Tracking
from sqlmesh.core.context import Context
from sqlmesh.core.janitor import cleanup_expired_views
context = Context()
# Manual cleanup with progress tracking
env_to_cleanup = context.state_reader.get_environment("temp_env")
if env_to_cleanup:
# Mark as expired
context.invalidate_environment("temp_env", sync=False)
# Get expired environments
expired = [
env for env in context.state_reader.get_environments()
if env.expiration_ts and env.expiration_ts <= context.now()
]
# Run cleanup manually with console output
cleanup_expired_views(
default_adapter=context.engine_adapter,
engine_adapters=context._engine_adapters,
environments=expired,
warn_on_delete_failure=False,
console=context.console
)
print("Cleanup completed")
Resilient Cleanup
from sqlmesh.core.context import Context
from sqlmesh.core.janitor import cleanup_expired_views
context = Context()
# Cleanup with warning on failure (continues even if some objects can't be dropped)
expired_envs = [
env for env in context.state_reader.get_environments()
if env.expiration_ts and env.expiration_ts <= context.now()
]
if expired_envs:
cleanup_expired_views(
default_adapter=context.engine_adapter,
engine_adapters=context._engine_adapters,
environments=expired_envs,
warn_on_delete_failure=True, # Warn but continue on errors
console=context.console
)
print(f"Cleaned up {len(expired_envs)} environments (some failures may have occurred)")
Dry Run Check
from sqlmesh.core.context import Context
context = Context()
def preview_cleanup(env_name: str):
"""Show what would be deleted without actually deleting."""
env = context.state_reader.get_environment(env_name)
if not env:
print(f"Environment {env_name} not found")
return
print(f"Would delete environment: {env_name}")
print(f" Suffix target: {env.suffix_target}")
print(f" Number of snapshots: {len(env.snapshots)}")
print(f" Models:")
for snapshot in env.snapshots:
if snapshot.is_model:
view_name = snapshot.qualified_view_name.for_environment(
env.naming_info,
context.engine_adapter.dialect
)
print(f" - {view_name}")
# Uncomment to actually delete:
# context.invalidate_environment(env_name, sync=True)
preview_cleanup("temp_env")