Principle:Astronomer Astronomer cosmos Watcher Producer Consumer Execution

Knowledge Sources
Domains: Data_Engineering, Execution, Orchestration, Observability
Last Updated: 2026-02-07 00:00 GMT

Overview

An execution principle implementing a producer-consumer pattern where a single dbt build command is monitored by per-model sensor tasks for granular observability within the Airflow DAG.

Description

Traditional per-model execution in Cosmos (the default local or Kubernetes mode) creates N subprocesses or pods -- one for each dbt model. While this provides excellent Airflow-native observability (each model is its own task), it introduces significant overhead: subprocess/pod startup time, repeated dbt parsing, and redundant dependency installation.

Watcher mode takes a fundamentally different approach:

  • The Producer -- A single DbtProducerWatcherOperator runs dbt build covering all selected nodes. This lets dbt's internal scheduler handle parallelism and dependency resolution. The producer pushes per-node completion status to XCom, either in real time or in one batch after the build, depending on the invocation mode.
  • The Consumers -- N lightweight DbtConsumerWatcherSensor instances (one per model) monitor their individual model's completion status by polling XCom. Each consumer resolves to success or failure based on the XCom data from the producer.

This architecture can achieve up to a 5x speedup over per-model execution by:

  • Eliminating repeated dbt project parsing (single parse for all nodes)
  • Letting dbt manage thread-level parallelism internally
  • Avoiding subprocess/pod startup overhead for each node
  • Reducing the total number of dbt invocations from N to 1

When deferrable=True (the default), consumers defer to the Airflow triggerer process for async polling. Consumer tasks therefore do not occupy worker slots while waiting for the producer to finish their respective models; only the triggerer process polls XCom in the background.

Usage

Use watcher execution mode when:

  • Performance is a priority and per-model subprocess overhead is unacceptable
  • Per-model visibility is still required in the Airflow UI (unlike a single monolithic dbt build task)
  • Deferrable operators are available (Airflow 2.6+ with triggerer running)
  • dbt's internal scheduler should manage parallelism rather than Airflow's task scheduler

from cosmos import ExecutionConfig, ExecutionMode

# Select watcher mode: one producer task runs dbt build, per-model sensors watch it.
execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.WATCHER,
)

Theoretical Basis

The watcher pattern applies the producer-consumer architectural pattern to dbt build monitoring:

Producer Side

The producer (DbtProducerWatcherOperator) executes a single dbt build and pushes per-node status to XCom. The mechanism differs by invocation mode:

  • DBT_RUNNER mode -- Uses dbt's programmatic API with a callback registered for NodeFinished events. Each time a model or test completes, the callback immediately pushes that node's status to XCom, giving low-latency status updates.
  • SUBPROCESS mode -- After the full dbt build completes, the producer parses run_results.json and pushes all node statuses to XCom in one batch, so consumers see their status only once the whole build has finished.
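
The difference between the two modes can be sketched in plain Python. This is an illustration only, not the Cosmos implementation: `on_node_finished`, `push_from_run_results`, and the `push` callable (a stand-in for an XCom push) are hypothetical names, and the sample document is a heavily trimmed run_results.json that keeps only the `unique_id` and `status` fields of each result.

```python
import json
from typing import Callable

# Hypothetical stand-in for an XCom push: takes (unique_id, status).
Push = Callable[[str, str], None]


def on_node_finished(push: Push, unique_id: str, status: str) -> None:
    """Streaming style: invoked once per finished node, pushes immediately."""
    push(unique_id, status)


def push_from_run_results(push: Push, run_results_text: str) -> int:
    """Batch style: parse run_results.json after the build, push every node."""
    results = json.loads(run_results_text).get("results", [])
    for result in results:
        push(result["unique_id"], result["status"])
    return len(results)


# Streaming: a status is visible as soon as that node finishes.
streamed = {}
on_node_finished(streamed.__setitem__, "model.shop.orders", "success")

# Batch: nothing is visible until the whole document is parsed.
sample = json.dumps({
    "results": [
        {"unique_id": "model.shop.orders", "status": "success"},
        {"unique_id": "model.shop.payments", "status": "error"},
    ]
})
batched = {}
pushed = push_from_run_results(batched.__setitem__, sample)
```

In the streaming variant a consumer for an early node can resolve while later nodes are still running; in the batch variant every consumer resolves at roughly the same time.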

XCom payloads are JSON that is zlib-compressed and then base64-encoded. Compression keeps storage overhead low for large dbt projects (potentially hundreds of nodes), and base64 encoding turns the compressed bytes back into plain text.
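
A minimal sketch of such an encoding, using only the Python standard library. The helper names `encode_payload` and `decode_payload` and the payload shape are assumptions made for illustration; the real payload fields may differ.

```python
import base64
import json
import zlib


def encode_payload(payload: dict) -> str:
    """Serialize to JSON, compress with zlib, then base64-encode to text."""
    raw = json.dumps(payload).encode("utf-8")
    return base64.b64encode(zlib.compress(raw)).decode("ascii")


def decode_payload(encoded: str) -> dict:
    """Reverse of encode_payload: base64-decode, decompress, parse JSON."""
    raw = zlib.decompress(base64.b64decode(encoded))
    return json.loads(raw.decode("utf-8"))


# Hypothetical payload covering 200 nodes of a larger project.
payload = {
    "nodes": [
        {"unique_id": f"model.shop.m{i}", "status": "success"}
        for i in range(200)
    ]
}
encoded = encode_payload(payload)
plain_json = json.dumps(payload)
```

Because the node entries are highly repetitive, the encoded string is shorter than the plain JSON even after the size increase that base64 adds.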

Consumer Side

Each consumer (DbtConsumerWatcherSensor) polls XCom for an entry keyed by its model's unique_id:

  1. The sensor checks XCom for a key matching nodefinished_<unique_id> or <unique_id>_status
  2. If found, it decodes the compressed payload and checks the status (success/fail/warn)
  3. If not found, it either re-pokes after poke_interval seconds or defers to the WatcherTrigger
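
The three steps above can be sketched with a plain dictionary standing in for XCom. This is a simplified illustration, not the sensor's actual code: the `poke` helper, the `{"status": ...}` payload shape, and the choice to treat "warn" as a non-failure are all assumptions.

```python
import base64
import json
import zlib


def encode(payload: dict) -> str:
    """Compress and base64-encode a JSON payload (producer side)."""
    raw = json.dumps(payload).encode("utf-8")
    return base64.b64encode(zlib.compress(raw)).decode("ascii")


def decode(encoded: str) -> dict:
    """Decode a payload written by encode (consumer side)."""
    return json.loads(zlib.decompress(base64.b64decode(encoded)).decode("utf-8"))


def poke(xcom: dict, unique_id: str) -> bool:
    """Return True when the node finished, False to poke again, raise on failure."""
    for key in (f"nodefinished_{unique_id}", f"{unique_id}_status"):
        if key not in xcom:
            continue
        status = decode(xcom[key])["status"]  # payload shape is an assumption
        if status == "fail":
            raise RuntimeError(f"{unique_id} failed in the producer's dbt build")
        return True  # assumption: "success" and "warn" both resolve the sensor
    return False  # nothing published yet: re-poke later or defer


# Dictionary standing in for XCom, with one entry under each key style.
xcom = {
    "nodefinished_model.shop.orders": encode({"status": "success"}),
    "model.shop.payments_status": encode({"status": "fail"}),
}
```

Calling `poke(xcom, "model.shop.customers")` returns False here because the producer has not published anything for that node yet.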

Deferral Mechanism

When deferrable=True, the consumer sensor defers execution to the WatcherTrigger (an Airflow BaseTrigger):

  1. The consumer calls self.defer(trigger=WatcherTrigger(...))
  2. The WatcherTrigger runs an async polling loop in the Airflow triggerer process
  3. It uses sync_to_async to bridge synchronous XCom queries into the async context
  4. When the model's status appears in XCom, or the producer fails, the trigger yields a TriggerEvent
  5. The consumer resumes and resolves to success or failure
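
The polling loop in steps 2 to 4 can be sketched with asyncio alone. This is not the WatcherTrigger source: `watch_node`, `read_status`, `first_event`, and `demo` are hypothetical names, dictionaries stand in for XCom and for the producer's task state, plain dicts stand in for TriggerEvent, and `asyncio.to_thread` (Python 3.9+) is used here in place of `sync_to_async` to run the blocking lookup off the event loop.

```python
import asyncio
from typing import AsyncIterator, Dict, Optional


def read_status(xcom: Dict[str, str], unique_id: str) -> Optional[str]:
    """Synchronous lookup, standing in for a blocking XCom query."""
    return xcom.get(f"{unique_id}_status")


async def watch_node(
    xcom: Dict[str, str],
    producer: Dict[str, str],
    unique_id: str,
    poll_interval: float = 0.01,
) -> AsyncIterator[dict]:
    """Poll until the node's status appears or the producer has failed."""
    while True:
        # Run the blocking lookup in a worker thread so the loop stays free.
        status = await asyncio.to_thread(read_status, xcom, unique_id)
        if status is not None:
            yield {"status": status}
            return
        if producer.get("state") == "failed":
            yield {"status": "producer_failed"}
            return
        await asyncio.sleep(poll_interval)


async def first_event(events: AsyncIterator[dict]) -> Optional[dict]:
    """Return the first event produced by the watcher."""
    async for event in events:
        return event
    return None


async def demo(producer_state: str = "running") -> Optional[dict]:
    xcom: Dict[str, str] = {}
    producer = {"state": producer_state}

    async def finish_node_later() -> None:
        # Simulates the producer publishing a status a little later.
        await asyncio.sleep(0.03)
        xcom["model.shop.orders_status"] = "success"

    writer = asyncio.create_task(finish_node_later())
    event = await first_event(watch_node(xcom, producer, "model.shop.orders"))
    await writer
    return event
```

Running `asyncio.run(demo())` waits for the simulated producer to publish the status, while `asyncio.run(demo("failed"))` returns as soon as the first poll sees the failed producer. Because the loop sleeps between polls, many such watchers can share a single event loop.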

Retry Fallback

On retries, the consumer falls back to running the failed model locally via its inherited DbtRunLocalOperator capability. This means that if a model fails during the centralized dbt build, retrying the consumer task will run that specific model individually rather than re-triggering the entire build.
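
The retry decision can be sketched as a small branch on the attempt number. Everything here is illustrative: `consumer_action` and `fallback_command` are hypothetical helpers, the assumption is that the first attempt has try number 1, and the command shown is the generic dbt CLI form for running a single model rather than the exact arguments Cosmos builds.

```python
from typing import List


def consumer_action(try_number: int) -> str:
    """First attempt watches the producer; later attempts run the model directly."""
    return "watch_producer" if try_number <= 1 else "run_model_locally"


def fallback_command(model_name: str) -> List[str]:
    """dbt CLI invocation that runs only the named model."""
    return ["dbt", "run", "--select", model_name]


first_attempt = consumer_action(1)
retry_attempt = consumer_action(2)
retry_command = fallback_command("orders")
```

Scoping the retry to one model keeps the cost of a single failure proportional to that model instead of to the whole project.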
