
Implementation: astronomer-cosmos DbtGraph Load and Build

From Leeroopedia


Overview

This page documents the concrete machinery in the astronomer-cosmos library for loading dbt project graphs and building Airflow task structures. It covers two APIs that work together: DbtGraph.load() for graph discovery and build_airflow_graph() for task construction.

Source Location

  • Repository: astronomer-cosmos
  • DbtGraph class: cosmos/dbt/graph.py, lines 354-1215
  • build_airflow_graph function: cosmos/airflow/graph.py, lines 817-980

API 1: DbtGraph

Signature

class DbtGraph:
    def __init__(
        self,
        project: DbtProject,
        render_config: RenderConfig,
        execution_config: ExecutionConfig,
        profile_config: ProfileConfig,
        cache_dir: Path | None = None,
        cache_identifier: str | None = None,
        dbt_vars: dict[str, Any] | None = None,
        airflow_metadata: dict[str, Any] | None = None,
    ):
        ...

    def load(
        self,
        method: LoadMode,
        execution_mode: ExecutionMode,
    ) -> None:
        ...

Constructor Parameters

Parameter          Type                     Description
project            DbtProject               The dbt project definition (paths, name, manifest location).
render_config      RenderConfig             Filtering and rendering configuration (select, exclude, load mode).
execution_config   ExecutionConfig          Execution mode and related settings.
profile_config     ProfileConfig            dbt profile, target, and connection configuration.
cache_dir          Path | None              Directory for caching graph loading results.
cache_identifier   str | None               Unique identifier for cache entries.
dbt_vars           dict[str, Any] | None    dbt variables passed to graph loading commands.
airflow_metadata   dict[str, Any] | None    Metadata from the Airflow environment.

load() Method

The load() method dispatches to a strategy-specific loader based on the method parameter:

LoadMode       Strategy                                              Requirements
AUTOMATIC      Selects the best available method automatically.      Varies.
DBT_LS         Runs a dbt ls subprocess to discover nodes.           dbt installed; database connectivity at parse time.
DBT_MANIFEST   Parses a pre-built manifest.json file.                Manifest file path configured.
DBT_LS_FILE    Reads a saved dbt ls output file.                     Output file exists at the configured path.
DBT_LS_CACHE   Uses cached dbt ls results, refreshing periodically.  Cache directory configured.
CUSTOM         Invokes a user-provided callback.                     Callback function configured in RenderConfig.

Output

After load() completes, the DbtGraph instance is populated with:

  • nodes -- a dict[str, DbtNode] of all discovered nodes keyed by unique ID.
  • filtered_nodes -- a dict[str, DbtNode] containing only nodes that pass the RenderConfig filters.

Each DbtNode contains the node's unique ID, name, resource type, file path, dependencies, and associated configuration.
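To make that node shape concrete, here is an illustrative stand-in for DbtNode. The real class lives in cosmos/dbt/graph.py; the field names below are a simplified approximation, not the exact cosmos definition.

```python
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class IllustrativeDbtNode:
    """Simplified stand-in for cosmos's DbtNode, for illustration only."""
    unique_id: str
    resource_type: str  # e.g. "model", "test", "seed", "snapshot"
    file_path: Path
    depends_on: list[str] = field(default_factory=list)
    config: dict = field(default_factory=dict)

    @property
    def name(self) -> str:
        # dbt unique IDs look like "model.<project>.<name>"
        return self.unique_id.rsplit(".", 1)[-1]


node = IllustrativeDbtNode(
    unique_id="model.jaffle_shop.stg_customers",
    resource_type="model",
    file_path=Path("models/staging/stg_customers.sql"),
    depends_on=["seed.jaffle_shop.raw_customers"],
)
print(node.name)  # stg_customers
```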

API 2: build_airflow_graph

Signature

def build_airflow_graph(
    nodes: dict[str, DbtNode],
    dag: DAG,
    task_group: TaskGroup | None = None,
    execution_mode: ExecutionMode = ExecutionMode.LOCAL,
    task_args: dict[str, Any] = None,
    test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER,
    dbt_project_name: str = "",
    on_warning_callback: Callable[..., Any] | None = None,
    render_config: RenderConfig = RenderConfig(),
    async_py_requirements: list[str] | None = None,
    execution_config: ExecutionConfig | None = None,
) -> dict[str, TaskGroup | BaseOperator]:

Parameters

Parameter                Type                         Description
nodes                    dict[str, DbtNode]           Filtered nodes from DbtGraph.load().
dag                      DAG                          The Airflow DAG to attach tasks to.
task_group               TaskGroup | None             Optional TaskGroup for scoping tasks.
execution_mode           ExecutionMode                Determines which operator class to use for each node type.
task_args                dict[str, Any]               The operator_args dict, broadcast as kwargs to each operator.
test_indirect_selection  TestIndirectSelection        Controls how tests are selected relative to their parent models.
dbt_project_name         str                          Name of the dbt project (used in task IDs).
on_warning_callback      Callable[..., Any] | None    Callback for dbt test warnings.
render_config            RenderConfig                 Rendering configuration (test behavior, node grouping).
async_py_requirements    list[str] | None             Python packages for async execution modes.
execution_config         ExecutionConfig | None       Execution configuration for operator construction.

Output

Returns a dict[str, TaskGroup | BaseOperator] mapping dbt node unique IDs to the corresponding Airflow tasks. All task dependencies are wired before the function returns.
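Because the return value maps dbt unique IDs to task objects, callers can wire additional dependencies after the build step. The runnable sketch below uses stub objects in place of Airflow operators; the real operators support the same >> chaining operator, but the StubTask class and the task IDs here are purely illustrative.

```python
class StubTask:
    """Minimal stand-in for an Airflow operator supporting `>>` chaining."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream: list[str] = []

    def __rshift__(self, other: "StubTask") -> "StubTask":
        # Record the dependency the way `task_a >> task_b` does in Airflow
        self.downstream.append(other.task_id)
        return other


# Pretend this dict came back from build_airflow_graph()
tasks = {
    "model.jaffle_shop.stg_customers": StubTask("stg_customers_run"),
    "model.jaffle_shop.customers": StubTask("customers_run"),
}

# Fan the final dbt model into a downstream notification task
notify = StubTask("notify_team")
tasks["model.jaffle_shop.customers"] >> notify

print(tasks["model.jaffle_shop.customers"].downstream)  # ['notify_team']
```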

Import

from cosmos.dbt.graph import DbtGraph
from cosmos.airflow.graph import build_airflow_graph

I/O Contract

DbtGraph.load()

  • Input: LoadMode enum and ExecutionMode enum.
  • Side effect: Populates self.nodes and self.filtered_nodes dictionaries.
  • Output: None (mutates instance state).
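The mutate-in-place contract implies that filtered_nodes is always a sub-dict of nodes. The self-contained sketch below illustrates that invariant using plain dicts in place of DbtNode objects and a toy name-prefix filter; real cosmos filtering uses dbt's full selector syntax, not this prefix rule.

```python
def load_stub(all_nodes: dict[str, dict], select_prefix: str) -> tuple[dict, dict]:
    """Toy version of load()'s side effect: return (nodes, filtered_nodes).

    A name prefix stands in for dbt's selector syntax to keep the
    illustration self-contained.
    """
    nodes = dict(all_nodes)
    filtered = {
        uid: node for uid, node in nodes.items()
        if node["name"].startswith(select_prefix)
    }
    return nodes, filtered


discovered = {
    "model.shop.stg_orders": {"name": "stg_orders"},
    "model.shop.stg_customers": {"name": "stg_customers"},
    "model.shop.orders": {"name": "orders"},
}
nodes, filtered_nodes = load_stub(discovered, select_prefix="stg_")
print(sorted(filtered_nodes))  # ['model.shop.stg_customers', 'model.shop.stg_orders']
assert set(filtered_nodes) <= set(nodes)  # the invariant load() guarantees
```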

build_airflow_graph()

  • Input: Filtered nodes dict, DAG reference, execution mode, task args, and rendering configuration.
  • Output: Dict mapping unique IDs to Airflow task objects with all dependencies wired.

Combined Flow

  1. DbtGraph is instantiated with project, profile, execution, and render configs.
  2. DbtGraph.load() discovers nodes and populates filtered_nodes.
  3. build_airflow_graph() takes filtered_nodes and creates Airflow operators with correct dependencies.

LoadMode Enum

class LoadMode(Enum):
    AUTOMATIC = "automatic"
    DBT_LS = "dbt_ls"
    DBT_MANIFEST = "dbt_manifest"
    DBT_LS_FILE = "dbt_ls_file"
    DBT_LS_CACHE = "dbt_ls_cache"
    CUSTOM = "custom"

Usage Example

These functions are typically called internally by DbtToAirflowConverter. The following shows the conceptual flow:

from cosmos.dbt.graph import DbtGraph
from cosmos.airflow.graph import build_airflow_graph
from cosmos.constants import LoadMode, ExecutionMode
from cosmos.config import ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from airflow import DAG
from datetime import datetime
from pathlib import Path

# Phase 1: Load the dbt graph
project = ProjectConfig(
    dbt_project_path=Path("/usr/local/airflow/dags/dbt/jaffle_shop"),
    # Required when loading via LoadMode.DBT_MANIFEST: a pre-built manifest
    manifest_path=Path("/usr/local/airflow/dags/dbt/jaffle_shop/target/manifest.json"),
)
profile = ProfileConfig(
    profile_name="jaffle_shop",
    target_name="dev",
    # ProfileConfig needs a connection source: either profiles_yml_filepath
    # or a profile_mapping
    profiles_yml_filepath=Path("/usr/local/airflow/dags/dbt/jaffle_shop/profiles.yml"),
)
execution = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)
render = RenderConfig(load_method=LoadMode.DBT_MANIFEST)

dbt_graph = DbtGraph(
    project=project,
    render_config=render,
    execution_config=execution,
    profile_config=profile,
)
dbt_graph.load(
    method=LoadMode.DBT_MANIFEST,
    execution_mode=ExecutionMode.LOCAL,
)

# Phase 2: Build the Airflow task graph
with DAG(
    dag_id="jaffle_shop",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
) as dag:
    tasks = build_airflow_graph(
        nodes=dbt_graph.filtered_nodes,
        dag=dag,
        execution_mode=ExecutionMode.LOCAL,
        task_args={"install_deps": True},
        dbt_project_name="jaffle_shop",
        render_config=render,
        execution_config=execution,
    )

Note: In practice, you do not need to call these APIs directly. DbtDag and DbtTaskGroup handle both phases automatically. Direct usage is only necessary for advanced customization scenarios.
