Implementation:Astronomer Astronomer cosmos Watcher Kubernetes Operators
| Knowledge Sources | |
|---|---|
| Domains | Kubernetes, Watcher_Execution |
| Last Updated | 2026-02-07 17:00 GMT |
Overview
The Watcher_Kubernetes_Operators module implements the Kubernetes watcher execution mode, using a producer-consumer sensor pattern where a single producer task runs dbt build in a Kubernetes pod and consumer sensors watch for individual resource completion.
Description
This module defines the full set of operators and supporting classes for ExecutionMode.WATCHER on Kubernetes:
WatcherKubernetesCallback extends KubernetesPodOperatorCallback and implements a progress_callback static method. This callback is invoked for each line of pod container log output. It delegates to store_dbt_resource_status_from_log to parse JSON-formatted dbt log lines and record individual resource completion statuses. A module-level global variable producer_task_context is used to make the Airflow task context available to the callback, since the callback is registered at operator initialization time but the context only becomes available during execution.
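The interplay between a static callback and a module-level context can be sketched as follows. This is a minimal, self-contained illustration and not the Cosmos implementation: `SketchCallback`, `store_status_from_log_line`, and `resource_statuses` are hypothetical names, the in-memory dictionary stands in for however the real module records statuses, and the `data.node_info` layout is an assumption about dbt's JSON log events.

```python
import json
from typing import Any, Dict, Optional

# Hypothetical stand-in for the module-level global described above.
producer_task_context: Optional[Dict[str, Any]] = None

# Hypothetical in-memory store used only for this sketch.
resource_statuses: Dict[str, str] = {}


def store_status_from_log_line(line: str, context: Optional[Dict[str, Any]]) -> None:
    """Record a node status if the line is a JSON event that carries one."""
    if context is None:
        return  # no task context yet, so there is nowhere to record the status
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        return  # plain-text lines are ignored
    if not isinstance(event, dict):
        return
    data = event.get("data")
    node_info = data.get("node_info") if isinstance(data, dict) else None
    if not isinstance(node_info, dict):
        return
    unique_id = node_info.get("unique_id")
    status = node_info.get("node_status")
    if unique_id and status:
        resource_statuses[unique_id] = status


class SketchCallback:
    @staticmethod
    def progress_callback(*, line: str, **kwargs: Any) -> None:
        # A static method has no operator instance, so it reads the global.
        store_status_from_log_line(line, producer_task_context)


# The operator would assign the global at the start of execute(),
# before any log lines reach the callback.
producer_task_context = {"run_id": "example_run"}

SketchCallback.progress_callback(line="Running with dbt")
SketchCallback.progress_callback(
    line=json.dumps(
        {"data": {"node_info": {"unique_id": "model.pkg.orders", "node_status": "success"}}}
    )
)
print(resource_statuses)
```

The non-JSON line is skipped silently, which matters because pod logs usually interleave dbt events with other output.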
DbtProducerWatcherKubernetesOperator extends DbtBuildKubernetesOperator and serves as the single producer task that executes dbt build inside a Kubernetes pod. Key behaviors:
- Disables Airflow retries (sets retries=0) to prevent duplicate dbt builds.
- Appends the WatcherKubernetesCallback to the operator's callback list.
- Adds --log-format json to the dbt command flags to enable structured log parsing.
- Uses a CosmosKubernetesPodManager (via @cached_property) instead of the default pod manager.
- Guards against retry attempts by checking try_number > 1 in the execute method and skipping execution if detected.
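The producer behaviors listed above can be illustrated with a plain-Python sketch. It does not depend on Airflow or Cosmos; `SketchProducer` is a hypothetical class, the context is simplified to a dictionary with a `try_number` key, and the callback is represented by its name as a string. Treat it as an approximation of the pattern rather than the actual operator code.

```python
from typing import Any, Dict, List, Optional


class SketchProducer:
    """Hypothetical stand-in for the producer operator (not the Cosmos class)."""

    def __init__(
        self,
        dbt_cmd_flags: Optional[List[str]] = None,
        callbacks: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        # Override any caller-supplied value so a failed build is never re-run.
        kwargs["retries"] = 0
        self.retries = kwargs["retries"]

        # Keep the caller's callbacks and append the watcher callback.
        self.callbacks = list(callbacks or []) + ["WatcherKubernetesCallback"]

        # Ask dbt for JSON logs so each line can be parsed by the callback.
        flags = list(dbt_cmd_flags or [])
        if "--log-format" not in flags:
            flags += ["--log-format", "json"]
        self.dbt_cmd_flags = flags

        self.build_count = 0

    def execute(self, context: Dict[str, Any]) -> Optional[str]:
        # Second line of defense: skip the build if this is a retry attempt.
        if context.get("try_number", 1) > 1:
            return None
        self.build_count += 1
        return "built"


producer = SketchProducer(retries=3, dbt_cmd_flags=["--full-refresh"])
print(producer.retries, producer.dbt_cmd_flags)
print(producer.execute({"try_number": 1}), producer.execute({"try_number": 2}))
```

Setting `retries=0` and checking `try_number` are deliberately redundant: the first stops the scheduler from retrying, the second covers a task that is re-run anyway.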
DbtConsumerWatcherKubernetesSensor combines BaseConsumerSensor with DbtRunKubernetesOperator and serves as the base class for all consumer sensors. It implements use_event returning False.
DbtBuildWatcherKubernetesOperator raises NotImplementedError because the build command is executed by the producer task, and exposing a separate build operator is not supported in watcher mode.
The consumer operators that watch for specific dbt resource types:
- DbtSeedWatcherKubernetesOperator -- watches for dbt seed execution progress (extends DbtSeedMixin and DbtConsumerWatcherKubernetesSensor)
- DbtSnapshotWatcherKubernetesOperator -- watches for dbt snapshot execution progress (extends DbtSnapshotMixin and DbtConsumerWatcherKubernetesSensor)
- DbtSourceWatcherKubernetesOperator -- executes dbt source freshness synchronously (extends DbtSourceKubernetesOperator directly)
- DbtRunWatcherKubernetesOperator -- watches for dbt model execution progress (extends DbtConsumerWatcherKubernetesSensor with DbtRunMixin template fields)
- DbtTestWatcherKubernetesOperator -- currently a no-op extending EmptyOperator; implementation is planned for a future release (tracked in GitHub issue #1974)
Usage
Use these operators when configuring Cosmos with ExecutionMode.WATCHER for Kubernetes deployments. The producer operator runs the full dbt build command once, while individual consumer sensors monitor the log output to detect when their specific dbt resource has completed. This is typically configured automatically by the Cosmos DAG/task group converter.
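The consumer side of the pattern can be approximated with a small polling function. This is a sketch under stated assumptions, not the logic of BaseConsumerSensor: `poke` here is a free function, `statuses` is a hypothetical mapping filled in by the producer's callback, and the status strings are assumed dbt node statuses.

```python
from typing import Mapping

# Assumed status values; the real sensor may recognize a different set.
STATUS_OK = {"success", "pass"}
STATUS_FAILED = {"error", "fail"}


def poke(unique_id: str, statuses: Mapping[str, str]) -> bool:
    """Return True once the watched resource has finished successfully."""
    status = statuses.get(unique_id)
    if status in STATUS_FAILED:
        # Fail the consumer task so the DAG reflects the dbt failure.
        raise RuntimeError(f"{unique_id} finished with status {status!r}")
    # None (not reported yet) or a non-terminal status keeps the sensor waiting.
    return status in STATUS_OK


shared = {}
print(poke("model.pkg.orders", shared))   # nothing reported yet
shared["model.pkg.orders"] = "success"
print(poke("model.pkg.orders", shared))   # producer reported completion
```

Each consumer only ever looks up its own resource identifier, which is what lets a single producer run drive many independent downstream tasks.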
Code Reference
Source Location
- Repository: Astronomer_Astronomer_cosmos
- File: cosmos/operators/watcher_kubernetes.py
Signature
class WatcherKubernetesCallback(KubernetesPodOperatorCallback):
    @staticmethod
    def progress_callback(
        *, line: str, client: client_type, mode: str,
        container_name: str, timestamp: DateTime | None,
        pod: k8s.V1Pod, **kwargs: Any,
    ) -> None:
        ...

class DbtProducerWatcherKubernetesOperator(DbtBuildKubernetesOperator):
    def __init__(self, *args: Any, **kwargs: Any) -> None: ...
    def execute(self, context: Context, **kwargs: Any) -> Any: ...

class DbtConsumerWatcherKubernetesSensor(BaseConsumerSensor, DbtRunKubernetesOperator):
    def use_event(self) -> bool: ...

class DbtBuildWatcherKubernetesOperator:
    def __init__(self, *args: Any, **kwargs: Any): ...  # raises NotImplementedError

class DbtSeedWatcherKubernetesOperator(DbtSeedMixin, DbtConsumerWatcherKubernetesSensor): ...
class DbtSnapshotWatcherKubernetesOperator(DbtSnapshotMixin, DbtConsumerWatcherKubernetesSensor): ...
class DbtSourceWatcherKubernetesOperator(DbtSourceKubernetesOperator): ...
class DbtRunWatcherKubernetesOperator(DbtConsumerWatcherKubernetesSensor): ...
class DbtTestWatcherKubernetesOperator(EmptyOperator): ...
Import
from cosmos.operators.watcher_kubernetes import DbtProducerWatcherKubernetesOperator
from cosmos.operators.watcher_kubernetes import DbtConsumerWatcherKubernetesSensor
from cosmos.operators.watcher_kubernetes import DbtRunWatcherKubernetesOperator
from cosmos.operators.watcher_kubernetes import DbtSeedWatcherKubernetesOperator
from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback
I/O Contract
Inputs
DbtProducerWatcherKubernetesOperator.__init__
| Name | Type | Required | Description |
|---|---|---|---|
| *args | Any | No | Positional arguments forwarded to DbtBuildKubernetesOperator |
| task_id | str | No | Task identifier (default: "dbt_producer_watcher_operator") |
| callbacks | list or None | No | List of Kubernetes pod callbacks; WatcherKubernetesCallback is automatically appended |
| **kwargs | Any | No | Additional keyword arguments forwarded to DbtBuildKubernetesOperator |
WatcherKubernetesCallback.progress_callback
| Name | Type | Required | Description |
|---|---|---|---|
| line | str | Yes | A single line of log output from the Kubernetes pod container |
| client | client_type | Yes | The Kubernetes client instance |
| mode | str | Yes | Current execution mode ("sync" or "async") |
| container_name | str | Yes | Name of the container producing the log line |
| timestamp | DateTime or None | Yes | Timestamp of the log line |
| pod | k8s.V1Pod | Yes | The Kubernetes pod object |
Outputs
| Name | Type | Description |
|---|---|---|
| DbtProducerWatcherKubernetesOperator.execute | Any or None | Returns the result from DbtBuildKubernetesOperator.execute, or None if a retry is detected |
| DbtConsumerWatcherKubernetesSensor.use_event | bool | Always returns False |
| DbtBuildWatcherKubernetesOperator.__init__ | -- | Always raises NotImplementedError |
Usage Examples
from cosmos.operators.watcher_kubernetes import (
    DbtProducerWatcherKubernetesOperator,
    DbtRunWatcherKubernetesOperator,
    DbtSeedWatcherKubernetesOperator,
)

# my_profile_config is assumed to be a cosmos ProfileConfig defined elsewhere.

# Producer task: runs the full dbt build in a Kubernetes pod
producer = DbtProducerWatcherKubernetesOperator(
    task_id="dbt_build_producer",
    project_dir="/path/to/dbt/project",
    profile_config=my_profile_config,
    image="my-dbt-image:latest",
    namespace="dbt",
)

# Consumer sensors: watch for individual resource completion
run_sensor = DbtRunWatcherKubernetesOperator(
    task_id="run_my_model",
    project_dir="/path/to/dbt/project",
    profile_config=my_profile_config,
)
seed_sensor = DbtSeedWatcherKubernetesOperator(
    task_id="seed_my_data",
    project_dir="/path/to/dbt/project",
    profile_config=my_profile_config,
)