Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Astronomer Astronomer cosmos DbtRunLocalOperator Execute

From Leeroopedia


Knowledge Sources
Domains Data_Engineering, Execution, Orchestration
Last Updated 2026-02-07 00:00 GMT

Overview

Concrete tool for executing dbt run and dbt test commands locally on the Airflow worker provided by the astronomer-cosmos library.

Description

DbtRunLocalOperator and DbtTestLocalOperator are the primary operators for local dbt execution in Cosmos. They inherit from DbtLocalBaseOperator (which provides the local execution engine via subprocess or dbtRunner) and their respective command mixins (DbtRunMixin and DbtTestMixin from cosmos.operators.base). These operators are rarely instantiated directly by users -- they are typically auto-generated by DbtDag or DbtTaskGroup during DAG rendering. However, they can be used standalone for fine-grained control.

Usage

Use these operators when you need to run individual dbt commands locally on the Airflow worker. In most Cosmos workflows, these operators are created automatically during DAG rendering based on the dbt project graph.

Code Reference

Source Location

  • Repository: astronomer-cosmos
  • File: cosmos/operators/local.py
  • Lines: L1078-1139 (DbtRunLocalOperator and DbtTestLocalOperator)

Signature

class DbtRunLocalOperator(DbtRunMixin, DbtLocalBaseOperator):
    """
    Executes a dbt core run command.
    """
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.base_cmd = ["run"]

class DbtTestLocalOperator(DbtTestMixin, DbtLocalBaseOperator):
    """
    Executes a dbt core test command.
    """
    def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None:
        self.on_warning_callback = on_warning_callback
        super().__init__(**kwargs)
        self.base_cmd = ["test"]

Key inherited kwargs from AbstractDbtBase:

Parameter Type Description
project_dir str / Path Path to the dbt project directory
conn_id str Airflow connection ID for the target database
select str dbt node selection syntax (e.g., +model_name+)
exclude str dbt node exclusion syntax
vars dict Variables to pass to dbt (--vars)
full_refresh bool Whether to run with --full-refresh flag
dbt_executable_path str Path to the dbt binary (SUBPROCESS mode)
install_deps bool Whether to run dbt deps before the main command
profile_config ProfileConfig Configuration for generating profiles.yml
invocation_mode InvocationMode SUBPROCESS or DBT_RUNNER
env dict Additional environment variables for the dbt process
no_version_check bool Whether to skip the dbt version check

Import

from cosmos.operators.local import DbtRunLocalOperator, DbtTestLocalOperator

I/O Contract

Inputs

Name Type Required Description
context Airflow Context Yes Provided automatically at runtime by the Airflow executor
project_dir str / Path Yes Path to the dbt project on the worker filesystem
profile_config ProfileConfig Yes Database connection profile configuration
select str No dbt node selector (typically set during DAG rendering)

Outputs

Name Type Description
return_value None / dbtRunnerResult DbtRunLocalOperator returns None (SUBPROCESS) or dbtRunnerResult (DBT_RUNNER)
compiled_sql XCom (template field) The compiled SQL for the executed model, stored in compiled_sql template field
test_result callback invocation DbtTestLocalOperator invokes on_warning_callback if warnings are detected in dbt test output

Usage Examples

Direct Operator Usage (Standalone)

from cosmos.operators.local import DbtRunLocalOperator, DbtTestLocalOperator
from cosmos.profiles import PostgresUserPasswordProfileMapping

profile_config = ProfileConfig(
    profile_name="jaffle_shop",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="my_postgres_conn",
        profile_args={"schema": "public"},
    ),
)

run_customers = DbtRunLocalOperator(
    task_id="run_customers_model",
    project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",
    profile_config=profile_config,
    select="customers",
    install_deps=True,
)

test_customers = DbtTestLocalOperator(
    task_id="test_customers_model",
    project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",
    profile_config=profile_config,
    select="customers",
    on_warning_callback=lambda result: print(f"Test warnings: {result}"),
)

run_customers >> test_customers

Auto-Generated via DbtTaskGroup (Typical Usage)

from cosmos import DbtTaskGroup, ProjectConfig, ExecutionConfig, RenderConfig
from cosmos.constants import ExecutionMode, InvocationMode
from cosmos.profiles import PostgresUserPasswordProfileMapping

profile_config = ProfileConfig(
    profile_name="jaffle_shop",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="my_postgres_conn",
    ),
)

# DbtRunLocalOperator and DbtTestLocalOperator instances are
# automatically created for each model and test in the dbt project
dbt_tg = DbtTaskGroup(
    project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
    profile_config=profile_config,
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.LOCAL,
        invocation_mode=InvocationMode.DBT_RUNNER,
    ),
)

Related Pages

Implements Principle

Requires Environment

Uses Heuristic

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment