

Implementation:Astronomer Astronomer cosmos Operator Args Pattern

From Leeroopedia


Overview

Configuration pattern for broadcasting runtime arguments to all dbt operators in a Cosmos-rendered DAG. This is a Pattern Doc describing how the operator_args dictionary flows from user configuration into every generated Airflow operator.

Type

Pattern -- this document describes a configuration convention rather than a single class API.

Source Location

  • Repository: astronomer-cosmos
  • Consumed in: cosmos/converter.py Lines L265-374 (DbtToAirflowConverter.__init__ and build_airflow_graph() call site)
  • Accepted in: cosmos/operators/base.py Lines L103-133 (base operator constructor where kwargs are applied)

Interface Specification

operator_args is a dict[str, Any] passed to DbtDag or DbtTaskGroup. The dictionary is forwarded as keyword arguments to every dbt operator constructor during graph building.

Common Keys

Key                  Type  Description
install_deps         bool  If True, run dbt deps before each operator invocation.
full_refresh         bool  If True, pass --full-refresh to dbt commands.
append_env           bool  If True, merge env into the inherited process environment; if False, the process sees only env.
vars                 dict  dbt variables passed via --vars.
env                  dict  Environment variables for the operator process.
dbt_executable_path  str   Path to the dbt binary.
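The append_env, env and vars rows interact when a task builds its dbt command. The following is a minimal pure-Python sketch of the semantics as the table describes them. The helper names resolve_env and vars_flags are hypothetical and are not Cosmos functions. Serializing vars as JSON is also an assumption: dbt accepts a YAML dictionary string for --vars, and JSON is valid YAML.

```python
from __future__ import annotations

import json
import os
from typing import Any, Mapping


def resolve_env(env: Mapping[str, str] | None, append_env: bool) -> dict[str, str]:
    """Hypothetical helper: the environment a dbt subprocess would see.

    append_env=True  -> start from the current process environment, then overlay env.
    append_env=False -> use only the variables supplied in env.
    """
    user_env = dict(env or {})
    if append_env:
        merged = dict(os.environ)
        merged.update(user_env)  # user-supplied values win on key collisions
        return merged
    return user_env


def vars_flags(dbt_vars: Mapping[str, Any] | None) -> list[str]:
    """Hypothetical helper: turn the vars dict into dbt CLI arguments."""
    if not dbt_vars:
        return []
    # JSON is a subset of YAML, so dbt can parse this string as a --vars dictionary.
    return ["--vars", json.dumps(dict(dbt_vars))]


print(vars_flags({"start_date": "2024-01-01"}))
# ['--vars', '{"start_date": "2024-01-01"}']
```

With append_env=False, variables such as PATH are not inherited unless they are listed in env. That makes the replace behaviour stricter but easier to reason about.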

Kubernetes Mode Keys

Key                     Type          Description
image                   str           Docker image for the KubernetesPodOperator.
secrets                 list[Secret]  Kubernetes secrets to mount.
get_logs                bool          Whether to retrieve container logs.
is_delete_operator_pod  bool          Whether to delete the pod after execution.

Watcher Mode Keys

Key                Type       Description
deferrable         bool       Enable deferred execution (Airflow 2.6+).
execution_timeout  timedelta  Maximum execution time for the task.
poke_interval      int        Seconds between sensor pokes.
timeout            int        Total sensor timeout in seconds.
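Every operator receives the same dictionary. As a result, keys from the Kubernetes or Watcher tables only make sense when the DAG runs in that execution mode. The sketch below is a pre-flight check under that assumption. It knows only the keys listed in the three tables above. The function undocumented_keys and the mode labels "local", "kubernetes" and "watcher" are hypothetical and not part of Cosmos.

```python
from __future__ import annotations

from typing import Any

# Key sets copied from the tables on this page. Real operators accept many
# more arguments (for example any Airflow BaseOperator argument such as retries).
COMMON_KEYS = frozenset(
    {"install_deps", "full_refresh", "append_env", "vars", "env", "dbt_executable_path"}
)
MODE_KEYS: dict[str, frozenset[str]] = {
    "local": frozenset(),
    "kubernetes": frozenset({"image", "secrets", "get_logs", "is_delete_operator_pod"}),
    "watcher": frozenset({"deferrable", "execution_timeout", "poke_interval", "timeout"}),
}


def undocumented_keys(operator_args: dict[str, Any], mode: str) -> list[str]:
    """Hypothetical lint: keys absent from the common table and from the table for `mode`."""
    allowed = COMMON_KEYS | MODE_KEYS[mode]
    return sorted(set(operator_args) - allowed)


print(undocumented_keys({"install_deps": True, "image": "my-registry/dbt-jaffle:latest"}, "local"))
# ['image']
```

A non-empty result is a prompt to double-check the key, not proof of an error. Operators legitimately accept arguments that these tables do not list.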

Import

N/A -- operator_args is a plain Python dict. No special import is required.

I/O Contract

  • Input: A dict[str, Any] supplied by the user to DbtDag or DbtTaskGroup via the operator_args parameter.
  • Processing: During DbtToAirflowConverter.__init__(), the dict is stored and later passed as task_args to build_airflow_graph(). The graph builder unpacks it as **task_args into each operator constructor.
  • Output: Every generated dbt operator is instantiated with the keys from operator_args as keyword arguments.
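The processing step amounts to unpacking one dictionary into many constructor calls. The sketch below shows that mechanism in isolation. FakeDbtOperator, build_graph and the "<node>_run" task_id format are hypothetical stand-ins, not Cosmos classes or naming rules. The final check illustrates a general property of ** unpacking: each call gets its own top-level kwargs dict, but nested values such as the vars dict are shared objects. This page does not say whether Cosmos copies nested values.

```python
from __future__ import annotations

from typing import Any


class FakeDbtOperator:
    """Hypothetical stand-in for a Cosmos dbt operator; it records its kwargs."""

    def __init__(self, task_id: str, **kwargs: Any) -> None:
        self.task_id = task_id
        self.kwargs = kwargs


def build_graph(node_names: list[str], task_args: dict[str, Any]) -> list[FakeDbtOperator]:
    """Hypothetical broadcast step: one operator per dbt node, each constructed
    with the same task_args unpacked as keyword arguments."""
    return [FakeDbtOperator(task_id=f"{name}_run", **task_args) for name in node_names]


operator_args = {"install_deps": True, "vars": {"start_date": "2024-01-01"}}
tasks = build_graph(["stg_customers", "customers"], operator_args)

for task in tasks:
    print(task.task_id, task.kwargs["install_deps"])
# stg_customers_run True
# customers_run True

# Top-level kwargs dicts are distinct, but the nested vars dict is the same object.
print(tasks[0].kwargs is tasks[1].kwargs)  # False
print(tasks[0].kwargs["vars"] is tasks[1].kwargs["vars"])  # True
```

Because of that sharing, mutating a nested value on one task in this sketch would be visible on every other task.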

Usage Examples

Local Execution Mode

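from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ProfileConfig, ProjectConfig

jaffle_shop = DbtDag(
    dag_id="jaffle_shop_local",
    project_config=ProjectConfig(
        dbt_project_path=Path("/usr/local/airflow/dags/dbt/jaffle_shop"),
    ),
    profile_config=ProfileConfig(
        profile_name="jaffle_shop",
        target_name="dev",
        # ProfileConfig needs a profiles.yml path or a profile_mapping; this path is an assumed example.
        profiles_yml_filepath=Path("/usr/local/airflow/dags/dbt/jaffle_shop/profiles.yml"),
    ),
    # Forwarded as keyword arguments to every generated dbt operator.
    operator_args={
        "install_deps": True,  # run dbt deps before each task
        "full_refresh": False,
        "vars": {"start_date": "2024-01-01"},  # passed to dbt via --vars
        "append_env": True,  # merge env into the inherited environment
    },
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
)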

Kubernetes Execution Mode

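from datetime import datetime
from pathlib import Path

from airflow.providers.cncf.kubernetes.secret import Secret
from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig

# The KubernetesPodOperator `secrets` argument takes Airflow Secret objects,
# not raw kubernetes client models. deploy_type="env" with no key exposes
# every key of the "dbt-secrets" Kubernetes Secret as an environment variable.
k8s_secret = Secret(deploy_type="env", deploy_target=None, secret="dbt-secrets")

jaffle_shop_k8s = DbtDag(
    dag_id="jaffle_shop_k8s",
    project_config=ProjectConfig(
        dbt_project_path=Path("/usr/local/airflow/dags/dbt/jaffle_shop"),
    ),
    profile_config=ProfileConfig(
        profile_name="jaffle_shop",
        target_name="prod",
        # Assumed location; ProfileConfig needs a profiles.yml path or a profile_mapping.
        profiles_yml_filepath=Path("/usr/local/airflow/dags/dbt/jaffle_shop/profiles.yml"),
    ),
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.KUBERNETES,
    ),
    operator_args={
        "image": "my-registry/dbt-jaffle:latest",
        "secrets": [k8s_secret],
        "get_logs": True,
        # Deprecated in newer cncf.kubernetes providers in favour of on_finish_action="delete_pod".
        "is_delete_operator_pod": True,
        "install_deps": True,
    },
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
)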

Watcher / Deferrable Mode

from datetime import datetime, timedelta
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig

jaffle_shop_watcher = DbtDag(
    dag_id="jaffle_shop_watcher",
    project_config=ProjectConfig(
        dbt_project_path=Path("/usr/local/airflow/dags/dbt/jaffle_shop"),
    ),
    profile_config=ProfileConfig(
        profile_name="jaffle_shop",
        target_name="dev",
        # Assumed location; ProfileConfig needs a profiles.yml path or a profile_mapping.
        profiles_yml_filepath=Path("/usr/local/airflow/dags/dbt/jaffle_shop/profiles.yml"),
    ),
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.LOCAL,
    ),
    operator_args={
        "install_deps": True,
        "deferrable": True,
        "execution_timeout": timedelta(hours=2),  # per-task limit
        "poke_interval": 30,  # seconds between sensor pokes
        "timeout": 7200,  # total sensor timeout in seconds (2 hours)
    },
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

