
Implementation:Astronomer cosmos DbtKubernetesOperator Execute

From Leeroopedia


Knowledge Sources
Domains Data_Engineering, Execution, Kubernetes, Containerization
Last Updated 2026-02-07 00:00 GMT

Overview

A concrete operator family for executing dbt commands in Kubernetes pods, provided by the astronomer-cosmos library.

Description

The Kubernetes operator family in Cosmos extends both AbstractDbtBase (for dbt command construction) and Airflow's KubernetesPodOperator (for pod lifecycle management). The base class DbtKubernetesBaseOperator handles converting environment variables to Kubernetes V1EnvVar objects, building the dbt command, and managing the pod spec. Concrete operators (DbtRunKubernetesOperator, DbtTestKubernetesOperator, DbtSeedKubernetesOperator, etc.) combine the base with dbt command mixins. A specialized DbtTestWarningHandler callback scrapes pod logs for dbt test warnings via regex patterns.
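The environment-variable conversion performed by build_env_args can be illustrated with a minimal, self-contained sketch. The EnvVar dataclass below stands in for kubernetes.client.models.V1EnvVar; this is a hypothetical re-implementation for illustration, not Cosmos's actual code:

```python
from dataclasses import dataclass


@dataclass
class EnvVar:
    # Stand-in for kubernetes.client.models.V1EnvVar (name/value fields only)
    name: str
    value: str


def build_env_args(env: dict[str, str]) -> list[EnvVar]:
    # Each key/value pair in the plain env dict becomes one typed
    # environment-variable object attached to the pod spec
    return [EnvVar(name=key, value=str(value)) for key, value in env.items()]


converted = build_env_args({"DBT_TARGET": "prod", "DBT_THREADS": "4"})
```

The real operator performs the same dict-to-object mapping so that users can pass ordinary dicts via env_vars while the KubernetesPodOperator machinery receives proper V1EnvVar objects.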

Usage

Use these operators when running dbt in Kubernetes execution mode. They are typically auto-generated by DbtDag or DbtTaskGroup when ExecutionMode.KUBERNETES is specified, but can also be used directly for standalone tasks like seed loading.

Code Reference

Source Location

  • Repository: astronomer-cosmos
  • File: cosmos/operators/kubernetes.py
  • Lines: L57-181 (DbtKubernetesBaseOperator), L374-380 (DbtTestKubernetesOperator), L392-400 (DbtRunKubernetesOperator)

Signature

class DbtKubernetesBaseOperator(AbstractDbtBase, KubernetesPodOperator):
    """
    Executes a dbt core command in a Kubernetes pod.
    """
    template_fields: Sequence[str] = (
        KubernetesPodOperator.template_fields + AbstractDbtBase.template_fields
    )

    def __init__(self, profile_config: ProfileConfig | None = None, **kwargs: Any) -> None:
        self.profile_config = profile_config
        super().__init__(**kwargs)

    def build_env_args(self, env: dict[str, str]) -> list[V1EnvVar]:
        # Converts dict env vars to Kubernetes V1EnvVar objects
        ...

class DbtSeedKubernetesOperator(DbtSeedMixin, DbtKubernetesBaseOperator):
    """
    Executes a dbt core seed command in a Kubernetes pod.
    """
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.base_cmd = ["seed"]

class DbtRunKubernetesOperator(DbtRunMixin, DbtKubernetesBaseOperator):
    """
    Executes a dbt core run command in a Kubernetes pod.
    """
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.base_cmd = ["run"]

class DbtTestKubernetesOperator(DbtTestMixin, DbtWarningKubernetesOperator):
    """
    Executes a dbt core test command in a Kubernetes pod.
    """
    def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None:
        self.on_warning_callback = on_warning_callback
        super().__init__(**kwargs)
        self.base_cmd = ["test"]

Key kwargs:

  • project_dir (str | Path): Path to the dbt project inside the container
  • profile_config (ProfileConfig | None): Optional profile config (may be baked into the image)
  • image (str): Docker image containing dbt and the project (e.g., dbt-jaffle-shop:1.0.0)
  • secrets (list[Secret]): Kubernetes Secret objects for credential injection
  • get_logs (bool): Whether to stream pod logs to the Airflow task log
  • is_delete_operator_pod (bool): Whether to delete the pod after execution
  • namespace (str): Kubernetes namespace for the pod
  • env_vars (dict | list[V1EnvVar]): Environment variables for the pod (dicts are auto-converted)
  • select (str): dbt node selector
  • exclude (str): dbt node exclusion selector

Import

from cosmos.operators.kubernetes import (
    DbtSeedKubernetesOperator,
    DbtRunKubernetesOperator,
    DbtTestKubernetesOperator,
)

I/O Contract

Inputs

  • image (str, required): Docker image with dbt installed and project files baked in
  • project_dir (str, required): Path to the dbt project inside the container
  • secrets (list[Secret], optional): Kubernetes Secrets for database credentials
  • select (str, optional): dbt node selector (set during DAG rendering)
  • namespace (str, optional): Target Kubernetes namespace (defaults to the cluster default)

Outputs

  • Pod execution (Kubernetes Pod): The dbt command executes inside an ephemeral pod
  • Log stream (Airflow task log): Pod stdout/stderr is streamed when get_logs=True
  • Warning callback (callable invocation): DbtTestKubernetesOperator invokes on_warning_callback when test warnings are detected via log scraping
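The warning callback receives a context dict; per Cosmos's warning-callback convention, keys named "test_names" and "test_results" carry the scraped results (treat the exact key names here as assumptions). A minimal callback sketch:

```python
def format_test_warnings(context: dict) -> list[str]:
    # Pair each warned test with its result message, as scraped from pod logs
    names = context.get("test_names", [])
    results = context.get("test_results", [])
    return [f"dbt test warning: {name}: {result}"
            for name, result in zip(names, results)]


messages = format_test_warnings({
    "test_names": ["not_null_orders_order_id"],
    "test_results": ["Got 3 results, configured to warn if != 0"],
})
```

A function like this would be passed as on_warning_callback when constructing DbtTestKubernetesOperator, e.g. on_warning_callback=lambda ctx: print("\n".join(format_test_warnings(ctx))).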

Usage Examples

DbtSeedKubernetesOperator with Secrets

from cosmos.operators.kubernetes import DbtSeedKubernetesOperator
from kubernetes.client import models as k8s

# Environment variable sourced from a Kubernetes Secret holding the DB password
postgres_password_secret = k8s.V1EnvVar(
    name="POSTGRES_PASSWORD",
    value_from=k8s.V1EnvVarSource(
        secret_key_ref=k8s.V1SecretKeySelector(
            name="postgres-secrets",
            key="password",
        )
    ),
)

seed_task = DbtSeedKubernetesOperator(
    task_id="seed_jaffle_shop",
    image="dbt-jaffle-shop:1.0.0",
    project_dir="/usr/app/dbt/jaffle_shop",
    get_logs=True,
    is_delete_operator_pod=True,
    env_vars=[postgres_password_secret],
    namespace="data-pipelines",
)

Full Kubernetes DAG with DbtTaskGroup

from airflow.decorators import dag
from cosmos import DbtTaskGroup, ProjectConfig, ExecutionConfig
from cosmos.constants import ExecutionMode
from cosmos.operators.kubernetes import DbtSeedKubernetesOperator
import pendulum

@dag(
    start_date=pendulum.datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
)
def jaffle_shop_k8s():
    # Seed task runs separately
    load_seeds = DbtSeedKubernetesOperator(
        task_id="load_seeds",
        image="dbt-jaffle-shop:1.0.0",
        project_dir="/usr/app/dbt/jaffle_shop",
        get_logs=True,
        is_delete_operator_pod=True,
    )

    # DbtTaskGroup auto-generates DbtRunKubernetesOperator
    # and DbtTestKubernetesOperator for each model/test
    transform = DbtTaskGroup(
        project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.KUBERNETES,
        ),
        operator_args={
            "image": "dbt-jaffle-shop:1.0.0",
            "get_logs": True,
            "is_delete_operator_pod": True,
            "namespace": "data-pipelines",
        },
    )

    load_seeds >> transform

jaffle_shop_k8s()

Related Pages

Implements Principle

Requires Environment
