Implementation:Astronomer Astronomer cosmos Operator Args Pattern
Appearance
Overview
Configuration pattern for broadcasting runtime arguments to all dbt operators in a Cosmos-rendered DAG. This is a Pattern Doc describing how the operator_args dictionary flows from user configuration into every generated Airflow operator.
Type
Pattern -- this document describes a configuration convention rather than a single class API.
Source Location
- Repository: astronomer-cosmos
- Consumed in:
cosmos/converter.pyLines L265-374 (DbtToAirflowConverter.__init__andbuild_airflow_graph()call site) - Accepted in:
cosmos/operators/base.pyLines L103-133 (base operator constructor where kwargs are applied)
Interface Specification
operator_args is a dict[str, Any] passed to DbtDag or DbtTaskGroup. The dictionary is forwarded as keyword arguments to every dbt operator constructor during graph building.
Common Keys
| Key | Type | Description |
|---|---|---|
install_deps |
bool |
If True, run dbt deps before each operator invocation.
|
full_refresh |
bool |
If True, pass --full-refresh to dbt commands.
|
append_env |
bool |
If True, append env to the system environment instead of replacing it.
|
vars |
dict |
dbt variables passed via --vars.
|
env |
dict |
Environment variables for the operator process. |
dbt_executable_path |
str |
Path to the dbt binary. |
Kubernetes Mode Keys
| Key | Type | Description |
|---|---|---|
image |
str |
Docker image for the KubernetesPodOperator. |
secrets |
list[Secret] |
Kubernetes secrets to mount. |
get_logs |
bool |
Whether to retrieve container logs. |
is_delete_operator_pod |
bool |
Whether to delete the pod after execution. |
Watcher Mode Keys
| Key | Type | Description |
|---|---|---|
deferrable |
bool |
Enable deferred execution (Airflow 2.6+). |
execution_timeout |
timedelta |
Maximum execution time for the task. |
poke_interval |
int |
Seconds between sensor pokes. |
timeout |
int |
Total sensor timeout in seconds. |
Import
N/A -- operator_args is a plain Python dict. No special import is required.
I/O Contract
- Input: A
dict[str, Any]supplied by the user toDbtDagorDbtTaskGroupvia theoperator_argsparameter. - Processing: During
DbtToAirflowConverter.__init__(), the dict is stored and later passed astask_argstobuild_airflow_graph(). The graph builder unpacks it as**task_argsinto each operator constructor. - Output: Every generated dbt operator is instantiated with the keys from
operator_argsas keyword arguments.
Usage Examples
Local Execution Mode
from cosmos import DbtDag, ProjectConfig, ProfileConfig
from pathlib import Path
jaffle_shop = DbtDag(
dag_id="jaffle_shop_local",
project_config=ProjectConfig(
dbt_project_path=Path("/usr/local/airflow/dags/dbt/jaffle_shop"),
),
profile_config=ProfileConfig(
profile_name="jaffle_shop",
target_name="dev",
),
operator_args={
"install_deps": True,
"full_refresh": False,
"vars": {"start_date": "2024-01-01"},
"append_env": True,
},
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
)
Kubernetes Execution Mode
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, ExecutionMode
from kubernetes.client import models as k8s
from pathlib import Path
k8s_secret = k8s.V1EnvFromSource(
secret_ref=k8s.V1SecretEnvSource(name="dbt-secrets")
)
jaffle_shop_k8s = DbtDag(
dag_id="jaffle_shop_k8s",
project_config=ProjectConfig(
dbt_project_path=Path("/usr/local/airflow/dags/dbt/jaffle_shop"),
),
profile_config=ProfileConfig(
profile_name="jaffle_shop",
target_name="prod",
),
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.KUBERNETES,
),
operator_args={
"image": "my-registry/dbt-jaffle:latest",
"secrets": [k8s_secret],
"get_logs": True,
"is_delete_operator_pod": True,
"install_deps": True,
},
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
)
Watcher / Deferrable Mode
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, ExecutionMode
from datetime import timedelta
from pathlib import Path
jaffle_shop_watcher = DbtDag(
dag_id="jaffle_shop_watcher",
project_config=ProjectConfig(
dbt_project_path=Path("/usr/local/airflow/dags/dbt/jaffle_shop"),
),
profile_config=ProfileConfig(
profile_name="jaffle_shop",
target_name="dev",
),
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.LOCAL,
),
operator_args={
"install_deps": True,
"deferrable": True,
"execution_timeout": timedelta(hours=2),
"poke_interval": 30,
"timeout": 7200,
},
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
)
Related Pages
Implements Principle
Requires Environment
Page Connections
Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment