Implementation:Astronomer Astronomer cosmos DbtKubernetesOperator Execute
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Execution, Kubernetes, Containerization |
| Last Updated | 2026-02-07 00:00 GMT |
Overview
Concrete tool for executing dbt commands in Kubernetes pods provided by the astronomer-cosmos library.
Description
The Kubernetes operator family in Cosmos extends both AbstractDbtBase (for dbt command construction) and Airflow's KubernetesPodOperator (for pod lifecycle management). The base class DbtKubernetesBaseOperator handles converting environment variables to Kubernetes V1EnvVar objects, building the dbt command, and managing the pod spec. Concrete operators (DbtRunKubernetesOperator, DbtTestKubernetesOperator, DbtSeedKubernetesOperator, etc.) combine the base with dbt command mixins. A specialized DbtTestWarningHandler callback scrapes pod logs for dbt test warnings via regex patterns.
Usage
Use these operators when running dbt in Kubernetes execution mode. They are typically auto-generated by DbtDag or DbtTaskGroup when ExecutionMode.KUBERNETES is specified, but can also be used directly for standalone tasks like seed loading.
Code Reference
Source Location
- Repository: astronomer-cosmos
- File: cosmos/operators/kubernetes.py
- Lines: L57-181 (DbtKubernetesBaseOperator), L374-380 (DbtTestKubernetesOperator), L392-400 (DbtRunKubernetesOperator)
Signature
class DbtKubernetesBaseOperator(AbstractDbtBase, KubernetesPodOperator):
"""
Executes a dbt core command in a Kubernetes pod.
"""
template_fields: Sequence[str] = (
KubernetesPodOperator.template_fields + AbstractDbtBase.template_fields
)
def __init__(self, profile_config: ProfileConfig | None = None, **kwargs: Any) -> None:
self.profile_config = profile_config
super().__init__(**kwargs)
def build_env_args(self, env: dict[str, str]) -> list[V1EnvVar]:
# Converts dict env vars to Kubernetes V1EnvVar objects
...
class DbtSeedKubernetesOperator(DbtSeedMixin, DbtKubernetesBaseOperator):
"""
Executes a dbt core seed command in a Kubernetes pod.
"""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.base_cmd = ["seed"]
class DbtRunKubernetesOperator(DbtRunMixin, DbtKubernetesBaseOperator):
"""
Executes a dbt core run command in a Kubernetes pod.
"""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.base_cmd = ["run"]
class DbtTestKubernetesOperator(DbtTestMixin, DbtWarningKubernetesOperator):
"""
Executes a dbt core test command in a Kubernetes pod.
"""
def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None:
self.on_warning_callback = on_warning_callback
super().__init__(**kwargs)
self.base_cmd = ["test"]
Key kwargs:
| Parameter | Type | Description |
|---|---|---|
| project_dir | str / Path | Path to the dbt project inside the container |
| profile_config | ProfileConfig / None | Optional profile config (may be baked into the image) |
| image | str | Docker image containing dbt and the project (e.g., dbt-jaffle-shop:1.0.0)
|
| secrets | list[Secret] | Kubernetes Secret objects for credential injection |
| get_logs | bool | Whether to stream pod logs to the Airflow task log |
| is_delete_operator_pod | bool | Whether to delete the pod after execution |
| namespace | str | Kubernetes namespace for the pod |
| env_vars | dict / list[V1EnvVar] | Environment variables for the pod (auto-converted) |
| select | str | dbt node selector |
| exclude | str | dbt node exclusion selector |
Import
from cosmos.operators.kubernetes import (
DbtSeedKubernetesOperator,
DbtRunKubernetesOperator,
DbtTestKubernetesOperator,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| image | str | Yes | Docker image with dbt installed and project files baked in |
| project_dir | str | Yes | Path to the dbt project inside the container |
| secrets | list[Secret] | No | Kubernetes Secrets for database credentials |
| select | str | No | dbt node selector (set during DAG rendering) |
| namespace | str | No | Target Kubernetes namespace (defaults to cluster default) |
Outputs
| Name | Type | Description |
|---|---|---|
| Pod execution | Kubernetes Pod | dbt command executes inside an ephemeral pod |
| Log stream | Airflow task log | Pod stdout/stderr streamed when get_logs=True
|
| Warning callback | Callable invocation | DbtTestKubernetesOperator invokes on_warning_callback if test warnings detected via log scraping
|
Usage Examples
DbtSeedKubernetesOperator with Secrets
from cosmos.operators.kubernetes import DbtSeedKubernetesOperator
from kubernetes.client import models as k8s
# Define Kubernetes secrets for database credentials
postgres_password_secret = k8s.V1EnvVar(
name="POSTGRES_PASSWORD",
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name="postgres-secrets",
key="password",
)
),
)
seed_task = DbtSeedKubernetesOperator(
task_id="seed_jaffle_shop",
image="dbt-jaffle-shop:1.0.0",
project_dir="/usr/app/dbt/jaffle_shop",
get_logs=True,
is_delete_operator_pod=True,
env_vars=[postgres_password_secret],
namespace="data-pipelines",
)
Full Kubernetes DAG with DbtTaskGroup
from airflow.decorators import dag
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.constants import ExecutionMode
from kubernetes.client import models as k8s
import pendulum
@dag(
start_date=pendulum.datetime(2024, 1, 1),
schedule="@daily",
catchup=False,
)
def jaffle_shop_k8s():
# Seed task runs separately
load_seeds = DbtSeedKubernetesOperator(
task_id="load_seeds",
image="dbt-jaffle-shop:1.0.0",
project_dir="/usr/app/dbt/jaffle_shop",
get_logs=True,
is_delete_operator_pod=True,
)
# DbtTaskGroup auto-generates DbtRunKubernetesOperator
# and DbtTestKubernetesOperator for each model/test
transform = DbtTaskGroup(
project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.KUBERNETES,
),
operator_args={
"image": "dbt-jaffle-shop:1.0.0",
"get_logs": True,
"is_delete_operator_pod": True,
"namespace": "data-pipelines",
},
)
load_seeds >> transform
jaffle_shop_k8s()