Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Astronomer Astronomer cosmos Dbt Output Parser

From Leeroopedia


Knowledge Sources
Domains Parsing, dbt_Output
Last Updated 2026-02-07 17:00 GMT

Overview

The dbt output parser module provides a suite of functions that extract structured warnings, errors, and freshness information from raw dbt command output, enabling Cosmos operators to act on build results programmatically.

Description

This 147-line module contains four complementary parsing functions, each targeting a different shape of dbt output.

parse_number_of_warnings_subprocess accepts the result object from a subprocess-based dbt invocation and extracts the total warning count by scanning the output for dbt's standard warning summary line. It returns an integer count, making it straightforward to implement warning-threshold logic in operators.

extract_freshness_warn_msg parses dbt source-freshness output and returns a tuple containing a boolean indicating whether freshness warnings were found and the associated warning message text. This allows operators to surface staleness alerts without re-running the freshness check.

extract_log_issues takes a list of log lines and separates them into warnings and errors, returning a tuple of two lists. This is the general-purpose parser used when processing line-by-line log output from subprocess execution.

extract_dbt_runner_issues handles output from the programmatic dbtRunner API rather than subprocess text. It inspects the structured dbtRunnerResult object and filters results by the provided status_levels, returning a tuple of warning and error message lists.

⚠️ Deprecation Warning: Two functions in this module are deprecated since Cosmos 1.9 and will be removed in Cosmos 2.0:

  • parse_number_of_warnings_dbt_runner — Use cosmos.dbt.runner.parse_number_of_warnings instead.
  • extract_dbt_runner_issues — Use cosmos.dbt.runner.extract_message_by_status instead.

See Astronomer_Astronomer_cosmos_Deprecation_Migration_Paths for complete migration guidance.

Together these functions form the observation layer that Cosmos uses to translate opaque dbt output into actionable metadata for Airflow task state, XCom values, and alerting.

Usage

Use these parsers inside custom operators or callbacks that need to inspect dbt execution results. They are called internally by Cosmos operators after each dbt command completes, but are also available for direct use in custom post-execution hooks or test assertions.

Code Reference

Source Location

Signature

def parse_number_of_warnings_subprocess(result: FullOutputSubprocessResult) -> int:
    ...

def extract_freshness_warn_msg(result: FullOutputSubprocessResult) -> tuple[bool, str]:
    ...

def extract_log_issues(log_list: list[str]) -> tuple[list[str], list[str]]:
    ...

def extract_dbt_runner_issues(
    result: dbtRunnerResult,
    status_levels: list[str],
) -> tuple[list[str], list[str]]:
    ...

Import

from cosmos.dbt.parser.output import parse_number_of_warnings_subprocess
from cosmos.dbt.parser.output import extract_freshness_warn_msg
from cosmos.dbt.parser.output import extract_log_issues
from cosmos.dbt.parser.output import extract_dbt_runner_issues

I/O Contract

Inputs

Name Type Required Description
result (subprocess) FullOutputSubprocessResult Yes The result object from a subprocess-based dbt command execution containing exit_code, output, and full_output
result (runner) dbtRunnerResult Yes The structured result object returned by the dbtRunner programmatic API
log_list list[str] Yes A list of individual log line strings from dbt command output
status_levels list[str] Yes A list of dbt result status strings (e.g., "warn", "error") to filter on when extracting runner issues

Outputs

Name Type Description
warning_count int (parse_number_of_warnings_subprocess) The total number of warnings reported by dbt
freshness_tuple tuple[bool, str] (extract_freshness_warn_msg) A boolean indicating freshness warnings exist and the associated message
log_issues tuple[list[str], list[str]] (extract_log_issues) Two lists: the first containing warning messages, the second containing error messages
runner_issues tuple[list[str], list[str]] (extract_dbt_runner_issues) Two lists: warnings and errors extracted from the dbtRunnerResult

Usage Examples

from cosmos.dbt.parser.output import parse_number_of_warnings_subprocess

# After running a dbt command via subprocess
warning_count = parse_number_of_warnings_subprocess(subprocess_result)
if warning_count > 0:
    print(f"dbt reported {warning_count} warning(s)")
from cosmos.dbt.parser.output import extract_log_issues

log_lines = [
    "WARNING: Deprecated function used in model stg_orders",
    "ERROR: Compilation error in model fct_revenue",
    "INFO: Model dim_customers completed successfully",
]

warnings, errors = extract_log_issues(log_lines)
print(f"Warnings: {warnings}")  # ["WARNING: Deprecated function used in model stg_orders"]
print(f"Errors: {errors}")      # ["ERROR: Compilation error in model fct_revenue"]
from cosmos.dbt.parser.output import extract_dbt_runner_issues

# After running via dbtRunner programmatic API
warnings, errors = extract_dbt_runner_issues(runner_result, status_levels=["warn", "error"])
for err in errors:
    print(f"dbt error: {err}")

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment