Implementation:Astronomer Astronomer cosmos Dbt Output Parser
| Knowledge Sources | |
|---|---|
| Domains | Parsing, dbt_Output |
| Last Updated | 2026-02-07 17:00 GMT |
Overview
The dbt output parser module provides a suite of functions that extract structured warnings, errors, and freshness information from raw dbt command output, enabling Cosmos operators to act on build results programmatically.
Description
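This 147-line module is described here through four parsing functions, each targeting a different shape of dbt output. A fifth function, parse_number_of_warnings_dbt_runner, appears only in the deprecation note below.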
parse_number_of_warnings_subprocess accepts the result object from a subprocess-based dbt invocation and extracts the total warning count by scanning the output for dbt's standard warning summary line. It returns an integer count, making it straightforward to implement warning-threshold logic in operators.
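The warning-count extraction can be illustrated with a minimal, self-contained sketch. It does not import Cosmos: `FakeSubprocessResult` and `count_warnings` are hypothetical stand-ins, and the regular expression assumes a dbt summary line of the form `Done. PASS=5 WARN=2 ERROR=0 SKIP=0 TOTAL=7`. The real function may locate the count differently.

```python
import re
from dataclasses import dataclass


@dataclass
class FakeSubprocessResult:
    """Hypothetical stand-in exposing only the field this sketch reads."""
    output: str


# Assumed summary shape: "Done. PASS=5 WARN=2 ERROR=0 SKIP=0 TOTAL=7"
_WARN_COUNT = re.compile(r"\bWARN=(\d+)")


def count_warnings(result: FakeSubprocessResult) -> int:
    """Return the WARN counter from the summary text, or 0 if it is absent."""
    match = _WARN_COUNT.search(result.output)
    return int(match.group(1)) if match else 0


result = FakeSubprocessResult(output="Done. PASS=5 WARN=2 ERROR=0 SKIP=0 TOTAL=7")
warning_count = count_warnings(result)
if warning_count > 0:
    print(f"dbt reported {warning_count} warning(s)")
```

Returning 0 when no summary is found is a choice made for this sketch only; an operator might prefer to raise or log in that case.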
extract_freshness_warn_msg parses dbt source-freshness output and returns a tuple containing a boolean indicating whether freshness warnings were found and the associated warning message text. This allows operators to surface staleness alerts without re-running the freshness check.
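As a rough illustration of the contract described above (a flag plus a message), the following sketch scans log lines for a freshness warning marker. Everything here is an assumption made for illustration: the stand-in class, the helper name `find_freshness_warning`, and the marker text `WARN freshness of`. It is not the Cosmos implementation.

```python
from dataclasses import dataclass, field


@dataclass
class FakeSubprocessResult:
    """Hypothetical stand-in exposing only the line-by-line output."""
    full_output: list[str] = field(default_factory=list)


# Assumed marker text for a freshness warning line.
FRESHNESS_WARN_MARKER = "WARN freshness of"


def find_freshness_warning(result: FakeSubprocessResult) -> tuple[bool, str]:
    """Return (found, message), joining all matching lines into one message."""
    warn_lines = [
        line.strip() for line in result.full_output if FRESHNESS_WARN_MARKER in line
    ]
    return bool(warn_lines), "\n".join(warn_lines)


result = FakeSubprocessResult(
    full_output=[
        "1 of 2 PASS freshness of raw.orders",
        "2 of 2 WARN freshness of raw.customers",
    ]
)
found, message = find_freshness_warning(result)
```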
extract_log_issues takes a list of log lines and separates them into warnings and errors, returning a tuple of two lists. This is the general-purpose parser used when processing line-by-line log output from subprocess execution.
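The split into two lists can be sketched as a single pass over the lines. The helper below, `split_log_issues`, is a hypothetical simplification that classifies lines by an assumed `WARNING`/`ERROR` prefix; the matching rules used by the actual parser may be quite different.

```python
def split_log_issues(log_list: list[str]) -> tuple[list[str], list[str]]:
    """Partition log lines into (warnings, errors) by an assumed level prefix."""
    warnings: list[str] = []
    errors: list[str] = []
    for line in log_list:
        stripped = line.strip()
        if stripped.startswith("WARNING"):
            warnings.append(stripped)
        elif stripped.startswith("ERROR"):
            errors.append(stripped)
        # Any other line (for example INFO) is ignored.
    return warnings, errors


logs = [
    "INFO: Running 3 models",
    "WARNING: Unused config path",
    "ERROR: Database connection failed",
    "WARNING: Deprecated macro",
]
found_warnings, found_errors = split_log_issues(logs)
```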
extract_dbt_runner_issues handles output from the programmatic dbtRunner API rather than subprocess text. It inspects the structured dbtRunnerResult object and filters results by the provided status_levels, returning a tuple of warning and error message lists.
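Filtering structured results by status can be sketched without dbt installed. The `FakeNodeResult` class and `split_runner_issues` helper are hypothetical, and the rule that every requested status other than `"warn"` counts as an error is an assumption of this sketch rather than documented behavior.

```python
from dataclasses import dataclass


@dataclass
class FakeNodeResult:
    """Hypothetical stand-in for one per-node entry of a runner result."""
    status: str
    message: str


def split_runner_issues(
    results: list[FakeNodeResult], status_levels: list[str]
) -> tuple[list[str], list[str]]:
    """Keep results whose status is requested and split them into two lists."""
    wanted = set(status_levels)
    warnings: list[str] = []
    errors: list[str] = []
    for item in results:
        if item.status not in wanted:
            continue  # status was not requested, so skip it
        if item.status == "warn":
            warnings.append(item.message)
        else:
            # Assumption: any other requested status is reported as an error.
            errors.append(item.message)
    return warnings, errors


node_results = [
    FakeNodeResult(status="pass", message="ok"),
    FakeNodeResult(status="warn", message="3 rows failed not_null check"),
    FakeNodeResult(status="error", message="relation does not exist"),
]
runner_warnings, runner_errors = split_runner_issues(node_results, ["warn", "error"])
```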
⚠️ Deprecation Warning: Two functions in this module are deprecated since Cosmos 1.9 and will be removed in Cosmos 2.0:
- parse_number_of_warnings_dbt_runner: use cosmos.dbt.runner.parse_number_of_warnings instead.
- extract_dbt_runner_issues: use cosmos.dbt.runner.extract_message_by_status instead.
See Astronomer_Astronomer_cosmos_Deprecation_Migration_Paths for complete migration guidance.
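Code that must run across Cosmos versions on both sides of this deprecation could resolve the function at import time, preferring the new location. The sketch below uses only the standard library; `resolve_first` is a hypothetical helper, and the two module paths are the ones named in the deprecation note. It does not check that the old and new functions share a signature or return shape, so verify that before swapping them.

```python
import importlib
from typing import Any, Callable, Optional


def resolve_first(candidates: list[tuple[str, str]]) -> Optional[Callable[..., Any]]:
    """Return the first importable attribute among (module, name) pairs, else None."""
    for module_name, attr_name in candidates:
        try:
            module = importlib.import_module(module_name)
        except ImportError:
            continue  # module is not available in this environment
        attr = getattr(module, attr_name, None)
        if attr is not None:
            return attr
    return None


# Prefer the replacement; fall back to the deprecated location if needed.
extract_issues = resolve_first(
    [
        ("cosmos.dbt.runner", "extract_message_by_status"),
        ("cosmos.dbt.parser.output", "extract_dbt_runner_issues"),
    ]
)
if extract_issues is None:
    print("Cosmos is not installed or neither function is available")
```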
Together these functions form the observation layer that Cosmos uses to translate opaque dbt output into actionable metadata for Airflow task state, XCom values, and alerting.
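To make the idea of actionable metadata concrete, here is a small sketch of how parsed warnings and errors might be reduced to a task decision. The `Outcome` type, the `summarize` helper, and the `max_warnings` threshold are all hypothetical and only illustrate the pattern; Cosmos operators may apply different rules.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Outcome:
    """Hypothetical summary of what a task should do with parsed dbt output."""
    state: str   # "success" or "failed"
    alert: bool  # whether a notification should be sent


def summarize(warnings: list[str], errors: list[str], max_warnings: int = 0) -> Outcome:
    """Fail on any error; succeed with an alert when warnings exceed the threshold."""
    if errors:
        return Outcome(state="failed", alert=True)
    if len(warnings) > max_warnings:
        return Outcome(state="success", alert=True)
    return Outcome(state="success", alert=False)


outcome = summarize(warnings=["stale source raw.customers"], errors=[])
```

Keeping the decision in a pure function like this makes the threshold logic easy to unit-test separately from any Airflow context.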
Usage
Use these parsers inside custom operators or callbacks that need to inspect dbt execution results. They are called internally by Cosmos operators after each dbt command completes, but are also available for direct use in custom post-execution hooks or test assertions.
Code Reference
Source Location
- Repository: Astronomer_Astronomer_cosmos
- File: cosmos/dbt/parser/output.py
- Lines: Full module (147 lines)
Signature
def parse_number_of_warnings_subprocess(result: FullOutputSubprocessResult) -> int:
    ...
def extract_freshness_warn_msg(result: FullOutputSubprocessResult) -> tuple[bool, str]:
    ...
def extract_log_issues(log_list: list[str]) -> tuple[list[str], list[str]]:
    ...
def extract_dbt_runner_issues(
    result: dbtRunnerResult,
    status_levels: list[str],
) -> tuple[list[str], list[str]]:
    ...
Import
from cosmos.dbt.parser.output import parse_number_of_warnings_subprocess
from cosmos.dbt.parser.output import extract_freshness_warn_msg
from cosmos.dbt.parser.output import extract_log_issues
from cosmos.dbt.parser.output import extract_dbt_runner_issues
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| result (subprocess) | FullOutputSubprocessResult | Yes | The result object from a subprocess-based dbt command execution containing exit_code, output, and full_output |
| result (runner) | dbtRunnerResult | Yes | The structured result object returned by the dbtRunner programmatic API |
| log_list | list[str] | Yes | A list of individual log line strings from dbt command output |
| status_levels | list[str] | Yes | A list of dbt result status strings (e.g., "warn", "error") to filter on when extracting runner issues |
Outputs
| Name | Type | Description |
|---|---|---|
| warning_count | int | (parse_number_of_warnings_subprocess) The total number of warnings reported by dbt |
| freshness_tuple | tuple[bool, str] | (extract_freshness_warn_msg) A boolean indicating whether freshness warnings exist, followed by the associated message |
| log_issues | tuple[list[str], list[str]] | (extract_log_issues) Two lists: the first containing warning messages, the second containing error messages |
| runner_issues | tuple[list[str], list[str]] | (extract_dbt_runner_issues) Two lists: warnings and errors extracted from the dbtRunnerResult |
Usage Examples
from cosmos.dbt.parser.output import parse_number_of_warnings_subprocess
# After running a dbt command via subprocess.
# subprocess_result is assumed to be the FullOutputSubprocessResult from that run.
warning_count = parse_number_of_warnings_subprocess(subprocess_result)
if warning_count > 0:
    print(f"dbt reported {warning_count} warning(s)")
from cosmos.dbt.parser.output import extract_log_issues
# Illustrative log lines; real dbt output is usually timestamped and may be colorized.
log_lines = [
    "WARNING: Deprecated function used in model stg_orders",
    "ERROR: Compilation error in model fct_revenue",
    "INFO: Model dim_customers completed successfully",
]
warnings, errors = extract_log_issues(log_lines)
print(f"Warnings: {warnings}")  # first list returned by the parser
print(f"Errors: {errors}")  # second list returned by the parser
from cosmos.dbt.parser.output import extract_dbt_runner_issues
# After running via the dbtRunner programmatic API.
# runner_result is assumed to be the dbtRunnerResult from that invocation.
warnings, errors = extract_dbt_runner_issues(runner_result, status_levels=["warn", "error"])
for err in errors:
    print(f"dbt error: {err}")
Related Pages
- Environment:Astronomer_Astronomer_cosmos_Python_Airflow_Runtime
- Astronomer_Astronomer_cosmos_DbtRunner_Wrapper -- produces the dbtRunnerResult objects parsed by extract_dbt_runner_issues
- Astronomer_Astronomer_cosmos_FullOutputSubprocessHook -- produces the FullOutputSubprocessResult objects parsed by parse_number_of_warnings_subprocess