Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Astronomer Astronomer cosmos DbtRunner Wrapper

From Leeroopedia


Knowledge Sources
Domains dbt_Execution, Runner
Last Updated 2026-02-07 17:00 GMT

Overview

The DbtRunner wrapper module provides a thin abstraction over dbt-core's programmatic dbtRunner API, offering availability checks, runner instantiation, command execution, warning extraction, and exception handling in a single cohesive interface.

Description

This 128-line module encapsulates all direct interaction with the dbtRunner class from dbt.cli.main, isolating the rest of the Cosmos codebase from dbt-core internals.

is_available probes whether the dbtRunner class can be imported from the current Python environment. This allows Cosmos to gracefully degrade when dbt-core is not installed or when a subprocess-based execution mode is preferred.

get_runner instantiates a dbtRunner object, optionally attaching callback functions that receive events during execution. Callbacks can be used for real-time logging, progress tracking, or custom metric collection.

run_command is the primary entry point for executing dbt commands programmatically. It accepts a command list (e.g., ["run", "--models", "stg_orders"]), optional environment variables, a working directory, and callbacks. It returns a dbtRunnerResult containing the execution outcome, structured results, and any logs. This function manages the lifecycle of creating the runner, configuring the environment, and invoking the command.

parse_number_of_warnings inspects a dbtRunnerResult and returns the count of nodes that completed with a warning status. Unlike the subprocess-based parser that searches text output, this function queries the structured result objects directly, providing more reliable extraction.

handle_exception_if_needed examines the dbtRunnerResult for fatal errors and raises an appropriate Python exception if the dbt invocation failed. This translates dbt-internal error states into standard exceptions that Airflow's task lifecycle can handle.

Usage

Use this module when running dbt commands via the programmatic API rather than shelling out to a subprocess. This execution mode is typically faster since it avoids process spawning overhead and enables richer structured output. It is the default execution path for Cosmos operators when dbt-core is installed in the same Python environment as Airflow.

Code Reference

Source Location

Signature

def is_available() -> bool:
    ...

def get_runner(callbacks: list[Callable] | None = None) -> dbtRunner:
    ...

def run_command(
    command: list[str],
    env: dict[str, str] | None = None,
    cwd: str | None = None,
    callbacks: list[Callable] | None = None,
) -> dbtRunnerResult:
    ...

def parse_number_of_warnings(result: dbtRunnerResult) -> int:
    ...

def handle_exception_if_needed(result: dbtRunnerResult) -> None:
    ...

Import

from cosmos.dbt.runner import run_command
from cosmos.dbt.runner import get_runner
from cosmos.dbt.runner import is_available
from cosmos.dbt.runner import parse_number_of_warnings
from cosmos.dbt.runner import handle_exception_if_needed

I/O Contract

Inputs

Name Type Required Description
command list[str] Yes The dbt command and arguments to execute (e.g., ["run", "--models", "my_model"])
env dict[str, str] or None No Optional environment variables to set for the duration of the command
cwd str or None No Optional working directory in which to execute the dbt command
callbacks list[Callable] or None No Optional list of callback functions invoked with dbt events during execution
result dbtRunnerResult Yes (parse_number_of_warnings, handle_exception_if_needed) The result object from a dbtRunner invocation

Outputs

Name Type Description
available bool (is_available) Whether the dbtRunner class can be imported from the current environment
runner dbtRunner (get_runner) A configured dbtRunner instance ready to invoke commands
result dbtRunnerResult (run_command) The structured result of the dbt command execution
warning_count int (parse_number_of_warnings) Number of nodes that completed with warning status
(side effect) None or raises Exception (handle_exception_if_needed) Raises an exception if the result indicates a fatal error

Usage Examples

from cosmos.dbt.runner import is_available, run_command, parse_number_of_warnings

if is_available():
    result = run_command(
        command=["run", "--models", "stg_orders"],
        env={"DBT_PROFILES_DIR": "/opt/airflow/dags/dbt/profiles"},
        cwd="/opt/airflow/dags/dbt/my_project",
    )

    warnings = parse_number_of_warnings(result)
    print(f"Completed with {warnings} warning(s)")
from cosmos.dbt.runner import run_command, handle_exception_if_needed

result = run_command(command=["test", "--select", "tag:critical"])
handle_exception_if_needed(result)  # raises if dbt reported a fatal error
from cosmos.dbt.runner import get_runner

def my_callback(event):
    print(f"dbt event: {event}")

runner = get_runner(callbacks=[my_callback])

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment