Implementation:Astronomer Astronomer cosmos Watcher Operators
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Execution, Orchestration, Observability |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Concrete tool for producer-consumer dbt execution with per-model monitoring provided by the astronomer-cosmos library.
Description
The watcher operator family implements the producer-consumer execution pattern, with its code split across several source files. DbtProducerWatcherOperator runs a single dbt build and pushes per-node completion status to XCom (using zlib compression and base64 encoding for large payloads). Each DbtConsumerWatcherSensor instance polls XCom for the status of its own model and supports deferrable execution via WatcherTrigger. On retry, a consumer falls back to running its failed model locally through the DbtRunLocalOperator behavior it inherits.
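The pattern described above can be illustrated with a minimal, library-free sketch. It is not the Cosmos implementation: the in-memory `xcom_store` dictionary, the `produce` and `consume` functions, and the `run_locally` callback are hypothetical stand-ins for Airflow XCom, the producer, the consumer sensor, and the inherited local run.

```python
import time
from typing import Callable

# Hypothetical in-memory stand-in for Airflow XCom; not part of Cosmos.
xcom_store: dict[str, str] = {}


def produce(node_statuses: dict[str, str]) -> None:
    """Simulate the producer: publish one status entry per dbt node."""
    for unique_id, status in node_statuses.items():
        xcom_store[f"nodefinished_{unique_id}"] = status


def consume(
    unique_id: str,
    try_number: int,
    run_locally: Callable[[str], str],
    poke_interval: float = 0.01,
    timeout: float = 0.1,
) -> str:
    """Simulate a consumer: poll on the first try, run the model itself on retries."""
    if try_number > 1:
        # Retry path: stop watching and execute only this model.
        return run_locally(unique_id)

    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = xcom_store.get(f"nodefinished_{unique_id}")
        if status is not None:
            return status
        time.sleep(poke_interval)
    raise TimeoutError(f"no status published for {unique_id}")


produce({"model.jaffle_shop.customers": "success"})
first_try = consume("model.jaffle_shop.customers", 1, lambda uid: "ran_locally")
retry = consume("model.jaffle_shop.orders", 2, lambda uid: "ran_locally")
```

The point of the split is that one process pays the dbt startup and compilation cost, while the per-model tasks only read small status entries unless a retry forces them to run dbt themselves.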
Usage
These operators are automatically generated by DbtDag or DbtTaskGroup when ExecutionMode.WATCHER is specified. The producer task is created once per task group, and consumer tasks are created for each dbt node.
Code Reference
Source Location
- Repository: astronomer-cosmos
- Files:
  - cosmos/operators/watcher.py -- Lines: L62-284 (DbtProducerWatcherOperator, DbtConsumerWatcherSensor)
  - cosmos/operators/_watcher/base.py -- Lines: L76-296 (BaseConsumerSensor)
  - cosmos/operators/_watcher/triggerer.py -- Lines: L21-166 (WatcherTrigger)
  - cosmos/operators/_watcher/state.py -- Lines: L21-30 (safe_xcom_push, get_xcom_val, build_producer_state_fetcher)
Signature
# Producer: runs the centralized dbt build
class DbtProducerWatcherOperator(DbtBuildMixin, DbtLocalBaseOperator):
    """
    Executes a single dbt build covering all nodes,
    pushing per-model status to XCom.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.base_cmd = ["build"]

    def execute(self, context: Context, **kwargs: Any) -> Any:
        # In DBT_RUNNER mode: registers NodeFinished callback for real-time XCom push
        # In SUBPROCESS mode: parses run_results.json after completion
        ...


# Consumer: monitors a single model's completion
class DbtConsumerWatcherSensor(BaseConsumerSensor, DbtRunLocalOperator):
    """
    Polls XCom for individual model completion status.
    Defers to WatcherTrigger when deferrable=True.
    Falls back to local execution on retry.
    """

    def __init__(
        self,
        *,
        project_dir: str,
        profile_config: ProfileConfig | None = None,
        producer_task_id: str = "run_dbt_build",
        poke_interval: int = 10,
        timeout: int = 3600,
        execution_timeout: timedelta = timedelta(hours=1),
        deferrable: bool = True,
        **kwargs: Any,
    ) -> None:
        ...


# Async trigger: polls XCom from the triggerer process
class WatcherTrigger(BaseTrigger):
    """
    Async trigger that polls XCom for model completion,
    freeing worker slots while waiting.
    """

    def __init__(
        self,
        model_unique_id: str,
        producer_task_id: str,
        dag_id: str,
        run_id: str,
        map_index: int | None = None,
        use_event: bool = True,
        poke_interval: float = 5.0,
    ):
        ...

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # Async polling loop:
        # 1. Check XCom for nodefinished_<unique_id> or <unique_id>_status
        # 2. Check producer task state for early failure detection
        # 3. Yield TriggerEvent with success/model_failed/producer_failed
        ...
Key parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| producer_task_id | str | "run_dbt_build" | Task ID of the producer operator to poll XCom from |
| poke_interval | int | 10 | Seconds between XCom polls (sensor mode) |
| timeout | int | 3600 | Maximum seconds to wait for model completion |
| execution_timeout | timedelta | 1 hour | Maximum execution time for the sensor task |
| deferrable | bool | True | Whether to defer to WatcherTrigger for async polling |
| model_unique_id | str | -- | dbt unique_id for the model this consumer monitors (set by renderer) |
Import
from cosmos.operators.watcher import DbtProducerWatcherOperator, DbtConsumerWatcherSensor
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| project_dir | str / Path | Yes | Path to the dbt project on the worker filesystem |
| profile_config | ProfileConfig | Yes | Database connection profile configuration |
| producer_task_id | str | No | Task ID of the producer (defaults to "run_dbt_build") |
| deferrable | bool | No | Enable async deferral to triggerer process (defaults to True) |
Outputs
| Component | Output Type | Description |
|---|---|---|
| DbtProducerWatcherOperator | XCom entries | Per-node status entries keyed by nodefinished_<unique_id>, zlib-compressed and base64-encoded JSON |
| DbtConsumerWatcherSensor | Task success/failure | Resolves to success if XCom reports model passed, fails if model failed or producer failed |
| WatcherTrigger | TriggerEvent | Yields events with status: success, model_failed, producer_failed, or model_not_run |
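The way a trigger could arrive at one of these four statuses can be sketched with plain asyncio. This is an illustration under stated assumptions, not the WatcherTrigger source: `wait_for_model` and the two fetcher callables are hypothetical, any reported status other than "success" is treated as a model failure, and `model_not_run` is assumed to mean that the producer finished successfully without reporting the model.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Hypothetical callables standing in for the XCom and task-state lookups.
StatusFetcher = Callable[[], Awaitable[Optional[str]]]


async def wait_for_model(
    fetch_model_status: StatusFetcher,
    fetch_producer_state: StatusFetcher,
    poke_interval: float = 0.01,
) -> dict[str, str]:
    """Poll until the model or the producer reaches a terminal state."""
    while True:
        # 1. Look for the model's own status entry.
        model_status = await fetch_model_status()
        if model_status == "success":
            return {"status": "success"}
        if model_status is not None:
            # Assumption: every other reported status counts as a failure.
            return {"status": "model_failed"}

        # 2. No entry yet: check whether the producer has already stopped.
        producer_state = await fetch_producer_state()
        if producer_state == "failed":
            return {"status": "producer_failed"}
        if producer_state == "success":
            # Assumption: the producer finished without reporting this model.
            return {"status": "model_not_run"}

        # 3. Nothing conclusive yet; sleep without blocking the event loop.
        await asyncio.sleep(poke_interval)
```

Checking the producer state on every iteration is what allows a waiting consumer to fail early instead of polling until its timeout when the producer has already died.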
XCom Data Format
# Producer pushes per-node status as compressed XCom:
import base64, json, zlib
status_data = {"status": "success", "unique_id": "model.jaffle_shop.customers"}
compressed = base64.b64encode(zlib.compress(json.dumps(status_data).encode())).decode()
# XCom key: "nodefinished_model.jaffle_shop.customers"
# XCom value: compressed string
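Reading such a value back reverses the same steps: base64-decode, decompress, then parse the JSON. The sketch below shows the round trip with the standard library only; `encode_status` and `decode_status` are hypothetical helper names used for illustration, not functions exported by Cosmos.

```python
import base64
import json
import zlib
from typing import Any


def encode_status(payload: dict[str, Any]) -> str:
    """Serialize to JSON, compress with zlib, and wrap in base64 text."""
    raw = json.dumps(payload).encode("utf-8")
    return base64.b64encode(zlib.compress(raw)).decode("ascii")


def decode_status(value: str) -> dict[str, Any]:
    """Reverse encode_status: base64-decode, decompress, parse JSON."""
    raw = zlib.decompress(base64.b64decode(value))
    return json.loads(raw.decode("utf-8"))


status_data = {"status": "success", "unique_id": "model.jaffle_shop.customers"}
xcom_value = encode_status(status_data)
restored = decode_status(xcom_value)
```

Base64 is needed because zlib produces arbitrary bytes, while the value must survive being stored as a text string.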
Usage Examples
Watcher Mode via ExecutionConfig (Typical Usage)
import pendulum
from airflow.decorators import dag
from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig
from cosmos.constants import ExecutionMode
from cosmos.profiles import PostgresUserPasswordProfileMapping


@dag(
    start_date=pendulum.datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
)
def jaffle_shop_watcher():
    dbt_tg = DbtTaskGroup(
        project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
        profile_config=ProfileConfig(
            profile_name="jaffle_shop",
            target_name="dev",
            profile_mapping=PostgresUserPasswordProfileMapping(
                conn_id="my_postgres_conn",
            ),
        ),
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.WATCHER,
        ),
        operator_args={
            "deferrable": True,  # Enable async deferral (default)
        },
    )


jaffle_shop_watcher()
In this configuration, Cosmos automatically:
- Creates a single DbtProducerWatcherOperator task that runs dbt build
- Creates DbtConsumerWatcherSensor tasks for each model/test node
- Wires consumer tasks to depend on the producer
- Has each consumer defer to WatcherTrigger for non-blocking XCom polling
Direct Operator Usage (Advanced)
from cosmos import ProfileConfig
from cosmos.operators.watcher import DbtProducerWatcherOperator, DbtConsumerWatcherSensor
from cosmos.profiles import PostgresUserPasswordProfileMapping

# Instantiate these operators inside a DAG context (for example, a `with DAG(...)` block).
profile_config = ProfileConfig(
    profile_name="jaffle_shop",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(conn_id="my_postgres_conn"),
)

# Producer: runs the single dbt build for the whole project
producer = DbtProducerWatcherOperator(
    task_id="run_dbt_build",
    project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",
    profile_config=profile_config,
)

# Consumer: watches the status that the producer publishes
consumer_customers = DbtConsumerWatcherSensor(
    task_id="watch_customers",
    project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",
    profile_config=profile_config,
    producer_task_id="run_dbt_build",  # must match the producer's task_id
    poke_interval=10,
    timeout=3600,
    deferrable=True,
)

producer >> consumer_customers