

Implementation:Astronomer Astronomer cosmos Watcher Operators



Domains: Data_Engineering, Execution, Orchestration, Observability
Last Updated: 2026-02-07 00:00 GMT

Overview

Concrete implementation of producer-consumer dbt execution with per-model monitoring, provided by the astronomer-cosmos library.

Description

The watcher operator family, spread across several source files, implements the producer-consumer execution pattern. DbtProducerWatcherOperator runs a single dbt build and pushes each node's completion status to XCom, zlib-compressing and base64-encoding the payload to handle large entries. Each DbtConsumerWatcherSensor polls XCom for the status of its own model; with deferrable execution it hands the wait to WatcherTrigger. On a retry, a consumer no longer waits on the producer: it runs its failed model locally, using the behavior it inherits from DbtRunLocalOperator.
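To make the division of labor concrete, here is a minimal in-process sketch of the pattern. It is not Cosmos code: a plain dict stands in for XCom, `run_producer` and `consumer_poke` are hypothetical names, statuses are stored uncompressed for brevity, and the retry rule (any `try_number > 1` triggers a local run) is an assumption modeled on the description above.

```python
from typing import Callable, Dict, List

# Stand-in for Airflow XCom (assumption): a dict keyed by XCom key.
# The real producer stores compressed, base64-encoded strings instead of dicts.
XComStore = Dict[str, dict]

# dbt reports "success" for models and "pass" for tests.
OK_STATUSES = ("success", "pass")


def run_producer(node_ids: List[str], run_node: Callable[[str], bool], xcom: XComStore) -> None:
    """Stand-in for the single `dbt build`: run every node, push one status entry per node."""
    for unique_id in node_ids:
        status = "success" if run_node(unique_id) else "error"
        xcom[f"nodefinished_{unique_id}"] = {"status": status, "unique_id": unique_id}


def consumer_poke(
    unique_id: str,
    xcom: XComStore,
    try_number: int,
    run_locally: Callable[[str], bool],
) -> bool:
    """Return True once the node succeeded, False while it is pending; raise on failure."""
    if try_number > 1:
        # Retry path: run just this model locally instead of waiting on the producer again.
        if not run_locally(unique_id):
            raise RuntimeError(f"{unique_id} failed on local retry")
        return True
    entry = xcom.get(f"nodefinished_{unique_id}")
    if entry is None:
        return False  # not reported yet; poke again after poke_interval
    if entry["status"] not in OK_STATUSES:
        raise RuntimeError(f"{unique_id} failed in the producer's dbt build")
    return True
```

A consumer that pokes before the producer has reached its node simply gets `False` and tries again later; only a reported failure, or a failed local retry, raises.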

Usage

These operators are normally not instantiated by hand: DbtDag or DbtTaskGroup generates them when ExecutionMode.WATCHER is specified. One producer task is created per DAG or task group, and one consumer task is created for each dbt node.

Code Reference

Source Location

  • Repository: astronomer-cosmos
  • Files:
    • cosmos/operators/watcher.py -- Lines: L62-284 (DbtProducerWatcherOperator, DbtConsumerWatcherSensor)
    • cosmos/operators/_watcher/base.py -- Lines: L76-296 (BaseConsumerSensor)
    • cosmos/operators/_watcher/triggerer.py -- Lines: L21-166 (WatcherTrigger)
    • cosmos/operators/_watcher/state.py -- Lines: L21-30 (safe_xcom_push, get_xcom_val, build_producer_state_fetcher)

Signature

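from __future__ import annotations

from datetime import timedelta
from typing import Any, AsyncIterator

from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils.context import Context
from cosmos.config import ProfileConfig
from cosmos.operators._watcher.base import BaseConsumerSensor
from cosmos.operators.base import DbtBuildMixin
from cosmos.operators.local import DbtLocalBaseOperator, DbtRunLocalOperator

# Producer: runs the centralized dbt build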
class DbtProducerWatcherOperator(DbtBuildMixin, DbtLocalBaseOperator):
    """
    Executes a single dbt build covering all nodes,
    pushing per-model status to XCom.
    """
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.base_cmd = ["build"]

    def execute(self, context: Context, **kwargs: Any) -> Any:
        # In DBT_RUNNER mode: registers NodeFinished callback for real-time XCom push
        # In SUBPROCESS mode: parses run_results.json after completion
        ...

# Consumer: monitors a single model's completion
class DbtConsumerWatcherSensor(BaseConsumerSensor, DbtRunLocalOperator):
    """
    Polls XCom for individual model completion status.
    Defers to WatcherTrigger when deferrable=True.
    Falls back to local execution on retry.
    """
    def __init__(
        self,
        *,
        project_dir: str,
        profile_config: ProfileConfig | None = None,
        producer_task_id: str = "run_dbt_build",
        poke_interval: int = 10,
        timeout: int = 3600,
        execution_timeout: timedelta = timedelta(hours=1),
        deferrable: bool = True,
        **kwargs: Any,
    ) -> None:
        ...

# Async trigger: polls XCom from the triggerer process
class WatcherTrigger(BaseTrigger):
    """
    Async trigger that polls XCom for model completion,
    freeing worker slots while waiting.
    """
    def __init__(
        self,
        model_unique_id: str,
        producer_task_id: str,
        dag_id: str,
        run_id: str,
        map_index: int | None = None,
        use_event: bool = True,
        poke_interval: float = 5.0,
    ):
        ...

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # Async polling loop:
        # 1. Check XCom for nodefinished_<unique_id> or <unique_id>_status
        # 2. Check producer task state for early failure detection
        # 3. Yield TriggerEvent with success/model_failed/producer_failed
        ...

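Key parameters (DbtConsumerWatcherSensor unless noted):

| Parameter | Type | Default | Description |
|---|---|---|---|
| producer_task_id | str | "run_dbt_build" | Task ID of the producer operator whose XCom is polled |
| poke_interval | int | 10 | Seconds between XCom polls in sensor (non-deferred) mode; WatcherTrigger has its own poke_interval, defaulting to 5.0 |
| timeout | int | 3600 | Maximum seconds to wait for the model to complete |
| execution_timeout | timedelta | timedelta(hours=1) | Maximum execution time for the sensor task |
| deferrable | bool | True | Whether to defer to WatcherTrigger for async polling |
| model_unique_id | str | (none) | dbt unique_id of the model this consumer monitors; set by the Cosmos renderer rather than passed explicitly (also a WatcherTrigger argument) |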
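The three steps outlined in the comments of WatcherTrigger.run can be sketched as an asyncio loop. This is an illustration, not the Cosmos implementation: `watch_node`, the injected `get_xcom` and `get_producer_state` coroutines, the plain-dict events, and the mapping of `use_event=True` to the `nodefinished_` key are all assumptions.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Optional


async def watch_node(
    unique_id: str,
    get_xcom: Callable[[str], Awaitable[Optional[dict]]],
    get_producer_state: Callable[[], Awaitable[str]],
    poke_interval: float = 5.0,
    use_event: bool = True,
) -> AsyncIterator[dict]:
    """Yield a single status event for one dbt node, then stop."""
    # Assumption: use_event=True selects the per-node callback key, False the _status key.
    key = f"nodefinished_{unique_id}" if use_event else f"{unique_id}_status"
    while True:
        # 1. Has the producer reported this node yet?
        entry = await get_xcom(key)
        if entry is not None:
            ok = entry.get("status") in ("success", "pass")
            yield {"status": "success" if ok else "model_failed", "unique_id": unique_id}
            return
        # 2. Did the producer fail before reaching this node?
        if await get_producer_state() == "failed":
            yield {"status": "producer_failed", "unique_id": unique_id}
            return
        # 3. Nothing yet: sleep without blocking the event loop, then poll again.
        await asyncio.sleep(poke_interval)
```

Checking XCom before the producer state means a node that finished successfully is still reported as a success even if the producer failed later in the build.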

Import

from cosmos.operators.watcher import DbtProducerWatcherOperator, DbtConsumerWatcherSensor

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| project_dir | str or Path | Yes | Path to the dbt project on the worker filesystem |
| profile_config | ProfileConfig | Yes | Database connection profile configuration (the consumer signature types it as optional, defaulting to None) |
| producer_task_id | str | No | Task ID of the producer; defaults to "run_dbt_build" |
| deferrable | bool | No | Enable async deferral to the triggerer process; defaults to True |

Outputs

| Component | Output type | Description |
|---|---|---|
| DbtProducerWatcherOperator | XCom entries | One status entry per node, keyed nodefinished_<unique_id> (WatcherTrigger also checks <unique_id>_status); values are zlib-compressed, base64-encoded JSON |
| DbtConsumerWatcherSensor | Task success/failure | Succeeds if XCom reports that its model passed; fails if the model failed or the producer task failed |
| WatcherTrigger | TriggerEvent | Yields an event whose status is success, model_failed, producer_failed, or model_not_run |
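In Airflow, a deferred task resumes in the method named by `method_name` in its `self.defer(...)` call and receives the trigger's event payload. The hypothetical handler below sketches how a consumer could turn the four trigger statuses into task outcomes. The function name and the plain-dict event are assumptions, and treating `model_not_run` as a failure is a guess, since the contract above does not say how it is handled.

```python
def resolve_trigger_event(event: dict) -> None:
    """Hypothetical resume handler: return normally on success, raise otherwise."""
    status = event.get("status")
    unique_id = event.get("unique_id", "<unknown node>")
    if status == "success":
        return
    if status == "model_failed":
        raise RuntimeError(f"dbt reported a failure for {unique_id}")
    if status == "producer_failed":
        raise RuntimeError(f"producer task failed before {unique_id} finished")
    # Assumption: model_not_run (or any unexpected status) is treated as a failure here.
    raise RuntimeError(f"{unique_id} did not complete (status={status!r})")
```

Raising inside the resumed method is what marks the Airflow task as failed, which in turn makes it eligible for a retry.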

XCom Data Format

# Producer pushes per-node status as compressed XCom:
import base64, json, zlib

status_data = {"status": "success", "unique_id": "model.jaffle_shop.customers"}
compressed = base64.b64encode(zlib.compress(json.dumps(status_data).encode())).decode()
# XCom key: "nodefinished_model.jaffle_shop.customers"
# XCom value: compressed string
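Reading a status back reverses the producer's steps: base64-decode, zlib-decompress, then parse the JSON. The helper names below are illustrative, not Cosmos functions, and the payload is the same sample dict used above.

```python
import base64
import json
import zlib


def encode_node_status(status_data: dict) -> str:
    # JSON -> bytes -> zlib -> base64 text, so the value is safe to store as a string XCom.
    return base64.b64encode(zlib.compress(json.dumps(status_data).encode())).decode()


def decode_node_status(value: str) -> dict:
    # Exact inverse of encode_node_status.
    return json.loads(zlib.decompress(base64.b64decode(value)).decode())


status_data = {"status": "success", "unique_id": "model.jaffle_shop.customers"}
xcom_key = f"nodefinished_{status_data['unique_id']}"
xcom_value = encode_node_status(status_data)
print(xcom_key)  # nodefinished_model.jaffle_shop.customers
print(decode_node_status(xcom_value) == status_data)  # True
```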

Usage Examples

Watcher Mode via ExecutionConfig (Typical Usage)

from airflow.decorators import dag
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.constants import ExecutionMode
from cosmos.profiles import PostgresUserPasswordProfileMapping
import pendulum

@dag(
    start_date=pendulum.datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
)
def jaffle_shop_watcher():
    dbt_tg = DbtTaskGroup(
        project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
        profile_config=ProfileConfig(
            profile_name="jaffle_shop",
            target_name="dev",
            profile_mapping=PostgresUserPasswordProfileMapping(
                conn_id="my_postgres_conn",
            ),
        ),
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.WATCHER,
        ),
        operator_args={
            "deferrable": True,  # Enable async deferral (default)
        },
    )

jaffle_shop_watcher()

In this configuration, Cosmos automatically:

  1. Creates a single DbtProducerWatcherOperator task that runs dbt build
  2. Creates DbtConsumerWatcherSensor tasks for each model/test node
  3. Wires consumer tasks to depend on the producer
  4. Each consumer defers to WatcherTrigger for non-blocking XCom polling
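The resulting task layout (one producer, one consumer per model or test node, every consumer downstream of the producer) can be sketched in plain Python. This does not reflect how Cosmos names its tasks: `plan_watcher_tasks` and the `<resource_type>_<name>_watch` task IDs are hypothetical.

```python
from typing import List, Tuple


def plan_watcher_tasks(
    node_ids: List[str], producer_task_id: str = "run_dbt_build"
) -> Tuple[List[str], List[Tuple[str, str]]]:
    """Return (task_ids, edges) for one producer plus one consumer per model/test node."""
    consumers = []
    for unique_id in node_ids:
        # dbt unique_ids look like "<resource_type>.<package>.<name>[.<hash>]".
        parts = unique_id.split(".")
        if parts[0] not in ("model", "test"):
            continue  # only model and test nodes get a consumer in this sketch
        consumers.append(f"{parts[0]}_{parts[2]}_watch")  # hypothetical naming scheme
    edges = [(producer_task_id, consumer) for consumer in consumers]
    return [producer_task_id, *consumers], edges
```

Because every consumer hangs directly off the producer, all of them can start waiting as soon as the build begins, and each turns green as soon as its own node finishes.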

Direct Operator Usage (Advanced)

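from datetime import timedelta

import pendulum
from airflow import DAG
from cosmos import ProfileConfig
from cosmos.operators.watcher import DbtProducerWatcherOperator, DbtConsumerWatcherSensor
from cosmos.profiles import PostgresUserPasswordProfileMapping

profile_config = ProfileConfig(
    profile_name="jaffle_shop",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(conn_id="my_postgres_conn"),
)

with DAG(
    dag_id="jaffle_shop_watcher_manual",
    start_date=pendulum.datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
):
    # One producer runs `dbt build` for the whole project.
    producer = DbtProducerWatcherOperator(
        task_id="run_dbt_build",
        project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",
        profile_config=profile_config,
    )

    # One consumer per model; producer_task_id must match the producer's task_id.
    # The dbt node it watches (model_unique_id) is normally set by the Cosmos renderer.
    consumer_customers = DbtConsumerWatcherSensor(
        task_id="watch_customers",
        project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",
        profile_config=profile_config,
        producer_task_id="run_dbt_build",
        poke_interval=10,
        timeout=3600,
        execution_timeout=timedelta(hours=1),
        deferrable=True,
    )

    producer >> consumer_customers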
