Implementation:Astronomer Astronomer cosmos DbtRunLocalOperator Execute
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Execution, Orchestration |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Concrete tool, provided by the astronomer-cosmos library, for executing dbt run and dbt test commands locally on the Airflow worker.
Description
DbtRunLocalOperator and DbtTestLocalOperator are the primary operators for local dbt execution in Cosmos. Each inherits from DbtLocalBaseOperator, which provides the local execution engine (via subprocess or dbtRunner), and from its command mixin (DbtRunMixin or DbtTestMixin, both from cosmos.operators.base). Users rarely instantiate these operators directly; DbtDag or DbtTaskGroup typically generates them during DAG rendering. They can still be used standalone when fine-grained control is needed.
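The mixin-plus-base composition described above can be illustrated with a small, self-contained sketch. The classes `LocalBase`, `RunMixin`, and `RunLocal` and the methods `add_cmd_flags` and `build_cmd` are hypothetical stand-ins written for this page, not the Cosmos classes themselves. They only show how Python's method resolution order lets a command mixin contribute flags while the base class owns execution.

```python
from typing import Any


class LocalBase:
    """Hypothetical stand-in for a local base operator: owns command assembly."""

    def __init__(self, **kwargs: Any) -> None:
        self.base_cmd: list[str] = []
        self.kwargs = kwargs

    def add_cmd_flags(self) -> list[str]:
        # The base class contributes no command-specific flags.
        return []

    def build_cmd(self) -> list[str]:
        # Resolves add_cmd_flags through the MRO, so a mixin can override it.
        return ["dbt", *self.base_cmd, *self.add_cmd_flags()]


class RunMixin:
    """Hypothetical stand-in for a run mixin: contributes run-specific flags."""

    def __init__(self, full_refresh: bool = False, **kwargs: Any) -> None:
        self.full_refresh = full_refresh
        super().__init__(**kwargs)

    def add_cmd_flags(self) -> list[str]:
        return ["--full-refresh"] if self.full_refresh else []


class RunLocal(RunMixin, LocalBase):
    """Mixin listed first so its methods take precedence over the base."""

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.base_cmd = ["run"]


if __name__ == "__main__":
    print([cls.__name__ for cls in RunLocal.__mro__])
    print(RunLocal(full_refresh=True).build_cmd())
```

Listing the mixin before the base class matters: with the order reversed, the base implementation of `add_cmd_flags` would win and the run-specific flag would never be added.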
Usage
Use these operators when you need to run individual dbt commands locally on the Airflow worker. In most Cosmos workflows, these operators are created automatically during DAG rendering based on the dbt project graph.
Code Reference
Source Location
- Repository: astronomer-cosmos
- File: cosmos/operators/local.py
- Lines: L1078-1139 (DbtRunLocalOperator and DbtTestLocalOperator)
Signature
class DbtRunLocalOperator(DbtRunMixin, DbtLocalBaseOperator):
    """
    Executes a dbt core run command.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.base_cmd = ["run"]


class DbtTestLocalOperator(DbtTestMixin, DbtLocalBaseOperator):
    """
    Executes a dbt core test command.
    """

    def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None:
        self.on_warning_callback = on_warning_callback
        super().__init__(**kwargs)
        self.base_cmd = ["test"]
Key inherited kwargs (from AbstractDbtBase and the other base classes):
| Parameter | Type | Description |
|---|---|---|
| project_dir | str / Path | Path to the dbt project directory |
| conn_id | str | Airflow connection ID for the target database |
| select | str | dbt node selection syntax (e.g., +model_name+) |
| exclude | str | dbt node exclusion syntax |
| vars | dict | Variables to pass to dbt (--vars) |
| full_refresh | bool | Whether to run with --full-refresh flag |
| dbt_executable_path | str | Path to the dbt binary (SUBPROCESS mode) |
| install_deps | bool | Whether to run dbt deps before the main command |
| profile_config | ProfileConfig | Configuration for generating profiles.yml |
| invocation_mode | InvocationMode | SUBPROCESS or DBT_RUNNER |
| env | dict | Additional environment variables for the dbt process |
| no_version_check | bool | Whether to skip the dbt version check |
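As a rough illustration of how several of these kwargs correspond to dbt command-line flags, the sketch below assembles a flag list by hand. The helper `build_dbt_flags` and its parameter `dbt_vars` are hypothetical names introduced for this page. It is not how Cosmos builds its command internally; it only uses the standard dbt flags `--select`, `--exclude`, `--vars`, `--full-refresh`, and `--no-version-check`.

```python
import json
from typing import Any, Optional


def build_dbt_flags(
    select: Optional[str] = None,
    exclude: Optional[str] = None,
    dbt_vars: Optional[dict[str, Any]] = None,
    full_refresh: bool = False,
    no_version_check: bool = False,
) -> list[str]:
    """Hypothetical helper: translate operator-style kwargs into dbt CLI flags."""
    flags: list[str] = []
    if select:
        flags += ["--select", select]
    if exclude:
        flags += ["--exclude", exclude]
    if dbt_vars:
        # dbt parses --vars as YAML, and JSON is a valid subset of YAML.
        flags += ["--vars", json.dumps(dbt_vars)]
    if full_refresh:
        flags.append("--full-refresh")
    if no_version_check:
        flags.append("--no-version-check")
    return flags


if __name__ == "__main__":
    cmd = ["dbt", "run", *build_dbt_flags(select="+customers+", full_refresh=True)]
    print(" ".join(cmd))
```

Unset or falsy arguments add nothing, so the resulting command carries only the flags that were explicitly requested.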
Import
from cosmos.operators.local import DbtRunLocalOperator, DbtTestLocalOperator
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| context | Airflow Context | Yes | Provided automatically at runtime by the Airflow executor |
| project_dir | str / Path | Yes | Path to the dbt project on the worker filesystem |
| profile_config | ProfileConfig | Yes | Database connection profile configuration |
| select | str | No | dbt node selector (typically set during DAG rendering) |
Outputs
| Name | Type | Description |
|---|---|---|
| return_value | None / dbtRunnerResult | DbtRunLocalOperator returns None (SUBPROCESS) or dbtRunnerResult (DBT_RUNNER) |
| compiled_sql | XCom (template field) | The compiled SQL for the executed model, stored in compiled_sql template field |
| test_result | callback invocation | DbtTestLocalOperator invokes on_warning_callback if warnings are detected in dbt test output |
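A warning callback for DbtTestLocalOperator might look like the sketch below. It assumes the callback receives a context-like dictionary that includes `test_names` and `test_results` entries for the tests that produced warnings. Those key names are an assumption here and should be checked against the installed Cosmos version. The function names `format_test_warnings` and `warn_on_dbt_tests` are hypothetical.

```python
from typing import Any


def format_test_warnings(context: dict[str, Any]) -> list[str]:
    """Pair each warning test name with its result message.

    Assumes `context` may carry "test_names" and "test_results" lists
    (an assumption about what the operator passes to the callback).
    """
    names = context.get("test_names", [])
    results = context.get("test_results", [])
    return [f"{name}: {result}" for name, result in zip(names, results)]


def warn_on_dbt_tests(context: dict[str, Any]) -> None:
    """Hypothetical callback: print one line per dbt test warning."""
    for line in format_test_warnings(context):
        print(f"dbt test warning - {line}")


if __name__ == "__main__":
    warn_on_dbt_tests(
        {
            "test_names": ["unique_customers_customer_id"],
            "test_results": ["Got 2 results, configured to warn if != 0"],
        }
    )
```

Such a function would be passed as `on_warning_callback=warn_on_dbt_tests`. Keeping the formatting separate from the side effect makes it easy to swap the `print` call for a notification hook.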
Usage Examples
Direct Operator Usage (Standalone)
from cosmos import ProfileConfig
from cosmos.operators.local import DbtRunLocalOperator, DbtTestLocalOperator
from cosmos.profiles import PostgresUserPasswordProfileMapping

# Map an Airflow connection to a dbt profile.
profile_config = ProfileConfig(
    profile_name="jaffle_shop",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="my_postgres_conn",
        profile_args={"schema": "public"},
    ),
)

# As with any Airflow operator, define these tasks inside a DAG context.
run_customers = DbtRunLocalOperator(
    task_id="run_customers_model",
    project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",
    profile_config=profile_config,
    select="customers",
    install_deps=True,
)

test_customers = DbtTestLocalOperator(
    task_id="test_customers_model",
    project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",
    profile_config=profile_config,
    select="customers",
    # Invoked only when dbt test reports warnings.
    on_warning_callback=lambda context: print(f"Test warnings: {context}"),
)

# Run the model first, then test it.
run_customers >> test_customers
Auto-Generated via DbtTaskGroup (Typical Usage)
from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig
from cosmos.constants import ExecutionMode, InvocationMode
from cosmos.profiles import PostgresUserPasswordProfileMapping

profile_config = ProfileConfig(
    profile_name="jaffle_shop",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="my_postgres_conn",
    ),
)

# DbtRunLocalOperator and DbtTestLocalOperator instances are
# automatically created for each model and test in the dbt project.
# Like any task group, this must be instantiated inside a DAG context.
dbt_tg = DbtTaskGroup(
    project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
    profile_config=profile_config,
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.LOCAL,
        invocation_mode=InvocationMode.DBT_RUNNER,
    ),
)
Related Pages
Implements Principle
Requires Environment
- Environment:Astronomer_Astronomer_cosmos_Python_Airflow_Runtime
- Environment:Astronomer_Astronomer_cosmos_Cosmos_Airflow_Configuration