
Implementation:Astronomer Astronomer cosmos Watcher Kubernetes Operators

From Leeroopedia


Knowledge Sources
Domains Kubernetes, Watcher_Execution
Last Updated 2026-02-07 17:00 GMT

Overview

The Watcher_Kubernetes_Operators module implements the Kubernetes watcher execution mode, using a producer-consumer sensor pattern where a single producer task runs dbt build in a Kubernetes pod and consumer sensors watch for individual resource completion.

Description

This module defines the operators and supporting classes used by ExecutionMode.WATCHER on Kubernetes:

WatcherKubernetesCallback extends KubernetesPodOperatorCallback and implements a progress_callback static method. This callback is invoked for each line of pod container log output. It delegates to store_dbt_resource_status_from_log to parse JSON-formatted dbt log lines and record individual resource completion statuses. A module-level global variable producer_task_context is used to make the Airflow task context available to the callback, since the callback is registered at operator initialization time but the context only becomes available during execution.
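The callback flow described above can be sketched without Airflow or Kubernetes. This is a minimal illustration under stated assumptions, not the Cosmos implementation: `record_status_from_log_line`, `set_producer_context`, `resource_statuses`, and `WatcherCallbackSketch` are hypothetical names, the context is a plain dict, and the JSON keys read here (`data.node_info.unique_id` and `data.node_info.node_status`) are an assumed log layout standing in for whatever store_dbt_resource_status_from_log actually parses.

```python
import json
from typing import Any, Optional

# Module-level global mirroring the role of producer_task_context.
# Here it is a plain dict rather than an Airflow Context (assumption).
producer_task_context: Optional[dict] = None

# Hypothetical in-memory store keyed by (run_id, unique_id).
resource_statuses: dict = {}


def set_producer_context(context: Optional[dict]) -> None:
    """Called by the producer at execution time, once the context exists."""
    global producer_task_context
    producer_task_context = context


def record_status_from_log_line(line: str, context: Optional[dict]) -> None:
    """Record a node status if the line is a JSON event that carries one."""
    if context is None:
        return  # Callback fired before the producer published its context.
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        return  # Plain-text lines are ignored.
    data = event.get("data") if isinstance(event, dict) else None
    node_info = data.get("node_info") if isinstance(data, dict) else None
    if not isinstance(node_info, dict):
        return
    unique_id = node_info.get("unique_id")
    status = node_info.get("node_status")
    if unique_id and status:
        resource_statuses[(context["run_id"], unique_id)] = status


class WatcherCallbackSketch:
    @staticmethod
    def progress_callback(*, line: str, **kwargs: Any) -> None:
        # A static method has no operator instance, so it reads the global.
        record_status_from_log_line(line, producer_task_context)
```

The global exists only because the callback class is handed to the operator before any task context is available; the producer fills it in at execution time, and every subsequent log line is parsed against it.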

DbtProducerWatcherKubernetesOperator extends DbtBuildKubernetesOperator and serves as the single producer task that executes dbt build inside a Kubernetes pod. Key behaviors:

  • Disables Airflow retries (sets retries=0) to prevent duplicate dbt builds.
  • Appends the WatcherKubernetesCallback to the operator's callback list.
  • Adds --log-format json to the dbt command flags to enable structured log parsing.
  • Uses a CosmosKubernetesPodManager (via @cached_property) instead of the default pod manager.
  • Guards against retry attempts by checking try_number > 1 in the execute method and skipping execution if detected.
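The producer behaviors listed above can be sketched with a stand-in class. This is an illustration only: `ProducerSketch`, `_run_dbt_build`, and `build_runs` are hypothetical, the class does not inherit from any Airflow operator, and the context is a plain dict whose `"ti"` entry exposes `try_number`.

```python
from types import SimpleNamespace
from typing import Any, Optional


class ProducerSketch:
    """Stand-in for the producer operator; not a real Airflow operator."""

    def __init__(self, dbt_cmd_flags: Optional[list] = None, **kwargs: Any) -> None:
        # Force retries off regardless of what the caller passed.
        kwargs["retries"] = 0
        self.retries = kwargs["retries"]
        # Append the structured-logging flag so each log line is a JSON object.
        self.dbt_cmd_flags = list(dbt_cmd_flags or []) + ["--log-format", "json"]
        self.build_runs = 0

    def _run_dbt_build(self) -> str:
        # Placeholder for launching the pod and running dbt build.
        self.build_runs += 1
        return "build-finished"

    def execute(self, context: dict) -> Optional[str]:
        # Second line of defense: even if a retry is scheduled, do not rebuild.
        if context["ti"].try_number > 1:
            return None
        return self._run_dbt_build()


op = ProducerSketch(dbt_cmd_flags=["--full-refresh"], retries=3)
first = op.execute({"ti": SimpleNamespace(try_number=1)})
second = op.execute({"ti": SimpleNamespace(try_number=2)})
```

Setting retries to 0 and checking `try_number` overlap on purpose: the first stops the scheduler from requesting another attempt, and the second covers attempts that are triggered anyway, such as a manually cleared task.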

DbtConsumerWatcherKubernetesSensor combines BaseConsumerSensor with DbtRunKubernetesOperator and serves as the base class for all consumer sensors. It implements use_event returning False.

DbtBuildWatcherKubernetesOperator raises NotImplementedError because the build command is executed by the producer task, and exposing a separate build operator is not supported in watcher mode.

The consumer operators that watch for specific dbt resource types:

  • DbtSeedWatcherKubernetesOperator -- watches for dbt seed execution progress (extends DbtSeedMixin and DbtConsumerWatcherKubernetesSensor)
  • DbtSnapshotWatcherKubernetesOperator -- watches for dbt snapshot execution progress (extends DbtSnapshotMixin and DbtConsumerWatcherKubernetesSensor)
  • DbtSourceWatcherKubernetesOperator -- executes dbt source freshness synchronously (extends DbtSourceKubernetesOperator directly)
  • DbtRunWatcherKubernetesOperator -- watches for dbt model execution progress (extends DbtConsumerWatcherKubernetesSensor with DbtRunMixin template fields)
  • DbtTestWatcherKubernetesOperator -- currently a no-op extending EmptyOperator; implementation is planned for a future release (tracked in GitHub issue #1974)

Usage

Use these operators when configuring Cosmos with ExecutionMode.WATCHER for Kubernetes deployments. The producer operator runs the full dbt build command once, while individual consumer sensors monitor the log output to detect when their specific dbt resource has completed. This is typically configured automatically by the Cosmos DAG/task group converter.
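The division of labor between producer and consumers can be sketched as a sensor that polls a shared status store. This is a simplified model under assumptions: `status_store` and `ConsumerSketch` are hypothetical, the store is an in-memory dict rather than whatever Airflow mechanism the module uses, and raising on any non-success status is an illustrative choice rather than documented behavior.

```python
from typing import Optional

# Hypothetical shared store written by the producer as it parses log lines.
status_store: dict = {}


class ConsumerSketch:
    """Stand-in for a consumer sensor watching one dbt resource."""

    def __init__(self, unique_id: str) -> None:
        self.unique_id = unique_id

    def poke(self) -> bool:
        status: Optional[str] = status_store.get(self.unique_id)
        if status is None:
            return False  # Producer has not reported this resource yet.
        if status == "success":
            return True  # Resource finished; the sensor can succeed.
        raise RuntimeError(f"{self.unique_id} finished with status {status!r}")


sensor = ConsumerSketch("model.pkg.orders")
before = sensor.poke()                       # nothing reported yet
status_store["model.pkg.orders"] = "success"  # producer reports completion
after = sensor.poke()
```

Each consumer only reads the entry for its own resource, so the single dbt build run fans out into per-resource task states without any consumer starting its own pod.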

Code Reference

Source Location

Signature

class WatcherKubernetesCallback(KubernetesPodOperatorCallback):
    @staticmethod
    def progress_callback(
        *, line: str, client: client_type, mode: str,
        container_name: str, timestamp: DateTime | None,
        pod: k8s.V1Pod, **kwargs: Any,
    ) -> None:
        ...

class DbtProducerWatcherKubernetesOperator(DbtBuildKubernetesOperator):
    def __init__(self, *args: Any, **kwargs: Any) -> None: ...
    def execute(self, context: Context, **kwargs: Any) -> Any: ...

class DbtConsumerWatcherKubernetesSensor(BaseConsumerSensor, DbtRunKubernetesOperator):
    def use_event(self) -> bool: ...

class DbtBuildWatcherKubernetesOperator:
    def __init__(self, *args: Any, **kwargs: Any): ...  # raises NotImplementedError

class DbtSeedWatcherKubernetesOperator(DbtSeedMixin, DbtConsumerWatcherKubernetesSensor): ...
class DbtSnapshotWatcherKubernetesOperator(DbtSnapshotMixin, DbtConsumerWatcherKubernetesSensor): ...
class DbtSourceWatcherKubernetesOperator(DbtSourceKubernetesOperator): ...
class DbtRunWatcherKubernetesOperator(DbtConsumerWatcherKubernetesSensor): ...
class DbtTestWatcherKubernetesOperator(EmptyOperator): ...

Import

from cosmos.operators.watcher_kubernetes import DbtProducerWatcherKubernetesOperator
from cosmos.operators.watcher_kubernetes import DbtConsumerWatcherKubernetesSensor
from cosmos.operators.watcher_kubernetes import DbtRunWatcherKubernetesOperator
from cosmos.operators.watcher_kubernetes import DbtSeedWatcherKubernetesOperator
from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback

I/O Contract

Inputs

DbtProducerWatcherKubernetesOperator.__init__

Name | Type | Required | Description
*args | Any | No | Positional arguments forwarded to DbtBuildKubernetesOperator
task_id | str | No | Task identifier (default: "dbt_producer_watcher_operator")
callbacks | list or None | No | List of Kubernetes pod callbacks; WatcherKubernetesCallback is automatically appended
**kwargs | Any | No | Additional keyword arguments forwarded to DbtBuildKubernetesOperator

WatcherKubernetesCallback.progress_callback

Name | Type | Required | Description
line | str | Yes | A single line of log output from the Kubernetes pod container
client | client_type | Yes | The Kubernetes client instance
mode | str | Yes | Current execution mode ("sync" or "async")
container_name | str | Yes | Name of the container producing the log line
timestamp | DateTime or None | Yes | Timestamp of the log line; may be None
pod | k8s.V1Pod | Yes | The Kubernetes pod object

Outputs

Name | Type | Description
DbtProducerWatcherKubernetesOperator.execute | Any | Returns the result from DbtBuildKubernetesOperator.execute, or None if a retry is detected
DbtConsumerWatcherKubernetesSensor.use_event | bool | Always returns False
DbtBuildWatcherKubernetesOperator.__init__ | -- | Always raises NotImplementedError

Usage Examples

from cosmos.operators.watcher_kubernetes import (
    DbtProducerWatcherKubernetesOperator,
    DbtRunWatcherKubernetesOperator,
    DbtSeedWatcherKubernetesOperator,
)

# my_profile_config is assumed to be a Cosmos ProfileConfig defined elsewhere.

# Producer task: runs the full dbt build in a Kubernetes pod
producer = DbtProducerWatcherKubernetesOperator(
    task_id="dbt_build_producer",
    project_dir="/path/to/dbt/project",
    profile_config=my_profile_config,
    image="my-dbt-image:latest",
    namespace="dbt",
)

# Consumer sensors: watch for individual resource completion
run_sensor = DbtRunWatcherKubernetesOperator(
    task_id="run_my_model",
    project_dir="/path/to/dbt/project",
    profile_config=my_profile_config,
)

seed_sensor = DbtSeedWatcherKubernetesOperator(
    task_id="seed_my_data",
    project_dir="/path/to/dbt/project",
    profile_config=my_profile_config,
)
