Implementation:Astronomer Astronomer cosmos FullOutputSubprocessHook
| Knowledge Sources | |
|---|---|
| Domains | Subprocess, Hooks |
| Last Updated | 2026-02-07 17:00 GMT |
Overview
FullOutputSubprocessHook is an enhanced Airflow hook that executes shell commands in a subprocess while capturing the complete standard output, enabling downstream parsing of dbt command results.
Description
This 128-line module provides two classes that work together to support subprocess-based dbt execution within Cosmos.
FullOutputSubprocessResult is a NamedTuple with three fields: exit_code (the integer process return code), output (the last line of output, matching the interface of Airflow's built-in SubprocessResult), and full_output (the complete list of output lines captured during execution). The full_output field is the key differentiator from Airflow's standard subprocess hook: it preserves every line of dbt output for later parsing by functions such as parse_number_of_warnings_subprocess and extract_log_issues.
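To make the relationship between the three fields concrete, here is a minimal sketch that builds a result-shaped tuple from a list of captured lines. `ResultSketch` and `result_from_lines` are hypothetical names used only for illustration, and the sample lines are illustrative rather than real dbt output; the "last line" rule for `output` follows the SubprocessResult interface described above.

```python
from __future__ import annotations

from typing import NamedTuple


class ResultSketch(NamedTuple):
    """Hypothetical stand-in with the same fields as FullOutputSubprocessResult."""

    exit_code: int
    output: str  # only the final captured line
    full_output: list[str]  # every captured line, in order


def result_from_lines(exit_code: int, lines: list[str]) -> ResultSketch:
    # A command that printed nothing yields an empty string for `output`.
    last_line = lines[-1] if lines else ""
    return ResultSketch(exit_code=exit_code, output=last_line, full_output=list(lines))


# Illustrative lines only, shaped like a dbt run log.
lines = [
    "Running with dbt=1.7.0",
    "Finished running 1 view model",
    "Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1",
]
result = result_from_lines(0, lines)
print(result.output)            # Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
print(len(result.full_output))  # 3
```

Anything that needs more than the final line, such as a warning count reported mid-run, has to read `full_output`.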
FullOutputSubprocessHook extends Airflow's BaseHook and provides the run_command method, which spawns a subprocess, streams its output line by line (both to Airflow's task logger and into an internal buffer), waits for completion, and returns a FullOutputSubprocessResult. The optional process_log_line callback allows callers to transform each log line before it is stored, enabling filtering or enrichment during execution rather than in a separate post-processing step.
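The streaming-and-buffering pattern can be sketched with the standard library alone. This is not the Cosmos source: `run_and_capture` and `ResultSketch` are hypothetical names, a plain `logging` logger stands in for Airflow's task logger, and the sketch assumes, as described above, that the value returned by `process_log_line` is what gets stored.

```python
from __future__ import annotations

import logging
import subprocess
import sys
from typing import Callable, NamedTuple

log = logging.getLogger("subprocess_sketch")


class ResultSketch(NamedTuple):
    """Hypothetical stand-in with the same fields as FullOutputSubprocessResult."""

    exit_code: int
    output: str
    full_output: list[str]


def run_and_capture(
    command: list[str],
    process_log_line: Callable[[str], str] | None = None,
    output_encoding: str = "utf-8",
) -> ResultSketch:
    """Stream merged stdout/stderr line by line, logging and buffering every line."""
    captured: list[str] = []
    line = ""
    # stderr is redirected into stdout so both streams arrive in one ordered feed.
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if proc.stdout is None:
        raise RuntimeError("stdout pipe was not created")
    with proc.stdout:
        # readline() returns b"" only at EOF, i.e. once the child closes its output.
        for raw_line in iter(proc.stdout.readline, b""):
            line = raw_line.decode(output_encoding, errors="backslashreplace").rstrip()
            if process_log_line is not None:
                # Assumption: the callback's return value is what gets logged and stored.
                line = process_log_line(line)
            log.info("%s", line)  # emitted as soon as the line arrives
            captured.append(line)
    proc.wait()
    return ResultSketch(exit_code=proc.returncode, output=line, full_output=captured)


result = run_and_capture(
    [sys.executable, "-c", "print('hello'); print('world')"],
    process_log_line=str.upper,
)
print(result)  # ResultSketch(exit_code=0, output='WORLD', full_output=['HELLO', 'WORLD'])
```

Reading line by line rather than calling `communicate()` keeps the logs live during long dbt runs, at the cost of holding every line in memory until the command finishes.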
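The hook also exposes send_sigterm and send_sigint for stopping a running command. These matter for Airflow's task lifecycle: when a running task is killed externally or exceeds its execution timeout, Airflow calls the operator's on_kill, which can forward the request to the child process through one of these methods. SIGTERM asks the process to exit, while SIGINT is delivered to dbt as a keyboard interrupt, giving it a chance to shut down cleanly (for example, by cancelling in-flight queries) before exiting.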
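Signalling only the direct child can leave its own children running. Airflow's SubprocessHook avoids this by starting the child in a new session and sending the signal to the child's whole process group; assuming this hook follows the same pattern, the standard-library sketch below shows the mechanics. `start_in_own_group` and `signal_group` are hypothetical names, and `os.killpg` is POSIX-only.

```python
from __future__ import annotations

import os
import signal
import subprocess
import sys
import time


def start_in_own_group(command: list[str]) -> subprocess.Popen:
    # start_new_session=True runs setsid() in the child, so it leads a new process
    # group and anything it spawns inherits that group.
    return subprocess.Popen(command, start_new_session=True)


def signal_group(proc: subprocess.Popen, sig: int) -> None:
    """Send `sig` to the child's whole process group (POSIX only)."""
    if proc.poll() is None:  # only signal a process that is still running
        os.killpg(os.getpgid(proc.pid), sig)


child = start_in_own_group([sys.executable, "-c", "import time; time.sleep(30)"])
time.sleep(0.2)  # give the child a moment to start
signal_group(child, signal.SIGTERM)
child.wait(timeout=10)
print(child.returncode)  # -signal.SIGTERM, i.e. -15 on Linux
```

The `poll()` guard makes a second call after the child has exited a harmless no-op instead of a `ProcessLookupError`.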
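The env parameter accepts a dictionary of environment variables for the subprocess. As in Airflow's SubprocessHook, a supplied dictionary becomes the child's entire environment rather than being merged into it, and os.environ is passed only when env is omitted; to add variables while keeping the worker's environment (including PATH), pass {**os.environ, ...}. The cwd parameter sets the working directory; when it is None, the command runs in a temporary directory that is removed afterwards. The output_encoding parameter controls how raw bytes from the subprocess are decoded into strings.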
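Because a supplied env dictionary is handed to the child as its whole environment (this is how Airflow's SubprocessHook treats env, and the sketch assumes this hook behaves the same way), the difference between replacing and merging is easy to miss. The sketch calls `subprocess.run` directly rather than the hook; the `/bin/sh` probe and the `SKETCH_PARENT_VAR` variable are hypothetical, so it assumes a POSIX system.

```python
from __future__ import annotations

import os
import subprocess
import tempfile

# Hypothetical probe: print two variables and the physical working directory.
# echo and pwd are shell builtins, so the probe works even without PATH.
PROBE = ["/bin/sh", "-c", 'echo "${DBT_TARGET:-unset}"; echo "${SKETCH_PARENT_VAR:-unset}"; pwd -P']


def probe(env: dict[str, str] | None, cwd: str | None) -> list[str]:
    """Run the probe with the given env/cwd and return its output lines."""
    completed = subprocess.run(PROBE, env=env, cwd=cwd, stdout=subprocess.PIPE, check=True)
    return completed.stdout.decode("utf-8").splitlines()


os.environ["SKETCH_PARENT_VAR"] = "from-parent"  # a variable only the parent defines

with tempfile.TemporaryDirectory() as project_dir:
    # Passing a dict replaces the child's environment: the parent variable is gone.
    replaced = probe({"DBT_TARGET": "dev"}, project_dir)
    # Merging explicitly keeps the parent environment and adds the override.
    merged = probe({**os.environ, "DBT_TARGET": "dev"}, project_dir)
    expected_dir = os.path.realpath(project_dir)

print(replaced[:2])  # ['dev', 'unset']
print(merged[:2])    # ['dev', 'from-parent']
```

With a real dbt command, the replaced environment would also lack PATH, so a bare `dbt` executable installed in a virtual environment may no longer be found.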
Usage
Use FullOutputSubprocessHook when executing dbt commands via subprocess and you need the complete command output for parsing, logging, or assertions. Cosmos uses this hook when its local-execution operators invoke dbt as a subprocess (InvocationMode.SUBPROCESS), which is the appropriate mode when dbt-core is not importable from the Airflow worker's Python environment, for example when dbt is installed in a separate virtual environment and called through its executable.
Code Reference
Source Location
- Repository: Astronomer_Astronomer_cosmos
- File: cosmos/hooks/subprocess.py
- Lines: Full module (128 lines)
Signature
```python
class FullOutputSubprocessResult(NamedTuple):
    exit_code: int
    output: str
    full_output: list[str]


class FullOutputSubprocessHook(BaseHook):
    def run_command(
        self,
        command: list[str],
        env: dict[str, str] | None = None,
        output_encoding: str = "utf-8",
        cwd: str | None = None,
        process_log_line: Callable[[str], str] | None = None,
    ) -> FullOutputSubprocessResult:
        ...

    def send_sigterm(self) -> None:
        ...

    def send_sigint(self) -> None:
        ...
```
Import
```python
from cosmos.hooks.subprocess import FullOutputSubprocessHook
from cosmos.hooks.subprocess import FullOutputSubprocessResult
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| command | list[str] | Yes | The shell command and arguments to execute as a subprocess |
| env | dict[str, str] or None | No | Environment variables for the subprocess; a supplied dict replaces the inherited environment rather than merging into it, and os.environ is used when omitted |
| output_encoding | str | No | Character encoding for decoding subprocess output (defaults to "utf-8") |
| cwd | str or None | No | Working directory for the subprocess; if omitted, the command runs in a temporary directory that is removed afterwards |
| process_log_line | Callable[[str], str] or None | No | Optional callback that transforms each log line before it is buffered |
Outputs
| Name | Type | Description |
|---|---|---|
| result | FullOutputSubprocessResult | A named tuple containing exit_code (int), output (str, last line of output), and full_output (list[str], all captured lines) |
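A common way to consume the result is to fail loudly on a non-zero exit_code while surfacing the end of full_output, which usually contains dbt's error summary. The sketch below uses a hypothetical `ResultSketch` tuple with the same three fields, plus hypothetical `raise_for_result` and `CommandFailed` names; none of them are part of Cosmos.

```python
from __future__ import annotations

from typing import NamedTuple


class ResultSketch(NamedTuple):
    """Hypothetical stand-in with the same fields as FullOutputSubprocessResult."""

    exit_code: int
    output: str
    full_output: list[str]


class CommandFailed(RuntimeError):
    """Hypothetical error type carrying the tail of the captured output."""


def raise_for_result(result: ResultSketch, tail: int = 20) -> ResultSketch:
    """Return `result` unchanged on success; otherwise raise with the last `tail` lines."""
    if result.exit_code == 0:
        return result
    # Only the tail is included so very long dbt logs do not flood the error message.
    context = "\n".join(result.full_output[-tail:])
    raise CommandFailed(f"Command exited with code {result.exit_code}:\n{context}")
```

Returning the result on success lets the check be chained, e.g. `raise_for_result(hook.run_command(...))`.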
Usage Examples
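Run a single model and inspect the result:

```python
import os

from cosmos.hooks.subprocess import FullOutputSubprocessHook

hook = FullOutputSubprocessHook()
result = hook.run_command(
    command=["dbt", "run", "--select", "stg_orders"],
    # A supplied env replaces the inherited environment, so merge with os.environ
    # to keep PATH (and therefore the dbt executable) visible to the child.
    env={**os.environ, "DBT_PROFILES_DIR": "/opt/airflow/dags/dbt/profiles"},
    cwd="/opt/airflow/dags/dbt/my_project",
)
print(f"Exit code: {result.exit_code}")
print(f"Last line: {result.output}")
print(f"Total lines captured: {len(result.full_output)}")
```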
Count warnings from a `dbt test` run and fail above a threshold:

```python
from cosmos.dbt.parser.output import parse_number_of_warnings_subprocess
from cosmos.hooks.subprocess import FullOutputSubprocessHook

hook = FullOutputSubprocessHook()
result = hook.run_command(
    command=["dbt", "test", "--select", "tag:critical"],
    cwd="/opt/airflow/dags/dbt/my_project",
)
# The parser reads the full captured output, not just the last line.
warning_count = parse_number_of_warnings_subprocess(result)
if warning_count > 5:
    raise ValueError(f"Too many warnings: {warning_count}")
```
Redact sensitive values as each line is captured:

```python
from cosmos.hooks.subprocess import FullOutputSubprocessHook


def redact_secrets(line: str) -> str:
    """Remove sensitive values from log output."""
    return line.replace("secret_token_value", "***REDACTED***")


hook = FullOutputSubprocessHook()
result = hook.run_command(
    command=["dbt", "run"],
    cwd="/opt/airflow/dags/dbt/my_project",
    process_log_line=redact_secrets,
)
```
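Hardcoding the secret, as in the redaction example, puts the value itself into DAG code. A variation is to build the callback at runtime from the values to hide, for instance read from environment variables. `make_redactor` and the `DBT_PASSWORD` variable name are hypothetical, and the sketch assumes, as described above, that the callback's return value is what the hook stores.

```python
from __future__ import annotations

import os
from typing import Callable, Iterable


def make_redactor(secrets: Iterable[str], mask: str = "***REDACTED***") -> Callable[[str], str]:
    """Build a hypothetical process_log_line callback that masks every given secret."""
    # Drop empty values (e.g. an unset env var) so "" is never replaced everywhere,
    # and replace longer secrets first so a secret containing another is fully masked.
    values = sorted({s for s in secrets if s}, key=len, reverse=True)

    def redact(line: str) -> str:
        for value in values:
            line = line.replace(value, mask)
        return line

    return redact


redact = make_redactor([os.environ.get("DBT_PASSWORD", ""), "secret_token_value"])
print(redact("connecting with token secret_token_value"))
```

The resulting `redact` function can then be passed as `process_log_line` in the `run_command` call shown above.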
Related Pages
- Environment:Astronomer_Astronomer_cosmos_Python_Airflow_Runtime
- Astronomer_Astronomer_cosmos_Dbt_Output_Parser -- parsing functions that consume FullOutputSubprocessResult to extract warnings and errors
- Astronomer_Astronomer_cosmos_Dbt_Project_Utils -- filesystem utilities that prepare the working directory and environment used by run_command
- Astronomer_Astronomer_cosmos_MemoryTracker -- debug utility that can monitor the subprocess spawned by this hook