Implementation:Astronomer Astronomer cosmos DbtVirtualenvBaseOperator
| Knowledge Sources | |
|---|---|
| Domains | Airflow Operators, Virtual Environments, dbt Execution |
| Last Updated | 2026-02-07 17:00 GMT |
Overview
Operators for running dbt commands in isolated Python virtual environments, providing dependency isolation between the Airflow worker environment and the dbt execution environment.
Description
The cosmos.operators.virtualenv module defines the DbtVirtualenvBaseOperator base class and ten concrete operator subclasses that execute dbt CLI commands within dynamically created or persistent Python virtual environments.
DbtVirtualenvBaseOperator extends DbtLocalBaseOperator and manages the full lifecycle of a virtualenv: creation, dependency installation, lock-based sharing across concurrent tasks, and cleanup.
Key behaviour:
- Virtualenv Creation: When virtualenv_dir is None or is_virtualenv_dir_temporary is True, a temporary directory is created using Python's TemporaryDirectory with the prefix "cosmos-venv". The virtualenv is prepared using Airflow's prepare_virtualenv() utility (imported from airflow.providers.standard.utils.python_virtualenv in Airflow 3, or airflow.utils.python_virtualenv in Airflow 2). After execution, temporary directories are cleaned up in the finally block and also via on_kill().
- Persistent Virtualenv with Locking: When a virtualenv_dir is specified and is_virtualenv_dir_temporary is False, the virtualenv persists across task runs. A file-based lock mechanism (cosmos_virtualenv.lock) ensures that concurrent operators sharing the same virtualenv directory do not corrupt it. The lock file contains the PID of the owning process. The operator checks lock availability using psutil.Process(pid).is_running() and retries up to settings.virtualenv_max_retries_lock times (with 1-second sleep between retries). Stale locks from dead processes are automatically detected and overridden.
- Command Execution: The run_subprocess() method overrides the parent to replace the dbt binary path with the one inside the virtualenv. The invocation mode is forced to InvocationMode.SUBPROCESS in the constructor.
- Decorator: The depends_on_virtualenv_dir decorator is applied to lock-related methods (_is_lock_available, _acquire_venv_lock, _release_venv_lock) to raise a CosmosValueError if virtualenv_dir is None.
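The locking and decorator behaviour described above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the Cosmos implementation: VenvLock and pid_is_running are hypothetical names, os.kill(pid, 0) stands in for psutil.Process(pid).is_running() (POSIX only), and ValueError and TimeoutError stand in for the exceptions the operator raises.

```python
import functools
import os
import time
from pathlib import Path

LOCK_FILENAME = "cosmos_virtualenv.lock"  # file name taken from the description


def depends_on_virtualenv_dir(method):
    """Refuse to run a lock helper when no virtualenv_dir is configured."""

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        if self.virtualenv_dir is None:
            # Stand-in for CosmosValueError.
            raise ValueError(f"{method.__name__} requires virtualenv_dir")
        return method(self, *args, **kwargs)

    return wrapper


def pid_is_running(pid: int) -> bool:
    """Stand-in for psutil.Process(pid).is_running(); POSIX only."""
    try:
        os.kill(pid, 0)  # signal 0 only checks that the process exists
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


class VenvLock:
    """Hypothetical helper holding the lock logic for one virtualenv directory."""

    def __init__(self, virtualenv_dir, max_retries=3, sleep_seconds=1.0):
        self.virtualenv_dir = Path(virtualenv_dir) if virtualenv_dir else None
        self.max_retries = max_retries
        self.sleep_seconds = sleep_seconds

    @property
    def lock_file(self) -> Path:
        return self.virtualenv_dir / LOCK_FILENAME

    @depends_on_virtualenv_dir
    def is_available(self) -> bool:
        if not self.lock_file.is_file():
            return True
        try:
            pid = int(self.lock_file.read_text().strip())
        except ValueError:
            return True  # assumption: unreadable content counts as stale
        return not pid_is_running(pid)  # a dead owner means the lock is stale

    @depends_on_virtualenv_dir
    def acquire(self) -> None:
        for _ in range(self.max_retries):
            if self.is_available():
                self.virtualenv_dir.mkdir(parents=True, exist_ok=True)
                self.lock_file.write_text(str(os.getpid()))
                return
            time.sleep(self.sleep_seconds)
        raise TimeoutError(f"could not acquire {self.lock_file}")

    @depends_on_virtualenv_dir
    def release(self) -> None:
        # Only remove the lock if this process is the recorded owner.
        if self.lock_file.is_file() and self.lock_file.read_text().strip() == str(os.getpid()):
            self.lock_file.unlink()
```

Note that the check in is_available and the write in acquire are two separate steps, so this sketch is not atomic; it only illustrates the PID-in-file idea and the stale-lock check.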
Concrete operator subclasses use multiple inheritance to combine DbtVirtualenvBaseOperator with specific local operator classes:
| Operator | dbt Command | Parent Local Operator |
|---|---|---|
| DbtBuildVirtualenvOperator | dbt build | DbtBuildLocalOperator |
| DbtLSVirtualenvOperator | dbt ls | DbtLSLocalOperator |
| DbtSeedVirtualenvOperator | dbt seed | DbtSeedLocalOperator |
| DbtSnapshotVirtualenvOperator | dbt snapshot | DbtSnapshotLocalOperator |
| DbtSourceVirtualenvOperator | dbt source freshness | DbtSourceLocalOperator |
| DbtRunVirtualenvOperator | dbt run | DbtRunLocalOperator |
| DbtTestVirtualenvOperator | dbt test | DbtTestLocalOperator |
| DbtRunOperationVirtualenvOperator | dbt run-operation | DbtRunOperationLocalOperator |
| DbtDocsVirtualenvOperator | dbt docs generate | DbtDocsLocalOperator |
| DbtCloneVirtualenvOperator | dbt clone | DbtCloneLocalOperator |
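The multiple-inheritance pattern in the table can be illustrated with plain stand-in classes. This is a sketch only: LocalBase, RunLocal, VirtualenvBase, and RunVirtualenv are hypothetical names that mirror the roles of the Cosmos classes, the venv_dbt_bin path is made up, and the base-class order (virtualenv base first) is an assumption.

```python
class LocalBase:
    """Stand-in for the local base operator: actually 'runs' the command."""

    def run_subprocess(self, command):
        return "executed: " + " ".join(command)


class RunLocal(LocalBase):
    """Stand-in for a command-specific local operator (here: dbt run)."""

    base_cmd = ["run"]

    def build_cmd(self):
        return ["dbt", *self.base_cmd]


class VirtualenvBase(LocalBase):
    """Stand-in for the virtualenv base: swaps in the venv's dbt binary."""

    venv_dbt_bin = "/path/to/venv/bin/dbt"  # illustrative path only

    def run_subprocess(self, command):
        # Replace the first element (the dbt executable) and defer to the parent.
        command = [self.venv_dbt_bin, *command[1:]]
        return super().run_subprocess(command)


class RunVirtualenv(VirtualenvBase, RunLocal):
    """Command logic comes from RunLocal, execution override from VirtualenvBase."""


op = RunVirtualenv()
result = op.run_subprocess(op.build_cmd())
mro_names = [cls.__name__ for cls in RunVirtualenv.__mro__]
```

Because the virtualenv base comes first in the method resolution order, its run_subprocess override takes effect while the command-specific behaviour is still inherited from the local operator.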
Usage
Use these operators when the Airflow worker's Python environment does not have the correct dbt adapter version installed, or when you need to run different dbt adapter versions for different tasks within the same DAG. The virtualenv operators install the required dbt packages at runtime, providing full isolation. For production environments with many tasks sharing the same dbt version, set virtualenv_dir to a persistent path to avoid reinstalling dependencies on every task execution.
Code Reference
Source Location
- Repository: Astronomer_Astronomer_cosmos
- File: cosmos/operators/virtualenv.py
- Lines: 1-339
Signature
```python
class DbtVirtualenvBaseOperator(DbtLocalBaseOperator):
    template_fields = DbtLocalBaseOperator.template_fields + (
        "virtualenv_dir",
        "is_virtualenv_dir_temporary",
    )

    def __init__(
        self,
        py_requirements: list[str] | None = None,
        pip_install_options: list[str] | None = None,
        py_system_site_packages: bool = False,
        virtualenv_dir: Path | None = None,
        is_virtualenv_dir_temporary: bool = False,
        **kwargs: Any,
    ) -> None: ...

    def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str, **kwargs: Any) -> FullOutputSubprocessResult: ...

    def run_command(self, cmd: list[str], env: dict[str, str | bytes | os.PathLike[Any]], context: Context, ...) -> FullOutputSubprocessResult | dbtRunnerResult: ...

    def clean_dir_if_temporary(self) -> None: ...

    def execute(self, context: Context, **kwargs: Any) -> None: ...

    def on_kill(self) -> None: ...
```
Import
```python
from cosmos.operators.virtualenv import DbtRunVirtualenvOperator
from cosmos.operators.virtualenv import DbtBuildVirtualenvOperator
from cosmos.operators.virtualenv import DbtTestVirtualenvOperator
from cosmos.operators.virtualenv import DbtVirtualenvBaseOperator
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| py_requirements | list[str] or None | No (recommended) | List of pip requirement strings to install in the virtualenv. Example: ["dbt-postgres==1.5.0"]. Defaults to None; an error is logged if empty. |
| pip_install_options | list[str] or None | No | Additional pip install options. Example: ["--upgrade", "--no-cache-dir"]. Defaults to empty list. |
| py_system_site_packages | bool | No | Whether to include the Airflow worker's system site packages in the virtualenv. Defaults to False. |
| virtualenv_dir | Path or None | No | Path to a persistent virtualenv directory. If None, a temporary directory is created and deleted after execution. |
| is_virtualenv_dir_temporary | bool | No | When True, the virtualenv directory is deleted after execution even if virtualenv_dir is set. Defaults to False. |
| **kwargs | Any | No | All additional keyword arguments are passed through to DbtLocalBaseOperator. The invocation_mode kwarg is forced to InvocationMode.SUBPROCESS. |
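How virtualenv_dir and is_virtualenv_dir_temporary interact can be sketched as a small standalone function. This is one reading of the table above, not the operator's code: run_in_virtualenv and its task callback are hypothetical, and the sketch only resolves and cleans up the directory without creating a real virtualenv or calling prepare_virtualenv().

```python
import shutil
from pathlib import Path
from tempfile import TemporaryDirectory


def run_in_virtualenv(virtualenv_dir, is_temporary, task):
    """Hypothetical helper: resolve the venv directory, run task(path), clean up."""
    tmp = None
    if virtualenv_dir is None:
        # No directory given: use a throwaway one with the documented prefix.
        tmp = TemporaryDirectory(prefix="cosmos-venv")
        venv_path = Path(tmp.name)
    else:
        venv_path = Path(virtualenv_dir)
        venv_path.mkdir(parents=True, exist_ok=True)
    try:
        return task(venv_path)
    finally:
        # Cleanup runs even if the task raises.
        if tmp is not None:
            tmp.cleanup()
        elif is_temporary:
            # A directory was given but flagged as temporary: remove it too.
            shutil.rmtree(venv_path, ignore_errors=True)
```

For example, run_in_virtualenv(None, False, print) prints a path whose final component starts with cosmos-venv and removes that directory afterwards, while passing an explicit directory with is_temporary=False leaves it in place for the next run.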
Outputs
| Name | Type | Description |
|---|---|---|
| FullOutputSubprocessResult | FullOutputSubprocessResult | Result of the dbt subprocess execution including stdout, stderr, and return code |
| dbtRunnerResult | dbtRunnerResult | Alternative result type when using dbt runner (not used in virtualenv mode since invocation is forced to SUBPROCESS) |
Usage Examples
Basic Example
```python
from cosmos.operators.virtualenv import DbtRunVirtualenvOperator

run_dbt = DbtRunVirtualenvOperator(
    task_id="dbt_run",
    project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",
    py_requirements=["dbt-postgres==1.7.0"],
    pip_install_options=["--no-cache-dir"],
)
```
Persistent Virtualenv Example
```python
from pathlib import Path

from cosmos.operators.virtualenv import DbtBuildVirtualenvOperator

build_dbt = DbtBuildVirtualenvOperator(
    task_id="dbt_build",
    project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",
    py_requirements=["dbt-postgres==1.7.0", "dbt-core==1.7.0"],
    # Reused across task runs because is_virtualenv_dir_temporary defaults to False.
    virtualenv_dir=Path("/tmp/cosmos_venvs/dbt_170"),
    py_system_site_packages=False,
)
```
Using with DbtDag
```python
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.constants import ExecutionMode

jaffle_shop = DbtDag(
    project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
    # ProfileConfig normally also needs profiles_yml_filepath or profile_mapping.
    profile_config=ProfileConfig(
        profile_name="jaffle_shop",
        target_name="dev",
    ),
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.VIRTUALENV,
    ),
    # operator_args are forwarded to every virtualenv operator in the DAG.
    operator_args={
        "py_requirements": ["dbt-postgres==1.7.0"],
        "virtualenv_dir": "/tmp/cosmos_venvs/dbt_170",
    },
    dag_id="jaffle_shop_venv",
    # Airflow 3 removed schedule_interval; use schedule="@daily" there.
    schedule_interval="@daily",
)
```
Related Pages
- cosmos.operators.local.DbtLocalBaseOperator -- Parent base class providing core dbt execution logic
- cosmos.operators.local.DbtBuildLocalOperator -- Local build operator that DbtBuildVirtualenvOperator extends
- cosmos.operators.local.DbtRunLocalOperator -- Local run operator that DbtRunVirtualenvOperator extends