Implementation:Astronomer Astronomer cosmos Cluster Policy

From Leeroopedia


Knowledge Sources
Domains: Watcher_Execution, Policy
Last Updated: 2026-02-07 17:00 GMT

Overview

The Cluster_Policy module provides an Airflow cluster policy hook that automatically routes watcher consumer sensor tasks to a dedicated execution queue.

Description

This module implements an Airflow policy hook using the @hookimpl decorator from airflow.policies. It is registered as an entry point (airflow.policy = cosmos.plugin.cluster_policy) so that Airflow discovers and invokes it automatically for every task instance before execution.

The module contains two functions:

  • _is_watcher_sensor determines whether a given task instance is a watcher consumer sensor. It checks if the task's module path (obtained from _task_module for serialized tasks or __class__.__module__ as fallback) contains "cosmos.operators.watcher", or if the task is an instance of BaseConsumerSensor. This dual check ensures compatibility with both serialized and non-serialized task instances, as Airflow 3 serialized tasks do not support isinstance checks.
  • task_instance_mutation_hook is the policy hook function. When the watcher_dbt_execution_queue setting is configured and the task instance is identified as a watcher sensor, it overrides the task instance's queue attribute so that the task is routed to the designated execution queue. The override applies only on retry attempts, which are detected by comparing try_number against a version-specific threshold: >= 2 on Airflow 2.x, where try_number starts at 1, and >= 1 on Airflow 3.x, where try_number starts at 0 or None.
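
The dual check described for _is_watcher_sensor can be sketched roughly as follows. This is a minimal illustration, not the Cosmos source: the local BaseConsumerSensor class is a hypothetical stand-in for the real Cosmos class, the function name is_watcher_sensor is illustrative, and reaching the task through task_instance.task is an assumption.

```python
from types import SimpleNamespace

# Substring looked for in the task's module path (taken from the description above).
WATCHER_MODULE_MARKER = "cosmos.operators.watcher"


class BaseConsumerSensor:
    """Hypothetical stand-in for the Cosmos base consumer sensor class."""


def is_watcher_sensor(task_instance) -> bool:
    """Return True when the task behind task_instance looks like a watcher sensor."""
    # Assumption: the task object is reachable as task_instance.task.
    task = task_instance.task
    # Serialized tasks carry their original module path in `_task_module`;
    # otherwise fall back to the module of the task's own class.
    module = getattr(task, "_task_module", None) or type(task).__module__
    if WATCHER_MODULE_MARKER in module:
        return True
    # The isinstance check only helps for non-serialized tasks.
    return isinstance(task, BaseConsumerSensor)


# A serialized-style task identified by module path only.
serialized = SimpleNamespace(
    task=SimpleNamespace(_task_module="cosmos.operators.watcher_kubernetes")
)
# A non-serialized task identified by its class.
plain = SimpleNamespace(task=BaseConsumerSensor())
# An unrelated task that matches neither check.
other = SimpleNamespace(task=SimpleNamespace(_task_module="airflow.operators.bash"))
```

Checking the module path first means a serialized task is recognised even though it is no longer an instance of the original operator class.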

Usage

This hook is auto-registered via the airflow.policy entry point and does not require manual invocation. It activates when the watcher_dbt_execution_queue Cosmos setting is configured (typically via the AIRFLOW__COSMOS__WATCHER_DBT_EXECUTION_QUEUE environment variable). When active, it ensures that watcher sensor tasks on retries are routed to a dedicated worker queue, enabling separation of dbt execution workers from the standard Airflow worker pool.
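
As a rough illustration of how the environment variable name relates to the setting, the sketch below applies Airflow's AIRFLOW__{SECTION}__{KEY} naming convention. The helper names airflow_env_var and read_execution_queue are hypothetical, and the sketch models only the environment-variable path, not the full Airflow configuration lookup.

```python
import os
from typing import Mapping, Optional


def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name Airflow associates with a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def read_execution_queue(environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the configured queue name, or None when unset or empty.

    Hypothetical helper: it reads only the environment, whereas a real
    deployment may also set the option in airflow.cfg.
    """
    name = airflow_env_var("cosmos", "watcher_dbt_execution_queue")
    return environ.get(name) or None


# Example with an explicit mapping instead of the real process environment.
example_env = {"AIRFLOW__COSMOS__WATCHER_DBT_EXECUTION_QUEUE": "dbt-workers"}
configured_queue = read_execution_queue(example_env)
```

When the variable is unset or empty the helper returns None, matching the behaviour described above where the hook stays inactive without a configured queue.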

Code Reference

Source Location

Signature

def _is_watcher_sensor(task_instance: TaskInstance) -> bool:
    ...

@hookimpl
def task_instance_mutation_hook(task_instance: TaskInstance) -> None:
    ...

Import

# Auto-registered via entry point: airflow.policy = cosmos.plugin.cluster_policy
# Not user-imported. Internal reference only:
from cosmos.plugin.cluster_policy import task_instance_mutation_hook

I/O Contract

Inputs

_is_watcher_sensor

Name | Type | Required | Description
task_instance | airflow.models.taskinstance.TaskInstance | Yes | The task instance to check for watcher sensor status

task_instance_mutation_hook

Name | Type | Required | Description
task_instance | airflow.models.taskinstance.TaskInstance | Yes | The task instance to potentially mutate by setting its queue

Outputs

Name | Type | Description
_is_watcher_sensor return | bool | True if the task instance is a Cosmos watcher consumer sensor
task_instance_mutation_hook return | None | Returns None; mutates task_instance.queue as a side effect when conditions are met

Side Effects

Condition | Effect
watcher_dbt_execution_queue is set AND task is a watcher sensor AND try_number meets retry threshold | Sets task_instance.queue to the value of watcher_dbt_execution_queue
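
The three-part condition above can be sketched as follows. This is an illustrative approximation, not the module's code: route_on_retry and retry_threshold are hypothetical names, the Airflow major version and the watcher check result are passed in as plain arguments, and SimpleNamespace stands in for a real TaskInstance.

```python
from types import SimpleNamespace
from typing import Optional


def retry_threshold(airflow_major: int) -> int:
    """Smallest try_number that counts as a retry for the given Airflow major version."""
    # Airflow 2.x: the first attempt has try_number 1, so retries start at 2.
    # Airflow 3.x: the first attempt has try_number 0 or None, so retries start at 1.
    return 2 if airflow_major < 3 else 1


def route_on_retry(
    task_instance,
    execution_queue: Optional[str],
    airflow_major: int,
    is_watcher: bool,
) -> None:
    """Set task_instance.queue only when all three conditions hold."""
    if not execution_queue or not is_watcher:
        return
    # Treat a missing try_number (None) as the first attempt.
    try_number = task_instance.try_number or 0
    if try_number >= retry_threshold(airflow_major):
        task_instance.queue = execution_queue


# First attempt on Airflow 2.x: the queue is left untouched.
first_try = SimpleNamespace(try_number=1, queue="default")
route_on_retry(first_try, "dbt-workers", airflow_major=2, is_watcher=True)

# First retry on Airflow 2.x: the queue is overridden.
retry = SimpleNamespace(try_number=2, queue="default")
route_on_retry(retry, "dbt-workers", airflow_major=2, is_watcher=True)
```

Because the function returns None and only mutates its argument, callers observe the change through task_instance.queue, as the Outputs table describes.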

Usage Examples

# This hook is auto-registered and does not require manual invocation.
# To enable queue routing for watcher sensors, set the environment variable:
#
#   export AIRFLOW__COSMOS__WATCHER_DBT_EXECUTION_QUEUE="dbt-workers"
#
# When active, watcher consumer sensor tasks on retries will automatically
# be routed to the "dbt-workers" queue instead of the default queue.
#
# Example of what happens internally:
#
#   # Airflow invokes for every task instance before execution:
#   task_instance_mutation_hook(task_instance)
#   # If task_instance is a DbtRunWatcherKubernetesOperator on retry:
#   #   task_instance.queue = "dbt-workers"
