Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation: Astronomer Cosmos Task Dependency Wiring

From Leeroopedia


Knowledge Sources
Domains Orchestration, DAG_Rendering
Type Pattern Doc
Last Updated 2026-02-07 00:00 GMT

Overview

Configuration pattern for wiring dbt task groups with non-dbt operators in Airflow DAGs.

Description

This is a pattern document describing how to use Airflow's built-in dependency operators to connect DbtTaskGroup instances (from astronomer-cosmos) with other Airflow operators. The pattern relies on the `>>` and `<<` operators (`__rshift__` / `__lshift__`) defined by Airflow core's shared dependency interface, which both BaseOperator and TaskGroup implement — and DbtTaskGroup, as a TaskGroup subclass, therefore supports them natively. No special Cosmos API is needed -- standard Airflow dependency syntax applies directly.

Usage

Apply this pattern whenever a DAG contains both dbt transformation tasks (rendered by Cosmos) and non-dbt tasks (data ingestion, validation, notification, etc.) that must execute in a defined order.

Code Reference

Source Location

  • Repository: Airflow core (external)
  • Module: airflow.models.baseoperator.BaseOperator.__rshift__
  • Cosmos integration: cosmos.airflow.task_group.DbtTaskGroup inherits from airflow.utils.task_group.TaskGroup

Interface Specification

# Standard Airflow dependency operators
pre_task >> dbt_task_group >> post_task

# Or equivalently using explicit methods
dbt_task_group.set_upstream(pre_task)
dbt_task_group.set_downstream(post_task)

# Multiple dependencies
[task_a, task_b] >> dbt_task_group >> [task_c, task_d]

# Chaining multiple task groups
extract >> staging_group >> marts_group >> notify

Import

# No special import needed -- built-in Airflow syntax
# DbtTaskGroup inherits dependency methods from TaskGroup
from cosmos import DbtTaskGroup  # Provides >> and << operators natively

I/O Contract

Inputs

Name Type Required Description
upstream BaseOperator / TaskGroup / list[BaseOperator] No Task(s) that must complete before the dbt group starts
dbt_task_group DbtTaskGroup Yes The Cosmos-rendered dbt task group
downstream BaseOperator / TaskGroup / list[BaseOperator] No Task(s) that run after the dbt group completes

Outputs

Name Type Description
DAG graph Airflow DAG The complete DAG with wired execution order between all tasks

Behavior

Expression Effect
pre >> dbt_group pre must finish before any task in dbt_group starts (wires to group root tasks)
dbt_group >> post All tasks in dbt_group must finish before post starts (wires from group leaf tasks)
pre >> dbt_group >> post Full sandwich: pre -> dbt subgraph -> post
[a, b] >> dbt_group Both a and b must finish before dbt_group starts
dbt_group >> [c, d] Both c and d start after dbt_group completes
group_a >> group_b All leaves of group_a wire to all roots of group_b

Usage Examples

Complete DAG: Extract, Transform, Notify

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
import pendulum

@dag(
    start_date=pendulum.datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["jaffle_shop"],
)
def jaffle_shop_pipeline():
    """Extract -> dbt transform -> Slack notify, wired with the ``>>`` operator.

    Demonstrates the "sandwich" pattern: a plain operator upstream of a
    Cosmos-rendered DbtTaskGroup, and another plain operator downstream.
    """
    # Pre-dbt: data extraction (EmptyOperator stands in for a real ingest task)
    extract_data = EmptyOperator(task_id="extract_data")

    # dbt transformation rendered into an Airflow task group by Cosmos
    dbt_transform = DbtTaskGroup(
        group_id="dbt_transform",
        project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
        profile_config=ProfileConfig(
            profile_name="jaffle_shop",
            target_name="dev",
            profile_mapping=PostgresUserPasswordProfileMapping(
                conn_id="my_postgres_conn",
            ),
        ),
    )

    # Post-dbt: notification once every dbt task in the group has finished
    notify_success = SlackWebhookOperator(
        task_id="notify_success",
        slack_webhook_conn_id="slack_conn",
        message="dbt transformation complete!",
    )

    # Wire the dependency chain: extract first, dbt subgraph, then notify
    extract_data >> dbt_transform >> notify_success

jaffle_shop_pipeline()

Multiple DbtTaskGroups with Selective Rendering

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, RenderConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
import pendulum

@dag(
    start_date=pendulum.datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
)
def multi_stage_pipeline():
    """Chain three selectively rendered dbt stages between start/end markers.

    Each stage is a DbtTaskGroup restricted to a slice of the project via
    RenderConfig(select=...); the stages run strictly in order.
    """
    pre_task = EmptyOperator(task_id="start")
    post_task = EmptyOperator(task_id="end")

    # A single profile shared by every stage.
    profile_config = ProfileConfig(
        profile_name="jaffle_shop",
        target_name="dev",
        profile_mapping=PostgresUserPasswordProfileMapping(
            conn_id="my_postgres_conn",
        ),
    )

    # (group_id, dbt selector) for each stage, listed in execution order:
    # seed loading, then staging models, then mart models.
    stage_specs = [
        ("seeds", "path:seeds"),
        ("staging", "path:models/staging"),
        ("marts", "path:models/marts"),
    ]
    stage_groups = [
        DbtTaskGroup(
            group_id=group_id,
            project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
            profile_config=profile_config,
            render_config=RenderConfig(select=[selector]),
        )
        for group_id, selector in stage_specs
    ]

    # Wire start -> seeds -> staging -> marts -> end, one edge per pair.
    ordered = [pre_task, *stage_groups, post_task]
    for upstream, downstream in zip(ordered, ordered[1:]):
        upstream >> downstream

multi_stage_pipeline()

Fan-Out / Fan-In Pattern

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
# RenderConfig was missing from this import even though it is used below,
# which would raise NameError at DAG parse time.
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
import pendulum

@dag(
    start_date=pendulum.datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
)
def fan_out_pipeline():
    """Fan-out to two independent dbt groups, then fan-in to a report task."""
    start = EmptyOperator(task_id="start")

    # One Snowflake profile shared by both dbt groups.
    profile_config = ProfileConfig(
        profile_name="analytics",
        target_name="prod",
        profile_mapping=SnowflakeUserPasswordProfileMapping(
            conn_id="snowflake_conn",
        ),
    )

    # Two independent dbt task groups (fan-out): each renders a different
    # selection of the same dbt project.
    customers_group = DbtTaskGroup(
        group_id="customers",
        project_config=ProjectConfig("/dags/dbt/analytics"),
        profile_config=profile_config,
        render_config=RenderConfig(select=["customers"]),
    )

    orders_group = DbtTaskGroup(
        group_id="orders",
        project_config=ProjectConfig("/dags/dbt/analytics"),
        profile_config=profile_config,
        render_config=RenderConfig(select=["orders"]),
    )

    # Fan-in: both groups must complete before reporting starts.
    generate_report = EmptyOperator(task_id="generate_report")

    # Wire fan-out and fan-in in a single expression; the list form sets a
    # dependency on every element.
    start >> [customers_group, orders_group] >> generate_report

fan_out_pipeline()

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment