Implementation: astronomer-cosmos DbtDag Init
Overview
DbtDag is the concrete entry point for rendering a dbt project as an Airflow DAG, provided by the astronomer-cosmos library. It subclasses both Airflow's DAG and Cosmos's DbtToAirflowConverter, combining native DAG behavior with automatic dbt graph loading and task generation.
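The dual-inheritance pattern can be illustrated with a simplified stand-in. This is not the actual cosmos source; every class and name below is a toy substitute showing how one class can split its kwargs between a DAG base and a converter base:

```python
# Simplified stand-in for DbtDag's dual-inheritance pattern.
# The real code lives in cosmos/airflow/dag.py; everything here is
# illustrative, not the cosmos API.

class ToyDAG:
    """Stands in for airflow.DAG."""
    def __init__(self, dag_id, schedule=None):
        self.dag_id = dag_id
        self.schedule = schedule
        self.tasks = []

class ToyConverter:
    """Stands in for cosmos.converter.DbtToAirflowConverter."""
    def __init__(self, project_config, dag=None):
        self.project_config = project_config
        # In cosmos, this is where the dbt graph is loaded and tasks built.
        dag.tasks.append(f"run:{project_config['name']}")

class ToyDbtDag(ToyDAG, ToyConverter):
    """Routes DAG kwargs to the DAG base, the rest to the converter."""
    DAG_KWARGS = {"dag_id", "schedule"}

    def __init__(self, **kwargs):
        dag_kwargs = {k: v for k, v in kwargs.items() if k in self.DAG_KWARGS}
        cosmos_kwargs = {k: v for k, v in kwargs.items() if k not in self.DAG_KWARGS}
        ToyDAG.__init__(self, **dag_kwargs)
        ToyConverter.__init__(self, dag=self, **cosmos_kwargs)

dag = ToyDbtDag(dag_id="demo", schedule="@daily",
                project_config={"name": "jaffle_shop"})
print(dag.dag_id, dag.tasks)  # demo ['run:jaffle_shop']
```

The real DbtDag does the same routing against the full set of Airflow DAG parameters, so a single constructor call configures both scheduling and dbt rendering.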
Source Location
- Repository: astronomer-cosmos
- DbtDag class: cosmos/airflow/dag.py, lines 14-26
- DbtToAirflowConverter (base logic): cosmos/converter.py, lines 250-459
Signature
```python
class DbtDag(DAG, DbtToAirflowConverter):
    def __init__(self, *args: Any, **kwargs: Any) -> None: ...
```
Key Keyword Arguments (from DbtToAirflowConverter.__init__)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `project_config` | `ProjectConfig` | required | Configuration for the dbt project (path, manifest path, project name). |
| `profile_config` | `ProfileConfig \| None` | `None` | dbt profile and target configuration. Required for most execution modes. |
| `execution_config` | `ExecutionConfig \| None` | `None` | Execution mode and related settings (local, docker, kubernetes, etc.). |
| `render_config` | `RenderConfig \| None` | `None` | Controls graph rendering: node selection, exclusion, test behavior, load mode. |
| `operator_args` | `dict \| None` | `None` | Keyword arguments broadcast to every generated operator. |
| `on_warning_callback` | `Callable \| None` | `None` | Callback invoked when dbt emits warnings during test execution. |
Standard Airflow DAG Keyword Arguments
All standard Airflow DAG kwargs are also accepted, including:
- `dag_id` (str) -- unique identifier for the DAG.
- `schedule` (str | timedelta | Timetable | None) -- scheduling interval.
- `start_date` (datetime) -- earliest date for DAG runs.
- `catchup` (bool) -- whether to backfill missed runs.
- `default_args` (dict) -- default arguments for all tasks.
- `tags` (list[str]) -- tags for filtering in the Airflow UI.
- `max_active_runs` (int) -- concurrency limit for DAG runs.
Import
```python
from cosmos import DbtDag
```
I/O Contract
Inputs
- Config objects: `ProjectConfig`, `ProfileConfig`, `ExecutionConfig`, `RenderConfig` define what to render and how.
- Airflow DAG kwargs: `dag_id`, `schedule`, `start_date`, etc. define the DAG's scheduling behavior.
- `operator_args`: optional dict of kwargs broadcast to every generated operator.
Output
- A fully rendered Airflow DAG containing:
  - One operator per dbt node (model, test, seed, snapshot) that passes the render configuration filters.
  - Task dependencies wired to match the dbt project's dependency graph.
- The DAG is immediately registered with Airflow when instantiated at module scope.
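As a rough illustration of "one operator per dbt node": in local execution mode, cosmos selects an operator class based on each node's resource type. The operator class names below match cosmos's local operators, but the dispatch table itself is a simplification, not the actual selection logic:

```python
# Simplified sketch of resource-type -> operator selection in local
# execution mode. Class names match cosmos local operators; the
# dispatch logic is a simplification of what cosmos actually does.
NODE_TYPE_TO_OPERATOR = {
    "model": "DbtRunLocalOperator",
    "test": "DbtTestLocalOperator",
    "seed": "DbtSeedLocalOperator",
    "snapshot": "DbtSnapshotLocalOperator",
}

def pick_operator(resource_type: str) -> str:
    """Return the operator class name for a dbt resource type."""
    return NODE_TYPE_TO_OPERATOR[resource_type]

print(pick_operator("model"))  # DbtRunLocalOperator
```

Other execution modes (docker, kubernetes) substitute their own operator families following the same per-node pattern.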
Lifecycle
1. `DbtDag.__init__()` calls `DAG.__init__()` to set up the Airflow DAG.
2. It then calls `DbtToAirflowConverter.__init__()`, which:
   - Creates a `DbtGraph` and calls `load()` to discover dbt nodes.
   - Calls `build_airflow_graph()` to create operators and wire dependencies.
3. The result is a ready-to-schedule DAG with all tasks in place.
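The load-then-build lifecycle can be sketched in plain Python. These are toy stand-ins (the real `DbtGraph` and `build_airflow_graph` live in cosmos and do far more), meant only to show the shape of the two steps:

```python
# Toy sketch of the DbtDag lifecycle: load a dbt graph, then create one
# "operator" per node and wire dependencies. Names here are illustrative
# stand-ins, not the cosmos API.

class ToyDbtGraph:
    """Stands in for cosmos.dbt.graph.DbtGraph."""
    def __init__(self, manifest_nodes):
        self.manifest_nodes = manifest_nodes
        self.nodes = {}

    def load(self):
        # In cosmos, load() parses the project or manifest per LoadMode.
        self.nodes = dict(self.manifest_nodes)

def build_toy_airflow_graph(graph):
    """Create a task per node and return (tasks, dependency edges)."""
    tasks = {name: f"operator:{name}" for name in graph.nodes}
    edges = [(dep, name)
             for name, deps in graph.nodes.items()
             for dep in deps]
    return tasks, edges

graph = ToyDbtGraph({
    "stg_orders": [],
    "orders": ["stg_orders"],
    "orders_test": ["orders"],
})
graph.load()
tasks, edges = build_toy_airflow_graph(graph)
print(sorted(tasks))  # ['orders', 'orders_test', 'stg_orders']
print(edges)          # [('stg_orders', 'orders'), ('orders', 'orders_test')]
```

Each edge corresponds to an Airflow upstream/downstream relationship, which is why the rendered DAG mirrors the dbt project's dependency graph.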
Usage Example
```python
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.constants import ExecutionMode, LoadMode

jaffle_shop = DbtDag(
    # Airflow DAG parameters
    dag_id="jaffle_shop",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["dbt", "jaffle_shop"],
    # Cosmos configuration
    project_config=ProjectConfig(
        dbt_project_path=Path("/usr/local/airflow/dags/dbt/jaffle_shop"),
    ),
    profile_config=ProfileConfig(
        profile_name="jaffle_shop",
        target_name="dev",
        profiles_yml_filepath=Path("/usr/local/airflow/dags/dbt/jaffle_shop/profiles.yml"),
    ),
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.LOCAL,
    ),
    render_config=RenderConfig(
        load_method=LoadMode.DBT_MANIFEST,
    ),
    operator_args={
        "install_deps": True,
    },
)
```
This single module-level instantiation produces a complete Airflow DAG with one task per dbt model/test/seed, scheduled to run daily.
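Because the example above uses `LoadMode.DBT_MANIFEST`, the dependency information comes from the project's `manifest.json` artifact. The fragment below gives a rough idea of what manifest-based loading works from; the structure is heavily simplified (real manifests carry many more fields per node), and the edge-derivation code is an illustration, not cosmos's parser:

```python
import json

# Heavily simplified manifest.json fragment. A real dbt manifest has
# many more fields per node; only depends_on matters for this sketch.
manifest_json = """
{
  "nodes": {
    "model.jaffle_shop.stg_orders": {"depends_on": {"nodes": []}},
    "model.jaffle_shop.orders": {"depends_on": {"nodes": ["model.jaffle_shop.stg_orders"]}}
  }
}
"""

manifest = json.loads(manifest_json)

# Derive dependency edges the way a manifest-based loader might:
# each entry in depends_on.nodes becomes an upstream -> downstream pair.
edges = [(parent, node_id)
         for node_id, node in manifest["nodes"].items()
         for parent in node["depends_on"]["nodes"]]
print(edges)  # [('model.jaffle_shop.stg_orders', 'model.jaffle_shop.orders')]
```

Manifest loading avoids invoking dbt at parse time, which is why it is a common choice for keeping Airflow DAG parsing fast.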