Implementation:Astronomer Astronomer cosmos Cluster Policy
| Knowledge Sources | |
|---|---|
| Domains | Watcher_Execution, Policy |
| Last Updated | 2026-02-07 17:00 GMT |
Overview
The Cluster_Policy module provides an Airflow cluster policy hook that automatically routes watcher consumer sensor tasks to a dedicated execution queue.
Description
This module implements an Airflow policy hook using the @hookimpl decorator from airflow.policies. It is registered as an entry point (airflow.policy = cosmos.plugin.cluster_policy) so that Airflow discovers and invokes it automatically for every task instance before execution.
The module contains two functions:
- _is_watcher_sensor determines whether a given task instance is a watcher consumer sensor. It checks if the task's module path (obtained from
_task_modulefor serialized tasks or__class__.__module__as fallback) contains"cosmos.operators.watcher", or if the task is an instance ofBaseConsumerSensor. This dual check ensures compatibility with both serialized and non-serialized task instances, as Airflow 3 serialized tasks do not supportisinstancechecks.
- task_instance_mutation_hook is the policy hook function. When the
watcher_dbt_execution_queuesetting is configured and the task instance is identified as a watcher sensor, it overrides the task'squeueattribute to route it to the designated execution queue. This override only applies on retry attempts (determined by comparingtry_numberagainst a version-specific threshold:>= 2for Airflow 2.x where try_number starts at 1, and>= 1for Airflow 3.x where try_number starts at 0 or None).
Usage
This hook is auto-registered via the airflow.policy entry point and does not require manual invocation. It activates when the watcher_dbt_execution_queue Cosmos setting is configured (typically via the AIRFLOW__COSMOS__WATCHER_DBT_EXECUTION_QUEUE environment variable). When active, it ensures that watcher sensor tasks on retries are routed to a dedicated worker queue, enabling separation of dbt execution workers from the standard Airflow worker pool.
Code Reference
Source Location
- Repository: Astronomer_Astronomer_cosmos
- File: cosmos/plugin/cluster_policy.py
Signature
def _is_watcher_sensor(task_instance: TaskInstance) -> bool:
...
@hookimpl
def task_instance_mutation_hook(task_instance: TaskInstance) -> None:
...
Import
# Auto-registered via entry point: airflow.policy = cosmos.plugin.cluster_policy
# Not user-imported. Internal reference only:
from cosmos.plugin.cluster_policy import task_instance_mutation_hook
I/O Contract
Inputs
_is_watcher_sensor
| Name | Type | Required | Description |
|---|---|---|---|
| task_instance | airflow.models.taskinstance.TaskInstance |
Yes | The task instance to check for watcher sensor status |
task_instance_mutation_hook
| Name | Type | Required | Description |
|---|---|---|---|
| task_instance | airflow.models.taskinstance.TaskInstance |
Yes | The task instance to potentially mutate by setting its queue |
Outputs
| Name | Type | Description |
|---|---|---|
| _is_watcher_sensor return | bool |
True if the task instance is a Cosmos watcher consumer sensor |
| task_instance_mutation_hook return | None |
Returns None; mutates task_instance.queue as a side effect when conditions are met
|
Side Effects
| Condition | Effect |
|---|---|
watcher_dbt_execution_queue is set AND task is a watcher sensor AND try_number meets retry threshold |
Sets task_instance.queue to the value of watcher_dbt_execution_queue
|
Usage Examples
# This hook is auto-registered and does not require manual invocation.
# To enable queue routing for watcher sensors, set the environment variable:
#
# export AIRFLOW__COSMOS__WATCHER_DBT_EXECUTION_QUEUE="dbt-workers"
#
# When active, watcher consumer sensor tasks on retries will automatically
# be routed to the "dbt-workers" queue instead of the default queue.
#
# Example of what happens internally:
#
# # Airflow invokes for every task instance before execution:
# task_instance_mutation_hook(task_instance)
# # If task_instance is a DbtRunWatcherKubernetesOperator on retry:
# # task_instance.queue = "dbt-workers"