Implementation:Astronomer Astronomer cosmos DbtTaskGroup Init
Overview
Concrete tool, provided by the astronomer-cosmos library, for rendering a dbt project as an Airflow TaskGroup. DbtTaskGroup subclasses both Airflow's TaskGroup and Cosmos's DbtToAirflowConverter, combining native task grouping behavior with automatic dbt graph loading and task generation.
Source Location
- Repository: astronomer-cosmos
- DbtTaskGroup class: cosmos/airflow/task_group.py, Lines L18-32
- DbtToAirflowConverter (base logic): cosmos/converter.py, Lines L250-459
Signature
class DbtTaskGroup(TaskGroup, DbtToAirflowConverter):
    def __init__(self, group_id: str = "dbt_task_group", *args: Any, **kwargs: Any) -> None:
Key Keyword Arguments (from DbtToAirflowConverter.__init__)
| Parameter | Type | Default | Description |
|---|---|---|---|
| group_id | str | "dbt_task_group" | Identifier for the task group. Used as a prefix for all contained task IDs. |
| project_config | ProjectConfig | required | Configuration for the dbt project (path, manifest path, project name). |
| profile_config | ProfileConfig or None | None | dbt profile and target configuration. |
| execution_config | ExecutionConfig or None | None | Execution mode and related settings. |
| render_config | RenderConfig or None | None | Controls graph rendering: node selection, exclusion, test behavior, load mode. |
| operator_args | dict or None | None | Keyword arguments broadcast to every generated operator. |
| on_warning_callback | Callable or None | None | Callback invoked when dbt emits warnings during test execution. |
Note: Unlike DbtDag, DbtTaskGroup does not accept dag_id or other DAG-level parameters (schedule, start_date, etc.). These are defined on the enclosing DAG instead.
Import
from cosmos import DbtTaskGroup
I/O Contract
Inputs
- group_id: String identifier for the task group (default: "dbt_task_group").
- Config objects: ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig.
- operator_args: Optional dict of kwargs broadcast to every generated operator.
Output
- An Airflow TaskGroup containing:
  - One operator per dbt node that passes the render configuration filters.
  - Task dependencies wired to match the dbt project's dependency graph.
- The TaskGroup must be instantiated inside an existing DAG context (using either a with DAG(...) block or the @dag decorator).
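The output contract can be illustrated with a minimal, library-free sketch. It is not Cosmos's implementation: the graph dictionary, the `plan_tasks` helper, and the `selected` set are all hypothetical stand-ins for the loaded dbt graph and the render configuration filters. It only shows the idea of one task per selected node, with dependencies kept where both ends survive the filter.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical, simplified dbt graph: node name -> names of its upstream nodes.
DBT_GRAPH: Dict[str, List[str]] = {
    "stg_customers": [],
    "stg_orders": [],
    "customers": ["stg_customers", "stg_orders"],
    "orders": ["stg_orders"],
}


def plan_tasks(
    graph: Dict[str, List[str]], selected: Set[str]
) -> Tuple[List[str], List[Tuple[str, str]]]:
    """Return (task names, (upstream, downstream) edges) for the selected nodes."""
    # One task per node that passes the (assumed) selection filter.
    tasks = sorted(name for name in graph if name in selected)
    # Keep only dependencies whose upstream node was also selected.
    edges = sorted(
        (upstream, name)
        for name in tasks
        for upstream in graph[name]
        if upstream in selected
    )
    return tasks, edges


# Exclude stg_customers, as a node filter might.
tasks, edges = plan_tasks(DBT_GRAPH, selected={"stg_orders", "orders", "customers"})
```

Here `customers` keeps its dependency on `stg_orders` but loses the one on `stg_customers`, because that node was filtered out. Real rendering also has to account for tests, seeds, and snapshots, which this sketch ignores.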
Requirements
- Must be used inside a DAG context. Unlike DbtDag, which creates its own DAG, DbtTaskGroup must be nested within an existing DAG.
- Task IDs inside the group are automatically prefixed with the group_id; for example, a task named customers_run becomes dbt_task_group.customers_run.
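The prefixing rule matters when a task has to be referenced by its full ID, for example when looking it up on the DAG. The sketch below mirrors Airflow's default behavior of joining group IDs and the task ID with dots. The helper `qualified_task_id` and the task name `customers_run` are hypothetical, chosen only for illustration.

```python
def qualified_task_id(task_id: str, *group_ids: str) -> str:
    """Join enclosing group IDs (outermost first) and the task ID with dots."""
    return ".".join([*group_ids, task_id])


# A task inside the default group.
full_id = qualified_task_id("customers_run", "dbt_task_group")

# The same rule applied to a group nested inside another (hypothetical) group.
nested_id = qualified_task_id("customers_run", "elt", "dbt_task_group")
```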
Usage Example
from datetime import datetime
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.constants import ExecutionMode


@dag(
    dag_id="elt_pipeline",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["elt", "dbt"],
)
def elt_pipeline():
    # Pre-dbt task: extract and load raw data
    extract_and_load = EmptyOperator(task_id="extract_and_load")

    # dbt transformations as a TaskGroup
    dbt_transforms = DbtTaskGroup(
        group_id="dbt_transforms",
        project_config=ProjectConfig(
            dbt_project_path=Path("/usr/local/airflow/dags/dbt/jaffle_shop"),
        ),
        profile_config=ProfileConfig(
            profile_name="jaffle_shop",
            target_name="dev",
            profiles_yml_filepath=Path("/usr/local/airflow/dags/dbt/jaffle_shop/profiles.yml"),
        ),
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.LOCAL,
        ),
        operator_args={
            "install_deps": True,
        },
    )

    # Post-dbt task: run data quality checks
    data_quality_checks = EmptyOperator(task_id="data_quality_checks")

    # Post-dbt task: send notification
    notify = EmptyOperator(task_id="notify_completion")

    # Wire the pipeline: extract -> dbt -> quality checks -> notify
    extract_and_load >> dbt_transforms >> data_quality_checks >> notify


elt_pipeline()
In this example, the dbt transformations are embedded as a collapsible TaskGroup between the extraction and quality-check steps. The >> operator wires the entire TaskGroup as a unit: all root dbt tasks depend on extract_and_load, and data_quality_checks depends on all leaf dbt tasks.
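To make "wiring as a unit" concrete, the following library-free sketch computes which tasks in a group count as roots (no upstream inside the group) and leaves (no downstream inside the group), and lists the edges the >> chain would add. The graph contents and the `roots_and_leaves` helper are hypothetical. Airflow performs the equivalent lookup internally on the real TaskGroup.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical tasks inside the group: task -> upstream tasks within the group.
GROUP: Dict[str, List[str]] = {
    "stg_customers": [],
    "stg_orders": [],
    "customers": ["stg_customers", "stg_orders"],
    "orders": ["stg_orders"],
}


def roots_and_leaves(graph: Dict[str, List[str]]) -> Tuple[Set[str], Set[str]]:
    """Return the tasks with no in-group upstream and those with no in-group downstream."""
    has_downstream = {up for upstreams in graph.values() for up in upstreams}
    roots = {task for task, upstreams in graph.items() if not upstreams}
    leaves = {task for task in graph if task not in has_downstream}
    return roots, leaves


roots, leaves = roots_and_leaves(GROUP)

# Edges implied by: extract_and_load >> group >> data_quality_checks
incoming = sorted(("extract_and_load", root) for root in roots)
outgoing = sorted((leaf, "data_quality_checks") for leaf in leaves)
```

With this graph, both staging tasks wait for `extract_and_load`, and `data_quality_checks` waits for both `customers` and `orders`.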