
Implementation:Astronomer cosmos DbtTaskGroup Init

From Leeroopedia


Overview

DbtTaskGroup is the concrete tool provided by the astronomer-cosmos library for rendering a dbt project as an Airflow TaskGroup. It is a subclass of both Airflow's TaskGroup and Cosmos's DbtToAirflowConverter, combining native task-grouping behavior with automatic dbt graph loading and task generation.

Source Location

  • Repository: astronomer-cosmos
  • DbtTaskGroup class: cosmos/airflow/task_group.py, lines 18-32
  • DbtToAirflowConverter (base logic): cosmos/converter.py, lines 250-459

Signature

class DbtTaskGroup(TaskGroup, DbtToAirflowConverter):
    def __init__(self, group_id: str = "dbt_task_group", *args: Any, **kwargs: Any) -> None:

Key Keyword Arguments (from DbtToAirflowConverter.__init__)

Parameter Type Default Description
group_id str "dbt_task_group" Identifier for the task group. Used as a prefix for all contained task IDs.
project_config ProjectConfig required Configuration for the dbt project (path, manifest path, project name).
profile_config ProfileConfig | None None dbt profile and target configuration.
execution_config ExecutionConfig | None None Execution mode and related settings.
render_config RenderConfig | None None Controls graph rendering: node selection, exclusion, test behavior, load mode.
operator_args dict[str, Any] | None None Keyword arguments broadcast to every generated operator.
on_warning_callback Callable[..., Any] | None None Callback invoked when dbt emits warnings during test execution.

Note: Unlike DbtDag, DbtTaskGroup does not accept dag_id or other DAG-level parameters (schedule, start_date, etc.). These are defined on the enclosing DAG instead.
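As a sketch of how render_config narrows the rendered graph, the following fragment uses Cosmos's RenderConfig with dbt-style selectors. The specific select and exclude values are illustrative, not part of this page's example project:

```python
from cosmos import RenderConfig

# Illustrative config fragment: render only models tagged "daily",
# skipping anything under models/staging. Selector strings follow
# dbt node-selection syntax.
render_config = RenderConfig(
    select=["tag:daily"],
    exclude=["path:models/staging"],
)
```

Passing this object as the render_config keyword argument filters which dbt nodes become Airflow tasks inside the group.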

Import

from cosmos import DbtTaskGroup

I/O Contract

Inputs

  • group_id: String identifier for the task group (default: "dbt_task_group").
  • Config objects: ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig.
  • operator_args: Optional dict of kwargs broadcast to every generated operator.

Output

  • An Airflow TaskGroup containing:
    • One operator per dbt node that passes the render configuration filters.
    • Task dependencies wired to match the dbt project's dependency graph.

Requirements

  • Must be used inside a DAG context. Unlike DbtDag, which creates its own DAG, DbtTaskGroup must be nested within an existing DAG.
  • Task IDs inside the group are automatically prefixed with the group_id, e.g., dbt_task_group.model_customers.
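The prefixing rule can be illustrated with plain string handling. This is a simplified sketch of what Airflow's TaskGroup does internally, and the node names are hypothetical:

```python
# Simplified illustration of TaskGroup task-ID prefixing.
group_id = "dbt_task_group"
dbt_nodes = ["model_customers", "model_orders"]  # hypothetical node names

task_ids = [f"{group_id}.{node}" for node in dbt_nodes]
print(task_ids)
# ['dbt_task_group.model_customers', 'dbt_task_group.model_orders']
```

Because of this prefixing, references to a task from outside the group (for example in trigger rules or XCom lookups) must use the fully qualified ID.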

Usage Example

from datetime import datetime
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.constants import ExecutionMode

@dag(
    dag_id="elt_pipeline",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["elt", "dbt"],
)
def elt_pipeline():
    # Pre-dbt task: extract and load raw data
    extract_and_load = EmptyOperator(task_id="extract_and_load")

    # dbt transformations as a TaskGroup
    dbt_transforms = DbtTaskGroup(
        group_id="dbt_transforms",
        project_config=ProjectConfig(
            dbt_project_path=Path("/usr/local/airflow/dags/dbt/jaffle_shop"),
        ),
        profile_config=ProfileConfig(
            profile_name="jaffle_shop",
            target_name="dev",
            profiles_yml_filepath=Path("/usr/local/airflow/dags/dbt/jaffle_shop/profiles.yml"),
        ),
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.LOCAL,
        ),
        operator_args={
            "install_deps": True,
        },
    )

    # Post-dbt task: run data quality checks
    data_quality_checks = EmptyOperator(task_id="data_quality_checks")

    # Post-dbt task: send notification
    notify = EmptyOperator(task_id="notify_completion")

    # Wire the pipeline: extract -> dbt -> quality checks -> notify
    extract_and_load >> dbt_transforms >> data_quality_checks >> notify

elt_pipeline()

In this example, the dbt transformations are embedded as a collapsible TaskGroup between extraction and quality-check steps. The >> operator wires the entire TaskGroup as a unit: every root dbt task depends on extract_and_load, and data_quality_checks depends on every leaf dbt task.
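The root/leaf wiring semantics can be modeled in plain Python. This is a simplified illustration of the dependency behavior, not Airflow's actual implementation, and the dbt node names are hypothetical:

```python
# Simplified model of how ">>" on a TaskGroup wires external dependencies:
# upstream tasks connect to the group's root nodes, and the group's leaf
# nodes connect to downstream tasks. Node names are hypothetical.
dbt_graph = {
    "stg_customers": [],                        # no upstream deps -> root
    "stg_orders": [],                           # no upstream deps -> root
    "customers": ["stg_customers", "stg_orders"],  # depends on both roots
}

# Roots: nodes with no dependencies inside the group.
roots = [n for n, deps in dbt_graph.items() if not deps]
# Leaves: nodes no other node inside the group depends on.
all_deps = {d for deps in dbt_graph.values() for d in deps}
leaves = [n for n in dbt_graph if n not in all_deps]

# extract_and_load >> group: every root depends on extract_and_load.
upstream_edges = [("extract_and_load", r) for r in roots]
# group >> data_quality_checks: data_quality_checks depends on every leaf.
downstream_edges = [(leaf, "data_quality_checks") for leaf in leaves]

print(upstream_edges)    # edges into the group's roots
print(downstream_edges)  # edges out of the group's leaves
```

This is why no manual wiring to individual dbt tasks is needed; the group behaves like a single node in the outer DAG.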

Related Pages

Implements Principle

Requires Environment
