Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:TobikoData Sqlmesh Context Invalidate Environment

From Leeroopedia


Knowledge Sources
Domains Data_Engineering, Environment_Management
Last Updated 2026-02-07 00:00 GMT

Overview

Concrete tool, provided by SQLMesh, for invalidating and cleaning up virtual data environments.

Description

The Context.invalidate_environment() method marks an environment for deletion by setting its expiration timestamp to the current time, and optionally performs immediate synchronous cleanup. When sync=False (default), the environment is marked as expired but physical deletion is deferred to a background janitor process. When sync=True, the method blocks until all database objects (views, schemas, or catalogs) and metadata are removed.

The cleanup process implemented by the janitor (via cleanup_expired_views() in sqlmesh/core/janitor.py) handles different environment configurations: dropping individual views for table-level environments, dropping entire schemas for schema-level environments, or dropping catalogs for catalog-level environments (on engines that support it). Error handling is configurable: with warn_on_delete_failure=True, a failure to delete an individual object is reported as a warning and the cleanup continues instead of failing.

Usage

Use Context.invalidate_environment() when you need to:

  • Delete development environments that are no longer needed
  • Clean up PR environments after branches are merged or closed
  • Remove temporary test environments created during CI/CD runs
  • Free up warehouse resources when approaching storage limits
  • Implement environment lifecycle policies with automatic expiration
  • Remove stale environments as part of regular housekeeping
  • Decommission environments when projects are completed
  • Test environment creation/deletion workflows during development

Code Reference

Source Location

  • Repository: sqlmesh
  • File: sqlmesh/core/context.py:L1806-1820
  • Cleanup Implementation: sqlmesh/core/janitor.py:L22-123

Signature

def invalidate_environment(self, name: str, sync: bool = False) -> None:
    """Invalidates the target environment by setting its expiration timestamp to now.

    Args:
        name: The name of the environment to invalidate.
        sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
            be deleted asynchronously by the janitor process.
    """

Related cleanup function:

def cleanup_expired_views(
    default_adapter: EngineAdapter,
    engine_adapters: t.Dict[str, EngineAdapter],
    environments: t.List[Environment],
    warn_on_delete_failure: bool = False,
    console: t.Optional[Console] = None,
) -> None:
    """Cleanup views, schemas, and catalogs for expired environments."""

Import

from sqlmesh.core.context import Context

context = Context(paths="path/to/project")

I/O Contract

Inputs

Name Type Required Description
name str Yes Name of the environment to invalidate
sync bool No Whether to perform synchronous cleanup. Default: False (async cleanup via janitor)

Outputs

Name Type Description
return None No return value. Raises an exception on failure; a console log message indicates success.

Usage Examples

Basic Usage

from sqlmesh.core.context import Context

context = Context()

# Asynchronous invalidation (default)
context.invalidate_environment("old_dev")

# Environment marked for deletion, janitor will clean up later
# Console output: "Environment 'old_dev' invalidated."

Synchronous Deletion

from sqlmesh.core.context import Context

context = Context()

# Immediate synchronous deletion
context.invalidate_environment("temp_test", sync=True)

# Blocks until all views/schemas/catalogs dropped and metadata removed
# Console output: "Environment 'temp_test' deleted."

Clean Up PR Environments

from sqlmesh.core.context import Context

context = Context()

# Clean up PR environment after merge
pr_env_name = "pr_123"

try:
    context.invalidate_environment(pr_env_name, sync=True)
    print(f"PR environment {pr_env_name} successfully deleted")
except Exception as e:
    print(f"Failed to delete PR environment: {e}")

Batch Cleanup

from sqlmesh.core.context import Context

context = Context()

# Clean up multiple old environments
old_environments = ["dev_alice_old", "feature_branch_merged", "test_20260101"]

for env_name in old_environments:
    try:
        context.invalidate_environment(env_name, sync=False)
        print(f"Invalidated {env_name}")
    except Exception as e:
        print(f"Failed to invalidate {env_name}: {e}")

# Janitor will clean up asynchronously

CI/CD Cleanup Hook

import os

from sqlmesh.core.context import Context

# Fail fast when the job id is missing: interpolating a missing value would
# silently target an environment literally named "ci_None".
ci_job_id = os.environ.get("CI_JOB_ID")
if not ci_job_id:
    raise RuntimeError("CI_JOB_ID is not set; cannot determine which CI environment to clean up")

context = Context()

# Clean up CI environment after test run
ci_env_name = f"ci_{ci_job_id}"

# Use sync=True in CI to ensure cleanup completes before job ends
context.invalidate_environment(ci_env_name, sync=True)

Conditional Cleanup

from datetime import datetime, timedelta, timezone

from sqlmesh.core.context import Context

context = Context()

# Get all environments
environments = context.state_reader.get_environments()

# Use timezone-aware UTC datetimes so the age check does not depend on the
# host's local timezone or on daylight-saving transitions.
cutoff_time = datetime.now(timezone.utc) - timedelta(days=7)

