Implementation: astronomer-cosmos DbtGraph Load and Build
Overview
Concrete tooling, provided by the astronomer-cosmos library, for loading dbt project graphs and building Airflow task structures. This document covers two APIs that work together: DbtGraph.load() for graph discovery and build_airflow_graph() for task construction.
Source Location
- Repository: astronomer-cosmos
- DbtGraph class: cosmos/dbt/graph.py, lines 354-1215
- build_airflow_graph function: cosmos/airflow/graph.py, lines 817-980
API 1: DbtGraph
Signature
```python
class DbtGraph:
    def __init__(
        self,
        project: DbtProject,
        render_config: RenderConfig,
        execution_config: ExecutionConfig,
        profile_config: ProfileConfig,
        cache_dir: Path | None = None,
        cache_identifier: str | None = None,
        dbt_vars: dict[str, Any] | None = None,
        airflow_metadata: dict[str, Any] | None = None,
    ):
        ...

    def load(
        self,
        method: LoadMode,
        execution_mode: ExecutionMode,
    ) -> None:
        ...
```
Constructor Parameters
| Parameter | Type | Description |
|---|---|---|
| `project` | `DbtProject` | The dbt project definition (paths, name, manifest location). |
| `render_config` | `RenderConfig` | Filtering and rendering configuration (select, exclude, load mode). |
| `execution_config` | `ExecutionConfig` | Execution mode and related settings. |
| `profile_config` | `ProfileConfig` | dbt profile, target, and connection configuration. |
| `cache_dir` | `Path \| None` | Directory for caching graph loading results. |
| `cache_identifier` | `str \| None` | Unique identifier for cache entries. |
| `dbt_vars` | `dict[str, Any] \| None` | dbt variables passed to graph loading commands. |
| `airflow_metadata` | `dict[str, Any] \| None` | Metadata from the Airflow environment. |
load() Method
The load() method dispatches to a strategy-specific loader based on the method parameter:
| LoadMode | Strategy | Requirements |
|---|---|---|
| `AUTOMATIC` | Selects the best available method automatically. | Varies. |
| `DBT_LS` | Runs a dbt ls subprocess to discover nodes. | dbt installed, database connectivity at parse time. |
| `DBT_MANIFEST` | Parses a pre-built manifest.json file. | Manifest file path configured. |
| `DBT_LS_FILE` | Reads a saved dbt ls output file. | Output file exists at configured path. |
| `DBT_LS_CACHE` | Uses cached dbt ls results, refreshing periodically. | Cache directory configured. |
| `CUSTOM` | Invokes a user-provided callback. | Callback function configured in RenderConfig. |
Output
After load() completes, the DbtGraph instance is populated with:
- `nodes`: a `dict[str, DbtNode]` of all discovered nodes keyed by unique ID.
- `filtered_nodes`: a `dict[str, DbtNode]` containing only the nodes that pass the `RenderConfig` filters.
Each DbtNode contains the node's unique ID, name, resource type, file path, dependencies, and associated configuration.
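The node structure and the nodes/filtered_nodes relationship can be sketched with a small dataclass; the field subset and the tag-based filter below are illustrative simplifications, not cosmos's actual selector logic:

```python
from dataclasses import dataclass, field


@dataclass
class ToyDbtNode:
    """Illustrative subset of the fields cosmos stores on each DbtNode."""
    unique_id: str
    resource_type: str                         # e.g. "model", "test", "seed"
    file_path: str
    depends_on: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)


# All discovered nodes, keyed by unique ID (the shape of DbtGraph.nodes).
nodes = {
    "model.shop.orders": ToyDbtNode(
        "model.shop.orders", "model", "models/orders.sql", tags=["daily"]
    ),
    "model.shop.stg": ToyDbtNode("model.shop.stg", "model", "models/stg.sql"),
}

# filtered_nodes keeps only nodes matching the RenderConfig selectors,
# here a hypothetical select=["tag:daily"] filter.
filtered_nodes = {uid: n for uid, n in nodes.items() if "daily" in n.tags}
```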
API 2: build_airflow_graph
Signature
```python
def build_airflow_graph(
    nodes: dict[str, DbtNode],
    dag: DAG,
    task_group: TaskGroup | None = None,
    execution_mode: ExecutionMode = ExecutionMode.LOCAL,
    task_args: dict[str, Any] = None,
    test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER,
    dbt_project_name: str = "",
    on_warning_callback: Callable[..., Any] | None = None,
    render_config: RenderConfig = RenderConfig(),
    async_py_requirements: list[str] | None = None,
    execution_config: ExecutionConfig | None = None,
) -> dict[str, TaskGroup | BaseOperator]:
```
Parameters
| Parameter | Type | Description |
|---|---|---|
| `nodes` | `dict[str, DbtNode]` | Filtered nodes from `DbtGraph.load()`. |
| `dag` | `DAG` | The Airflow DAG to attach tasks to. |
| `task_group` | `TaskGroup \| None` | Optional TaskGroup for scoping tasks. |
| `execution_mode` | `ExecutionMode` | Determines which operator class to use for each node type. |
| `task_args` | `dict[str, Any]` | The operator_args dict, broadcast as kwargs to each operator. |
| `test_indirect_selection` | `TestIndirectSelection` | Controls how tests are selected relative to their parent models. |
| `dbt_project_name` | `str` | Name of the dbt project (used in task IDs). |
| `on_warning_callback` | `Callable[..., Any] \| None` | Callback for dbt test warnings. |
| `render_config` | `RenderConfig` | Rendering configuration (test behavior, node grouping). |
| `async_py_requirements` | `list[str] \| None` | Python packages for async execution modes. |
| `execution_config` | `ExecutionConfig \| None` | Execution configuration for operator construction. |
Output
Returns a dict[str, TaskGroup | BaseOperator] mapping dbt node unique IDs to the corresponding Airflow tasks. All task dependencies are wired before the function returns.
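The dependency wiring can be sketched as follows: after one task is created per node, each node's depends_on list becomes upstream edges on the corresponding task. ToyTask is an illustrative stand-in for an Airflow operator; only set_upstream mimics the real BaseOperator API:

```python
class ToyTask:
    """Illustrative stand-in for an Airflow operator."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.upstream: list["ToyTask"] = []

    def set_upstream(self, other: "ToyTask") -> None:
        # BaseOperator exposes set_upstream with this shape; the bookkeeping
        # here is simplified.
        self.upstream.append(other)


# unique_id -> parent unique_ids, as found on each DbtNode (toy data).
depends_on = {
    "model.shop.stg_orders": [],
    "model.shop.orders": ["model.shop.stg_orders"],
    "model.shop.report": ["model.shop.orders"],
}

# Pass 1: create one task per node (hypothetical "<name>_run" task IDs).
tasks = {uid: ToyTask(uid.split(".")[-1] + "_run") for uid in depends_on}

# Pass 2: wire every depends_on entry as an upstream edge.
for uid, parents in depends_on.items():
    for parent in parents:
        tasks[uid].set_upstream(tasks[parent])
```

The returned mapping is keyed by dbt unique ID, so callers can attach extra dependencies (for example, to tasks outside the dbt graph) after the function returns.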
Import
```python
from cosmos.dbt.graph import DbtGraph
from cosmos.airflow.graph import build_airflow_graph
```
I/O Contract
DbtGraph.load()
- Input: `LoadMode` enum and `ExecutionMode` enum.
- Side effect: populates the `self.nodes` and `self.filtered_nodes` dictionaries.
- Output: `None` (mutates instance state).
build_airflow_graph()
- Input: Filtered nodes dict, DAG reference, execution mode, task args, and rendering configuration.
- Output: Dict mapping unique IDs to Airflow task objects with all dependencies wired.
Combined Flow
1. `DbtGraph` is instantiated with project, profile, execution, and render configs.
2. `DbtGraph.load()` discovers nodes and populates `filtered_nodes`.
3. `build_airflow_graph()` takes `filtered_nodes` and creates Airflow operators with correct dependencies.
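The two-phase flow can be sketched as a toy pipeline in which phase 1's output is exactly phase 2's input; the function names and data here are illustrative, not cosmos APIs:

```python
def load_graph() -> dict[str, list[str]]:
    # Phase 1 (discovery): maps unique_id -> upstream unique_ids,
    # standing in for DbtGraph.load() populating filtered_nodes.
    return {"model.a": [], "model.b": ["model.a"]}


def build_graph(node_deps: dict[str, list[str]]) -> dict[str, dict]:
    # Phase 2 (construction): one "task" record per node, with its
    # upstream references wired, standing in for build_airflow_graph().
    return {
        uid: {"task_id": uid, "upstream": parents}
        for uid, parents in node_deps.items()
    }


tasks = build_graph(load_graph())
```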
LoadMode Enum
```python
class LoadMode(Enum):
    AUTOMATIC = "automatic"
    DBT_LS = "dbt_ls"
    DBT_MANIFEST = "dbt_manifest"
    DBT_LS_FILE = "dbt_ls_file"
    DBT_LS_CACHE = "dbt_ls_cache"
    CUSTOM = "custom"
```
Usage Example
These functions are typically called internally by DbtToAirflowConverter. The following shows the conceptual flow:
```python
from cosmos.dbt.graph import DbtGraph
from cosmos.airflow.graph import build_airflow_graph
from cosmos.constants import LoadMode, ExecutionMode
from cosmos.config import ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from airflow import DAG
from datetime import datetime
from pathlib import Path

# Phase 1: Load the dbt graph
project = ProjectConfig(
    dbt_project_path=Path("/usr/local/airflow/dags/dbt/jaffle_shop"),
    # With LoadMode.DBT_MANIFEST, manifest_path would typically also be set
    # to point at a pre-built manifest.json.
)
profile = ProfileConfig(
    profile_name="jaffle_shop",
    target_name="dev",
    # In practice ProfileConfig also needs either profiles_yml_filepath or
    # profile_mapping; omitted here for brevity.
)
execution = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)
render = RenderConfig(load_method=LoadMode.DBT_MANIFEST)

dbt_graph = DbtGraph(
    project=project,
    render_config=render,
    execution_config=execution,
    profile_config=profile,
)
dbt_graph.load(
    method=LoadMode.DBT_MANIFEST,
    execution_mode=ExecutionMode.LOCAL,
)

# Phase 2: Build the Airflow task graph
with DAG(
    dag_id="jaffle_shop",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
) as dag:
    tasks = build_airflow_graph(
        nodes=dbt_graph.filtered_nodes,
        dag=dag,
        execution_mode=ExecutionMode.LOCAL,
        task_args={"install_deps": True},
        dbt_project_name="jaffle_shop",
        render_config=render,
        execution_config=execution,
    )
```

Note: In practice, you do not need to call these APIs directly. DbtDag and DbtTaskGroup handle both phases automatically. Direct usage is only necessary for advanced customization scenarios.