Implementation: Astronomer Cosmos Task Dependency Wiring
| Knowledge Sources | |
|---|---|
| Domains | Orchestration, DAG_Rendering |
| Type | Pattern Doc |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Configuration pattern for wiring dbt task groups with non-dbt operators in Airflow DAGs.
Description
This is a pattern document describing how to use Airflow's built-in dependency operators to connect DbtTaskGroup instances (from astronomer-cosmos) with other Airflow operators. The pattern relies on Airflow core's BaseOperator.__rshift__ and BaseOperator.__lshift__ methods, which are inherited by TaskGroup (and therefore by DbtTaskGroup). No special Cosmos API is needed -- standard Airflow dependency syntax applies directly.
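The operator-overloading mechanics behind this can be sketched with a stand-in class (Node here is hypothetical illustration code, not Airflow's implementation): `__rshift__` registers the dependency and returns its right-hand operand, which is what makes chains like `a >> b >> c` work.

```python
# Hypothetical stand-in for BaseOperator/TaskGroup -- not Airflow code.
class Node:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_ids = set()

    def set_upstream(self, other):
        self.upstream_ids.add(other.task_id)

    def set_downstream(self, other):
        other.set_upstream(self)

    def __rshift__(self, other):  # self >> other
        self.set_downstream(other)
        return other  # returning `other` is what lets a >> b >> c chain

    def __lshift__(self, other):  # self << other
        self.set_upstream(other)
        return other

a, b, c = Node("a"), Node("b"), Node("c")
a >> b >> c
# b now depends on a, and c depends on b
```

Because DbtTaskGroup inherits this machinery from TaskGroup, the same expressions apply to it unchanged.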
Usage
Apply this pattern whenever a DAG contains both dbt transformation tasks (rendered by Cosmos) and non-dbt tasks (data ingestion, validation, notification, etc.) that must execute in a defined order.
Code Reference
Source Location
- Repository: Airflow core (external)
- Module: airflow.models.baseoperator.BaseOperator.__rshift__ and BaseOperator.__lshift__
- Cosmos integration: cosmos.airflow.task_group.DbtTaskGroup inherits from airflow.utils.task_group.TaskGroup
Interface Specification
# Standard Airflow dependency operators
pre_task >> dbt_task_group >> post_task
# Or equivalently using explicit methods
dbt_task_group.set_upstream(pre_task)
dbt_task_group.set_downstream(post_task)
# Multiple dependencies
[task_a, task_b] >> dbt_task_group >> [task_c, task_d]
# Chaining multiple task groups
extract >> staging_group >> marts_group >> notify
Import
# No special import needed -- built-in Airflow syntax
# DbtTaskGroup inherits dependency methods from TaskGroup
from cosmos import DbtTaskGroup # Provides >> and << operators natively
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| upstream | BaseOperator / TaskGroup / list[BaseOperator] | No | Task(s) that must complete before the dbt group starts |
| dbt_task_group | DbtTaskGroup | Yes | The Cosmos-rendered dbt task group |
| downstream | BaseOperator / TaskGroup / list[BaseOperator] | No | Task(s) that run after the dbt group completes |
Outputs
| Name | Type | Description |
|---|---|---|
| DAG graph | Airflow DAG | The complete DAG with wired execution order between all tasks |
Behavior
| Expression | Effect |
|---|---|
| pre >> dbt_group | pre must finish before any task in dbt_group starts (wires to the group's root tasks) |
| dbt_group >> post | All tasks in dbt_group must finish before post starts (wires from the group's leaf tasks) |
| pre >> dbt_group >> post | Full sandwich: pre -> dbt subgraph -> post |
| [a, b] >> dbt_group | Both a and b must finish before dbt_group starts |
| dbt_group >> [c, d] | Both c and d start only after dbt_group completes |
| group_a >> group_b | All leaves of group_a wire to all roots of group_b |
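The list forms above also rely on BaseOperator defining `__rrshift__`: a plain Python list has no `>>` of its own, so in `[a, b] >> end` Python falls back to the right operand's reflected method. A minimal sketch (Task is a hypothetical stand-in, not Airflow code):

```python
# Hypothetical stand-in showing how list fan-out/fan-in wiring works -- not Airflow code.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_ids = set()

    def __rshift__(self, other):  # self >> other (other may be a list)
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            t.upstream_ids.add(self.task_id)
        return other

    def __rrshift__(self, other):  # [a, b] >> self -- Python falls back here
        sources = other if isinstance(other, list) else [other]
        for s in sources:
            self.upstream_ids.add(s.task_id)
        return self

start, a, b, end = Task("start"), Task("a"), Task("b"), Task("end")
start >> [a, b] >> end  # fan-out from start, fan-in to end
```

Note that `start >> [a, b]` returns the list, so the chained `>> end` then wires every list member upstream of end.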
Usage Examples
Complete DAG: Extract, Transform, Notify
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
import pendulum

@dag(
    start_date=pendulum.datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["jaffle_shop"],
)
def jaffle_shop_pipeline():
    # Pre-dbt: data extraction
    extract_data = EmptyOperator(task_id="extract_data")

    # dbt transformation via Cosmos
    dbt_transform = DbtTaskGroup(
        group_id="dbt_transform",
        project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
        profile_config=ProfileConfig(
            profile_name="jaffle_shop",
            target_name="dev",
            profile_mapping=PostgresUserPasswordProfileMapping(
                conn_id="my_postgres_conn",
            ),
        ),
    )

    # Post-dbt: notification
    notify_success = SlackWebhookOperator(
        task_id="notify_success",
        slack_webhook_conn_id="slack_conn",
        message="dbt transformation complete!",
    )

    # Wire the dependency chain
    extract_data >> dbt_transform >> notify_success

jaffle_shop_pipeline()
Multiple DbtTaskGroups with Selective Rendering
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, RenderConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
import pendulum

@dag(
    start_date=pendulum.datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
)
def multi_stage_pipeline():
    pre_task = EmptyOperator(task_id="start")

    profile_config = ProfileConfig(
        profile_name="jaffle_shop",
        target_name="dev",
        profile_mapping=PostgresUserPasswordProfileMapping(
            conn_id="my_postgres_conn",
        ),
    )

    # Stage 1: Seed loading
    seeds = DbtTaskGroup(
        group_id="seeds",
        project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
        profile_config=profile_config,
        render_config=RenderConfig(select=["path:seeds"]),
    )

    # Stage 2: Staging models
    staging = DbtTaskGroup(
        group_id="staging",
        project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
        profile_config=profile_config,
        render_config=RenderConfig(select=["path:models/staging"]),
    )

    # Stage 3: Mart models
    marts = DbtTaskGroup(
        group_id="marts",
        project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
        profile_config=profile_config,
        render_config=RenderConfig(select=["path:models/marts"]),
    )

    post_task = EmptyOperator(task_id="end")

    # Chain all stages in order
    pre_task >> seeds >> staging >> marts >> post_task

multi_stage_pipeline()
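For long linear sequences like the one above, Airflow also ships a chain() helper (airflow.models.baseoperator.chain) that is equivalent to repeated >>. The idea can be sketched with a hypothetical stand-in Task class (not Airflow code):

```python
# Sketch of the chain() idea with a hypothetical Task class -- the real
# helper is `from airflow.models.baseoperator import chain`.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_ids = set()

    def __rshift__(self, other):
        other.upstream_ids.add(self.task_id)
        return other

def chain(*tasks):
    # Wire each consecutive pair: chain(a, b, c) == a >> b >> c
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream

pre, seeds, staging, marts, post = (
    Task(n) for n in ("start", "seeds", "staging", "marts", "end")
)
chain(pre, seeds, staging, marts, post)
```

This is purely a readability choice; the resulting DAG graph is identical to writing the >> chain by hand.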
Fan-Out / Fan-In Pattern
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
import pendulum

@dag(
    start_date=pendulum.datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
)
def fan_out_pipeline():
    start = EmptyOperator(task_id="start")

    profile_config = ProfileConfig(
        profile_name="analytics",
        target_name="prod",
        profile_mapping=SnowflakeUserPasswordProfileMapping(
            conn_id="snowflake_conn",
        ),
    )

    # Two independent dbt task groups (fan-out)
    customers_group = DbtTaskGroup(
        group_id="customers",
        project_config=ProjectConfig("/dags/dbt/analytics"),
        profile_config=profile_config,
        render_config=RenderConfig(select=["customers"]),
    )

    orders_group = DbtTaskGroup(
        group_id="orders",
        project_config=ProjectConfig("/dags/dbt/analytics"),
        profile_config=profile_config,
        render_config=RenderConfig(select=["orders"]),
    )

    # Fan-in: both groups must complete before reporting
    generate_report = EmptyOperator(task_id="generate_report")

    # Wire fan-out and fan-in
    start >> [customers_group, orders_group] >> generate_report

fan_out_pipeline()