Implementation:Astronomer Astronomer cosmos DbtRunner Wrapper
| Knowledge Sources | |
|---|---|
| Domains | dbt_Execution, Runner |
| Last Updated | 2026-02-07 17:00 GMT |
Overview
The DbtRunner wrapper module provides a thin abstraction over dbt-core's programmatic dbtRunner API, offering availability checks, runner instantiation, command execution, warning extraction, and exception handling in a single cohesive interface.
Description
This 128-line module encapsulates all direct interaction with the dbtRunner class from dbt.cli.main, isolating the rest of the Cosmos codebase from dbt-core internals.
is_available probes whether the dbtRunner class can be imported from the current Python environment. This allows Cosmos to gracefully degrade when dbt-core is not installed or when a subprocess-based execution mode is preferred.
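The availability probe described above can be sketched as a guarded import. This is a minimal illustration, not the module's actual code; `can_import` is a hypothetical helper name, and the only detail taken from this page is that dbtRunner lives in dbt.cli.main.

```python
import importlib


def can_import(module_name: str, attribute: str) -> bool:
    """Return True if `attribute` can be imported from `module_name`.

    Hypothetical helper for illustration; not part of Cosmos.
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        # ModuleNotFoundError is a subclass of ImportError, so a missing
        # dbt-core installation lands here.
        return False
    return hasattr(module, attribute)


# Evaluates to False when dbt-core is not installed in this environment.
dbt_runner_available = can_import("dbt.cli.main", "dbtRunner")
```

A caller could branch on the boolean to fall back to subprocess execution instead of failing at import time.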
get_runner instantiates a dbtRunner object, optionally attaching callback functions that receive events during execution. Callbacks can be used for real-time logging, progress tracking, or custom metric collection.
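As a rough sketch of what such a callback might look like, the example below records the name of each event it receives. It assumes events expose their metadata as `event.info.name`, as dbt-core's event messages do; `make_event_collector` is a hypothetical helper, and the event object used here is a stand-in so the snippet runs without dbt-core installed.

```python
from types import SimpleNamespace
from typing import Any, Callable, List


def make_event_collector(names: List[str]) -> Callable[[Any], None]:
    """Build a callback that appends each received event's name to `names`."""

    def collect(event: Any) -> None:
        # Assumption: the event carries its type name under `event.info.name`.
        names.append(event.info.name)

    return collect


seen: List[str] = []
callback = make_event_collector(seen)

# Stand-in event for illustration only; real events are emitted by dbt.
callback(SimpleNamespace(info=SimpleNamespace(name="MainReportVersion")))
```

With dbt-core installed, a callback of this shape could be passed as `get_runner(callbacks=[callback])`.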
run_command is the primary entry point for executing dbt commands programmatically. It accepts a command list (e.g., ["run", "--models", "stg_orders"]), optional environment variables, a working directory, and callbacks. It returns a dbtRunnerResult containing the execution outcome, structured results, and any logs. This function manages the lifecycle of creating the runner, configuring the environment, and invoking the command.
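The lifecycle described above (apply the environment, switch directory, invoke, restore) can be approximated with two context managers. This is a sketch under assumptions: `temporary_environ`, `working_directory`, and `run_with_context` are hypothetical names, and `fake_invoke` stands in for the runner's invoke call so the snippet runs without dbt-core.

```python
import os
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterator, List, Optional


@contextmanager
def temporary_environ(env: Optional[Dict[str, str]]) -> Iterator[None]:
    """Overlay `env` on os.environ and restore the previous state on exit."""
    original = dict(os.environ)
    os.environ.update(env or {})
    try:
        yield
    finally:
        os.environ.clear()
        os.environ.update(original)


@contextmanager
def working_directory(path: Optional[str]) -> Iterator[None]:
    """Change into `path` (if given) and return to the previous directory on exit."""
    previous = os.getcwd()
    if path is not None:
        os.chdir(path)
    try:
        yield
    finally:
        os.chdir(previous)


def run_with_context(
    invoke: Callable[[List[str]], Any],
    command: List[str],
    env: Optional[Dict[str, str]] = None,
    cwd: Optional[str] = None,
) -> Any:
    """Call `invoke(command)` with the environment and directory applied."""
    with working_directory(cwd), temporary_environ(env):
        return invoke(command)


def fake_invoke(command: List[str]) -> Dict[str, Any]:
    # Stand-in for the real runner; reports what it observed while running.
    return {
        "command": command,
        "cwd": os.getcwd(),
        "flag": os.environ.get("COSMOS_SKETCH_FLAG"),
    }


outcome = run_with_context(
    fake_invoke,
    ["run", "--models", "stg_orders"],
    env={"COSMOS_SKETCH_FLAG": "1"},
)
```

Restoring state in `finally` blocks keeps one task's environment and directory from leaking into the next, even when the invocation raises.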
parse_number_of_warnings inspects a dbtRunnerResult and returns the count of nodes that completed with a warning status. Unlike the subprocess-based parser that searches text output, this function queries the structured result objects directly, providing more reliable extraction.
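To illustrate the structured approach, the sketch below counts node results whose status equals "warn". It assumes the per-node results are reachable as `result.result.results` and that each status compares equal to a plain string; `count_warnings` is a hypothetical name, and the result object is a stand-in built for the example.

```python
from types import SimpleNamespace
from typing import Any


def count_warnings(runner_result: Any) -> int:
    """Count node results with a 'warn' status.

    Assumes `runner_result.result.results` is an iterable of objects with a
    `status` attribute; returns 0 when no such collection is present.
    """
    node_results = getattr(runner_result.result, "results", None) or []
    return sum(1 for node in node_results if node.status == "warn")


# Stand-in result for illustration only.
fake_result = SimpleNamespace(
    result=SimpleNamespace(
        results=[
            SimpleNamespace(status="success"),
            SimpleNamespace(status="warn"),
            SimpleNamespace(status="warn"),
            SimpleNamespace(status="error"),
        ]
    )
)
warning_total = count_warnings(fake_result)
```

Because the count comes from status fields rather than log text, it does not depend on the wording or formatting of dbt's console output.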
handle_exception_if_needed examines the dbtRunnerResult for fatal errors and raises an appropriate Python exception if the dbt invocation failed. This translates dbt-internal error states into standard exceptions that Airflow's task lifecycle can handle.
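A minimal sketch of this translation step follows. It assumes the result exposes a boolean `success` and an optional `exception`, as dbt-core's dbtRunnerResult does. `DbtInvocationError` and `raise_if_failed` are hypothetical names, not the exception type or function Cosmos actually uses.

```python
from types import SimpleNamespace
from typing import Any


class DbtInvocationError(RuntimeError):
    """Hypothetical exception type used only for this illustration."""


def raise_if_failed(runner_result: Any) -> None:
    """Raise DbtInvocationError when the invocation did not succeed."""
    if runner_result.success:
        return
    if runner_result.exception is not None:
        # dbt itself failed (for example, a parse or connection error).
        raise DbtInvocationError(
            f"dbt invocation did not complete: {runner_result.exception}"
        ) from runner_result.exception
    # dbt ran to completion, but at least one node failed.
    raise DbtInvocationError("dbt invocation completed with failing nodes")


# A successful stand-in result passes through silently.
raise_if_failed(SimpleNamespace(success=True, exception=None))
```

Chaining with `from` preserves the original traceback, which helps when the failure surfaces in Airflow task logs.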
Usage
Use this module when running dbt commands via the programmatic API rather than shelling out to a subprocess. This execution mode is typically faster because it avoids the overhead of spawning a process, and it returns structured results instead of text output that must be parsed. It is the default execution path for Cosmos operators when dbt-core is installed in the same Python environment as Airflow.
Code Reference
Source Location
- Repository: Astronomer_Astronomer_cosmos
- File: cosmos/dbt/runner.py
- Lines: Full module (128 lines)
Signature
def is_available() -> bool:
    ...

def get_runner(callbacks: list[Callable] | None = None) -> dbtRunner:
    ...

def run_command(
    command: list[str],
    env: dict[str, str] | None = None,
    cwd: str | None = None,
    callbacks: list[Callable] | None = None,
) -> dbtRunnerResult:
    ...

def parse_number_of_warnings(result: dbtRunnerResult) -> int:
    ...

def handle_exception_if_needed(result: dbtRunnerResult) -> None:
    ...
Import
from cosmos.dbt.runner import run_command
from cosmos.dbt.runner import get_runner
from cosmos.dbt.runner import is_available
from cosmos.dbt.runner import parse_number_of_warnings
from cosmos.dbt.runner import handle_exception_if_needed
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| command | list[str] | Yes | The dbt command and arguments to execute (e.g., ["run", "--models", "my_model"]) |
| env | dict[str, str] or None | No | Optional environment variables to set for the duration of the command |
| cwd | str or None | No | Optional working directory in which to execute the dbt command |
| callbacks | list[Callable] or None | No | Optional list of callback functions invoked with dbt events during execution |
| result | dbtRunnerResult | Yes | (parse_number_of_warnings, handle_exception_if_needed) The result object from a dbtRunner invocation |
Outputs
| Name | Type | Description |
|---|---|---|
| available | bool | (is_available) Whether the dbtRunner class can be imported from the current environment |
| runner | dbtRunner | (get_runner) A configured dbtRunner instance ready to invoke commands |
| result | dbtRunnerResult | (run_command) The structured result of the dbt command execution |
| warning_count | int | (parse_number_of_warnings) Number of nodes that completed with warning status |
| (side effect) | None or raises Exception | (handle_exception_if_needed) Raises an exception if the result indicates a fatal error |
Usage Examples
# Run a model through the programmatic API and count warnings
from cosmos.dbt.runner import is_available, run_command, parse_number_of_warnings

if is_available():
    result = run_command(
        command=["run", "--models", "stg_orders"],
        env={"DBT_PROFILES_DIR": "/opt/airflow/dags/dbt/profiles"},
        cwd="/opt/airflow/dags/dbt/my_project",
    )
    warning_count = parse_number_of_warnings(result)
    print(f"Completed with {warning_count} warning(s)")

# Run tests and surface fatal errors as Python exceptions
from cosmos.dbt.runner import run_command, handle_exception_if_needed

result = run_command(command=["test", "--select", "tag:critical"])
handle_exception_if_needed(result)  # raises if dbt reported a fatal error

# Create a runner with a custom event callback
from cosmos.dbt.runner import get_runner

def my_callback(event):
    print(f"dbt event: {event}")

runner = get_runner(callbacks=[my_callback])
Related Pages
- Environment:Astronomer_Astronomer_cosmos_Python_Airflow_Runtime
- Astronomer_Astronomer_cosmos_Dbt_Output_Parser -- extract_dbt_runner_issues parses the dbtRunnerResult produced by this module
- Astronomer_Astronomer_cosmos_Dbt_Project_Utils -- prepares the project directory and environment consumed by run_command