# Clean up dev environments older than 7 days
for env in environments:
    if env.name.startswith("dev_") and env.finalized_ts:
        # finalized_ts is treated as epoch milliseconds, hence the division by 1000
        finalized_dt = datetime.fromtimestamp(env.finalized_ts / 1000, tz=timezone.utc)

        if finalized_dt < cutoff_time:
            print(f"Cleaning up old dev environment: {env.name}")
            context.invalidate_environment(env.name, sync=False)

Safe Cleanup with Error Handling

from sqlmesh.core.context import Context

context = Context()

def safe_cleanup_environment(env_name: str, force: bool = False):
    """Invalidate ``env_name`` if it exists, guarding production behind ``force``.

    Args:
        env_name: Name of the environment to look up and invalidate.
        force: Must be True before an environment named "prod"
            (case-insensitive) may be invalidated.

    Raises:
        ValueError: If the environment is "prod" and ``force`` is False.
        Exception: Any error raised while invalidating is printed and re-raised.
    """
    env = context.state_reader.get_environment(env_name)

    # Unknown environment: report it and stop, nothing to invalidate
    if not env:
        print(f"Environment {env_name} does not exist")
        return

    normalized_name = env.name.lower()

    # Prevent accidental deletion of production
    # NOTE(review): confirm whether SQLMesh itself permits invalidating prod at all
    if normalized_name == "prod" and not force:
        raise ValueError("Cannot delete production environment without force=True")

    try:
        # "prod" and "staging" are cleaned up synchronously; every other
        # environment is only marked expired and left to the janitor.
        is_critical = normalized_name in ("prod", "staging")
        context.invalidate_environment(env_name, sync=is_critical)

        if is_critical:
            status = "deleted"
        else:
            status = "invalidated"
        print(f"Environment {env_name} {status} successfully")

    except Exception as e:
        # Report the failure, then let the caller decide how to handle it
        print(f"Failed to invalidate environment {env_name}: {e}")
        raise

# Usage
safe_cleanup_environment("old_feature_branch")

Cleanup with Progress Tracking

from sqlmesh.core.context import Context
from sqlmesh.core.janitor import cleanup_expired_views
from sqlmesh.utils.date import now_timestamp

context = Context()

# Manual cleanup with progress tracking
env_to_cleanup = context.state_reader.get_environment("temp_env")

if env_to_cleanup:
    # Mark as expired
    context.invalidate_environment("temp_env", sync=False)

    # Take the current timestamp once so every environment is compared
    # against the same instant.
    current_ts = now_timestamp()

    # Get expired environments. Note this selects every environment whose
    # expiration has passed, not only "temp_env".
    expired = [
        env for env in context.state_reader.get_environments()
        if env.expiration_ts and env.expiration_ts <= current_ts
    ]

    # Run cleanup manually with console output
    cleanup_expired_views(
        default_adapter=context.engine_adapter,
        engine_adapters=context._engine_adapters,
        environments=expired,
        warn_on_delete_failure=False,
        console=context.console,
    )

    print("Cleanup completed")

Resilient Cleanup

from sqlmesh.core.context import Context
from sqlmesh.core.janitor import cleanup_expired_views
from sqlmesh.utils.date import now_timestamp

context = Context()

# Take the current timestamp once so every environment is compared against
# the same instant.
current_ts = now_timestamp()

# Cleanup with warning on failure (continues even if some objects can't be dropped)
expired_envs = [
    env for env in context.state_reader.get_environments()
    if env.expiration_ts and env.expiration_ts <= current_ts
]

if expired_envs:
    cleanup_expired_views(
        default_adapter=context.engine_adapter,
        engine_adapters=context._engine_adapters,
        environments=expired_envs,
        warn_on_delete_failure=True,  # Warn but continue on errors
        console=context.console,
    )
    # With warn_on_delete_failure=True individual failures only produce
    # warnings, so this reports an attempt rather than guaranteed success.
    print(f"Attempted cleanup of {len(expired_envs)} environments (check warnings for failures)")

Dry Run Check

from sqlmesh.core.context import Context

context = Context()

def preview_cleanup(env_name: str):
    """Print a summary of ``env_name`` and its model views; nothing is deleted.

    Args:
        env_name: Name of the environment to inspect.

    The summary lists the environment's suffix target, its snapshot count, and,
    for each snapshot flagged ``is_model``, the view name produced by
    ``qualified_view_name.for_environment`` for this environment.
    """
    env = context.state_reader.get_environment(env_name)

    # Bail out early when there is nothing to preview
    if not env:
        print(f"Environment {env_name} not found")
        return

    print(f"Would delete environment: {env_name}")
    print(f"  Suffix target: {env.suffix_target}")
    print(f"  Number of snapshots: {len(env.snapshots)}")
    print("  Models:")

    for snapshot in env.snapshots:
        # Only snapshots flagged as models are listed
        if not snapshot.is_model:
            continue

        qualified_name = snapshot.qualified_view_name
        naming_info = env.naming_info
        dialect = context.engine_adapter.dialect
        view_name = qualified_name.for_environment(naming_info, dialect)
        print(f"    - {view_name}")

    # Uncomment to actually delete:
    # context.invalidate_environment(env_name, sync=True)

preview_cleanup("temp_env")

Related Pages

Implements Principle

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment