Implementation:Astronomer Astronomer cosmos DbtVirtualenvBaseOperator

From Leeroopedia
Revision as of 14:29, 16 February 2026 by Admin (auto-imported from implementations/Astronomer_Astronomer_cosmos_DbtVirtualenvBaseOperator.md)


Knowledge Sources
Domains: Airflow Operators, Virtual Environments, dbt Execution
Last Updated: 2026-02-07 17:00 GMT

Overview

Operators for running dbt commands in isolated Python virtual environments, providing dependency isolation between the Airflow worker environment and the dbt execution environment.

Description

The cosmos.operators.virtualenv module defines the DbtVirtualenvBaseOperator base class and ten concrete operator subclasses that execute dbt CLI commands within dynamically created or persistent Python virtual environments.

DbtVirtualenvBaseOperator extends DbtLocalBaseOperator and manages the full lifecycle of a virtualenv: creation, dependency installation, lock-based sharing across concurrent tasks, and cleanup.

Key behaviour:

Virtualenv Creation: When virtualenv_dir is None or is_virtualenv_dir_temporary is True, a temporary directory is created with Python's tempfile.TemporaryDirectory using the prefix "cosmos-venv". The virtualenv is prepared with Airflow's prepare_virtualenv() utility (imported from airflow.providers.standard.utils.python_virtualenv in Airflow 3, or from airflow.utils.python_virtualenv in Airflow 2). After execution, temporary directories are cleaned up in the finally block of execute() and also via on_kill().
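
The temporary-directory path can be sketched with the standard library alone. This is a minimal sketch and not Cosmos code. It replaces Airflow's prepare_virtualenv() with Python's venv module and creates the environment without pip. It also skips installing py_requirements so that it runs offline. The helper names venv_bin_dir and inspect_temporary_venv are hypothetical.

```python
from __future__ import annotations

import os
import tempfile
import venv
from pathlib import Path


def venv_bin_dir(venv_dir: Path) -> Path:
    # POSIX virtualenvs keep executables in bin/, Windows ones in Scripts/
    return venv_dir / ("Scripts" if os.name == "nt" else "bin")


def inspect_temporary_venv(system_site_packages: bool = False) -> tuple[Path, Path, bool]:
    """Create a throwaway venv, note where dbt would live, then let it be deleted."""
    with tempfile.TemporaryDirectory(prefix="cosmos-venv") as tmp:
        venv_dir = Path(tmp)
        # Stand-in for Airflow's prepare_virtualenv(): no pip and no requirement
        # install, so this sketch runs without network access.
        venv.create(
            venv_dir,
            system_site_packages=system_site_packages,
            with_pip=False,
            symlinks=os.name != "nt",
        )
        dbt_bin = venv_bin_dir(venv_dir) / "dbt"
        created = (venv_dir / "pyvenv.cfg").is_file()
        # A real operator would run dbt through dbt_bin at this point.
    # Leaving the with-block deletes the directory, as in the temporary mode.
    return venv_dir, dbt_bin, created


if __name__ == "__main__":
    path, dbt, ok = inspect_temporary_venv()
    print(ok, dbt.name, path.exists())
```

The directory exists only inside the with-block. Anything that must outlive the task, such as dbt artifacts, has to be copied out before the block exits.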

Persistent Virtualenv with Locking: When virtualenv_dir is specified and is_virtualenv_dir_temporary is False, the virtualenv persists across task runs. A file-based lock (cosmos_virtualenv.lock inside the virtualenv directory) stops concurrent operators that share the same virtualenv directory from corrupting it. The lock file contains the PID of the owning process. The operator checks lock availability with psutil.Process(pid).is_running() and retries up to settings.virtualenv_max_retries_lock times, sleeping 1 second between retries. Stale locks left by dead processes are detected automatically and overridden.
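
The PID lock can be sketched as follows, assuming a POSIX host. To stay within the standard library, the sketch uses os.kill(pid, 0) in place of psutil.Process(pid).is_running(). Do not use this variant on Windows, where os.kill terminates the target process. The function names are hypothetical. Raising TimeoutError after the last retry is a choice made for this sketch and is not documented Cosmos behaviour.

```python
from __future__ import annotations

import os
import tempfile
import time
from pathlib import Path

LOCK_NAME = "cosmos_virtualenv.lock"


def pid_is_running(pid: int) -> bool:
    # POSIX-only stand-in for psutil.Process(pid).is_running(): signal 0 checks
    # that the process exists without delivering any signal to it.
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False  # no such process, so a lock naming it is stale
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def is_lock_available(venv_dir: Path) -> bool:
    lock_file = venv_dir / LOCK_NAME
    if not lock_file.is_file():
        return True
    return not pid_is_running(int(lock_file.read_text()))


def acquire_lock(venv_dir: Path, max_retries: int = 10, sleep_seconds: float = 1.0) -> None:
    retries = max_retries
    while not is_lock_available(venv_dir):
        if retries == 0:
            # Design choice of this sketch: give up loudly instead of proceeding.
            raise TimeoutError(f"virtualenv lock in {venv_dir} is still held")
        time.sleep(sleep_seconds)
        retries -= 1
    venv_dir.mkdir(parents=True, exist_ok=True)
    # A missing or stale lock is simply overwritten with this process's PID.
    (venv_dir / LOCK_NAME).write_text(str(os.getpid()))


def release_lock(venv_dir: Path) -> None:
    lock_file = venv_dir / LOCK_NAME
    # Only the owning process removes the lock.
    if lock_file.is_file() and int(lock_file.read_text()) == os.getpid():
        lock_file.unlink()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        shared = Path(tmp) / "dbt_170"
        acquire_lock(shared)
        try:
            pass  # create or update the virtualenv and run dbt here
        finally:
            release_lock(shared)
```

This is a check-then-write lock, so a short race window exists between is_lock_available() and write_text(). Creating the file atomically with os.open and the O_CREAT | O_EXCL flags would close that window, at the cost of handling stale locks differently.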

Command Execution: The run_subprocess() method overrides the parent to replace the dbt binary path with the one inside the virtualenv. The invocation mode is forced to InvocationMode.SUBPROCESS in the constructor.
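
A hypothetical class can illustrate both points: the forced invocation mode and the binary swap. InvocationMode here is a local stand-in enum whose member names are assumed to mirror cosmos.constants.InvocationMode. In this sketch, resolve_command() returns a new list rather than editing the command in place.

```python
from __future__ import annotations

from enum import Enum
from pathlib import Path
from typing import Any


class InvocationMode(Enum):
    # Local stand-in; member names assumed to mirror cosmos.constants.InvocationMode
    SUBPROCESS = "subprocess"
    DBT_RUNNER = "dbt_runner"


class VenvCommandBuilder:
    """Hypothetical class showing the two overrides described above."""

    def __init__(self, **kwargs: Any) -> None:
        # Whatever the caller passed, the virtualenv flavour always uses a subprocess
        kwargs["invocation_mode"] = InvocationMode.SUBPROCESS
        self.invocation_mode: InvocationMode = kwargs["invocation_mode"]
        self.py_bin: str | None = None  # set once the virtualenv has been prepared

    def resolve_command(self, command: list[str]) -> list[str]:
        if self.py_bin is None:
            return list(command)
        # dbt is expected next to the virtualenv's python executable
        venv_dbt = Path(self.py_bin).parent / "dbt"
        return [str(venv_dbt), *command[1:]]


if __name__ == "__main__":
    builder = VenvCommandBuilder(invocation_mode=InvocationMode.DBT_RUNNER)
    builder.py_bin = "/tmp/cosmos_venvs/dbt_170/bin/python"
    print(builder.invocation_mode, builder.resolve_command(["dbt", "run", "--select", "orders"]))
```

The dbt runner executes dbt in-process on the worker, where the isolated packages are not importable. Forcing a subprocess is therefore what lets the virtualenv's own dbt binary take effect.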

Decorator: The depends_on_virtualenv_dir decorator is applied to lock-related methods (_is_lock_available, _acquire_venv_lock, _release_venv_lock) to raise a CosmosValueError if virtualenv_dir is None.
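
A guard decorator of this kind can be written with functools.wraps, as in the sketch below. CosmosValueError is defined locally as a ValueError subclass purely for illustration. The LockUser class and the error message are also hypothetical.

```python
from __future__ import annotations

import functools
from pathlib import Path
from typing import Any, Callable


class CosmosValueError(ValueError):
    """Local stand-in for illustration; not imported from Cosmos."""


def depends_on_virtualenv_dir(method: Callable[..., Any]) -> Callable[..., Any]:
    @functools.wraps(method)  # keep the wrapped method's name and docstring
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        if self.virtualenv_dir is None:
            raise CosmosValueError(f"{method.__name__}() requires virtualenv_dir to be set")
        return method(self, *args, **kwargs)

    return wrapper


class LockUser:
    """Hypothetical owner of a virtualenv_dir attribute."""

    def __init__(self, virtualenv_dir: Path | None) -> None:
        self.virtualenv_dir = virtualenv_dir

    @depends_on_virtualenv_dir
    def lock_file_path(self) -> Path:
        return self.virtualenv_dir / "cosmos_virtualenv.lock"
```

Failing early produces a clear configuration error. Without the guard, the caller would get a TypeError from evaluating None / "cosmos_virtualenv.lock".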

Concrete operator subclasses use multiple inheritance to combine DbtVirtualenvBaseOperator with specific local operator classes:

Operator                           dbt Command           Parent Local Operator
DbtBuildVirtualenvOperator         dbt build             DbtBuildLocalOperator
DbtLSVirtualenvOperator            dbt ls                DbtLSLocalOperator
DbtSeedVirtualenvOperator          dbt seed              DbtSeedLocalOperator
DbtSnapshotVirtualenvOperator      dbt snapshot          DbtSnapshotLocalOperator
DbtSourceVirtualenvOperator        dbt source freshness  DbtSourceLocalOperator
DbtRunVirtualenvOperator           dbt run               DbtRunLocalOperator
DbtTestVirtualenvOperator          dbt test              DbtTestLocalOperator
DbtRunOperationVirtualenvOperator  dbt run-operation     DbtRunOperationLocalOperator
DbtDocsVirtualenvOperator          dbt docs generate     DbtDocsLocalOperator
DbtCloneVirtualenvOperator         dbt clone             DbtCloneLocalOperator
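
Python's method resolution order (MRO) determines how such a combination resolves each method. The classes below are hypothetical stand-ins, not the Cosmos classes. The local subclasses contribute the dbt subcommand, the virtualenv base contributes the binary location, and the order of the base classes decides which override wins. The sketch lists the virtualenv base first; check the Cosmos source for the exact base-class order of each operator.

```python
from __future__ import annotations


class LocalBase:
    """Stand-in for DbtLocalBaseOperator: builds the command line."""

    base_cmd: list[str] = []

    def dbt_bin(self) -> str:
        return "dbt"  # whatever dbt is on the worker's PATH

    def build_cmd(self) -> list[str]:
        return [self.dbt_bin(), *self.base_cmd]


class VirtualenvBase(LocalBase):
    """Stand-in for DbtVirtualenvBaseOperator: only changes where dbt comes from."""

    def dbt_bin(self) -> str:
        return "/opt/venv/bin/dbt"  # hypothetical virtualenv location


class RunLocal(LocalBase):
    base_cmd = ["run"]


class SourceLocal(LocalBase):
    base_cmd = ["source", "freshness"]


# Virtualenv base first, so its dbt_bin() is found before LocalBase's
class RunVirtualenv(VirtualenvBase, RunLocal):
    pass


class SourceVirtualenv(VirtualenvBase, SourceLocal):
    pass


if __name__ == "__main__":
    print([cls.__name__ for cls in RunVirtualenv.__mro__])
    print(RunVirtualenv().build_cmd(), SourceVirtualenv().build_cmd())
```

VirtualenvBase does not define base_cmd. The lookup therefore falls through to RunLocal or SourceLocal, while dbt_bin() resolves to the virtualenv override.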

Usage

Use these operators when the Airflow worker's Python environment does not have the correct dbt adapter version installed, or when you need to run different dbt adapter versions for different tasks within the same DAG. The virtualenv operators install the required dbt packages at runtime, providing full isolation. For production environments with many tasks sharing the same dbt version, set virtualenv_dir to a persistent path to avoid reinstalling dependencies on every task execution.

Code Reference

Source Location

cosmos/operators/virtualenv.py (module cosmos.operators.virtualenv)

Signature

class DbtVirtualenvBaseOperator(DbtLocalBaseOperator):
    template_fields = DbtLocalBaseOperator.template_fields + (
        "virtualenv_dir",
        "is_virtualenv_dir_temporary",
    )

    def __init__(
        self,
        py_requirements: list[str] | None = None,
        pip_install_options: list[str] | None = None,
        py_system_site_packages: bool = False,
        virtualenv_dir: Path | None = None,
        is_virtualenv_dir_temporary: bool = False,
        **kwargs: Any,
    ) -> None: ...

    def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str, **kwargs: Any) -> FullOutputSubprocessResult: ...
    def run_command(self, cmd: list[str], env: dict[str, str | bytes | os.PathLike[Any]], context: Context, ...) -> FullOutputSubprocessResult | dbtRunnerResult: ...
    def clean_dir_if_temporary(self) -> None: ...
    def execute(self, context: Context, **kwargs: Any) -> None: ...
    def on_kill(self) -> None: ...
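
The cleanup contract shared by execute(), on_kill() and clean_dir_if_temporary() can be sketched as a small hypothetical class. Only the cleanup logic is kept. In the real operator, execute() receives an Airflow Context and runs dbt; this sketch replaces that with an arbitrary callable.

```python
from __future__ import annotations

import shutil
import tempfile
from pathlib import Path
from typing import Callable, TypeVar

T = TypeVar("T")


class VenvCleanup:
    """Hypothetical stand-in for the cleanup part of the operator."""

    def __init__(self, virtualenv_dir: Path | None, is_virtualenv_dir_temporary: bool) -> None:
        self.virtualenv_dir = virtualenv_dir
        self.is_virtualenv_dir_temporary = is_virtualenv_dir_temporary

    def clean_dir_if_temporary(self) -> None:
        # Persistent directories are left alone so later runs can reuse them
        if self.is_virtualenv_dir_temporary and self.virtualenv_dir and self.virtualenv_dir.exists():
            shutil.rmtree(self.virtualenv_dir, ignore_errors=True)

    def execute(self, run: Callable[[], T]) -> T:
        try:
            return run()
        finally:
            # Runs whether the dbt command succeeded or raised
            self.clean_dir_if_temporary()

    def on_kill(self) -> None:
        # Airflow calls on_kill() when a running task is killed externally
        self.clean_dir_if_temporary()


if __name__ == "__main__":
    venv_dir = Path(tempfile.mkdtemp(prefix="cosmos-venv"))
    VenvCleanup(venv_dir, is_virtualenv_dir_temporary=True).execute(lambda: print("dbt run"))
    print(venv_dir.exists())  # False: the temporary directory was removed
```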

Import

from cosmos.operators.virtualenv import DbtRunVirtualenvOperator
from cosmos.operators.virtualenv import DbtBuildVirtualenvOperator
from cosmos.operators.virtualenv import DbtTestVirtualenvOperator
from cosmos.operators.virtualenv import DbtVirtualenvBaseOperator

I/O Contract

Inputs

Name                         Type              Required           Description
py_requirements              list[str] | None  Yes (recommended)  List of pip requirement strings to install in the virtualenv. Example: ["dbt-postgres==1.5.0"]. An error is logged if the list is empty.
pip_install_options          list[str] | None  No                 Additional pip install options. Example: ["--upgrade", "--no-cache-dir"]. Defaults to None, which is treated as an empty list.
py_system_site_packages      bool              No                 Whether the virtualenv can see the Airflow worker's system site packages. Defaults to False.
virtualenv_dir               Path | None       No                 Path to a persistent virtualenv directory. If None, a temporary directory is created and deleted after execution.
is_virtualenv_dir_temporary  bool              No                 When True, the virtualenv directory is deleted after execution even if virtualenv_dir is set. Defaults to False.
**kwargs                     Any               No                 Passed through to DbtLocalBaseOperator. Any invocation_mode value is overridden with InvocationMode.SUBPROCESS.
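
The sketch below shows one way the pip-related inputs could combine into a single install command. It is an illustration under assumptions. build_pip_install_cmd is hypothetical, and the command that Airflow's prepare_virtualenv() actually builds may differ, for example in how it locates pip.

```python
from __future__ import annotations

import logging
from pathlib import Path

logger = logging.getLogger(__name__)


def build_pip_install_cmd(
    venv_dir: Path,
    py_requirements: list[str] | None = None,
    pip_install_options: list[str] | None = None,
) -> list[str]:
    """Hypothetical helper: map the operator inputs onto one pip command line."""
    requirements = py_requirements or []  # None behaves like an empty list
    options = pip_install_options or []
    if not requirements:
        # Mirrors the documented behaviour: log an error rather than raise
        logger.error("py_requirements is empty; dbt will not be installed in the virtualenv")
    # Options are placed before the requirement specifiers
    return [str(venv_dir / "bin" / "pip"), "install", *options, *requirements]
```

With the Basic Example's inputs on a POSIX worker, this yields `<venv>/bin/pip install --no-cache-dir dbt-postgres==1.7.0`.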

Outputs

Name                        Type                        Description
FullOutputSubprocessResult  FullOutputSubprocessResult  Result of the dbt subprocess execution, including stdout, stderr, and return code
dbtRunnerResult             dbtRunnerResult             Alternative result type when using the dbt runner (not used in virtualenv mode, since invocation is forced to SUBPROCESS)
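
The sketch below shows one way to produce and check a subprocess result using only the standard library. SubprocessResult and its field names are hypothetical stand-ins, not the fields of Cosmos's FullOutputSubprocessResult. Here stderr is merged into stdout so that both streams appear in one log.

```python
from __future__ import annotations

import subprocess
import sys
from typing import NamedTuple


class SubprocessResult(NamedTuple):
    """Hypothetical stand-in; field names are illustrative, not Cosmos's API."""

    exit_code: int
    last_line: str
    lines: list[str]


def run_and_capture(command: list[str]) -> SubprocessResult:
    # stderr is merged into stdout so the log keeps both streams together
    proc = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    lines = proc.stdout.splitlines()
    return SubprocessResult(proc.returncode, lines[-1] if lines else "", lines)


if __name__ == "__main__":
    result = run_and_capture([sys.executable, "-c", "print('dbt finished')"])
    if result.exit_code != 0:
        raise RuntimeError("dbt failed:\n" + "\n".join(result.lines))
    print(result.last_line)
```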

Usage Examples

Basic Example

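from cosmos import ProfileConfig
from cosmos.operators.virtualenv import DbtRunVirtualenvOperator
from cosmos.profiles import PostgresUserPasswordProfileMapping

run_dbt = DbtRunVirtualenvOperator(
    task_id="dbt_run",
    project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",
    profile_config=ProfileConfig(
        profile_name="jaffle_shop",
        target_name="dev",
        # "postgres_default" is an assumed Airflow connection ID; use your own
        profile_mapping=PostgresUserPasswordProfileMapping(conn_id="postgres_default", profile_args={"schema": "public"}),
    ),
    py_requirements=["dbt-postgres==1.7.0"],
    pip_install_options=["--no-cache-dir"],
)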

Persistent Virtualenv Example

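from pathlib import Path
from cosmos import ProfileConfig
from cosmos.operators.virtualenv import DbtBuildVirtualenvOperator
from cosmos.profiles import PostgresUserPasswordProfileMapping

build_dbt = DbtBuildVirtualenvOperator(
    task_id="dbt_build",
    project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",
    profile_config=ProfileConfig(
        profile_name="jaffle_shop",
        target_name="dev",
        # "postgres_default" is an assumed Airflow connection ID; use your own
        profile_mapping=PostgresUserPasswordProfileMapping(conn_id="postgres_default", profile_args={"schema": "public"}),
    ),
    py_requirements=["dbt-postgres==1.7.0", "dbt-core==1.7.0"],
    # Persistent: reused across runs and guarded by cosmos_virtualenv.lock
    virtualenv_dir=Path("/tmp/cosmos_venvs/dbt_170"),
    py_system_site_packages=False,
)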

Using with DbtDag

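from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig
from cosmos.constants import ExecutionMode
from cosmos.profiles import PostgresUserPasswordProfileMapping

jaffle_shop = DbtDag(
    project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
    profile_config=ProfileConfig(
        profile_name="jaffle_shop",
        target_name="dev",
        # A profile source is required; "postgres_default" is an assumed connection ID
        profile_mapping=PostgresUserPasswordProfileMapping(
            conn_id="postgres_default",
            profile_args={"schema": "public"},
        ),
    ),
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.VIRTUALENV,
    ),
    operator_args={
        "py_requirements": ["dbt-postgres==1.7.0"],
        # Passed as a Path, matching the operator's `Path | None` signature
        "virtualenv_dir": Path("/tmp/cosmos_venvs/dbt_170"),
    },
    dag_id="jaffle_shop_venv",
    # `schedule` works on Airflow 2.4+ and 3; `schedule_interval` was removed in Airflow 3
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
)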
