Implementation:Astronomer Astronomer cosmos DbtRunAirflowAsyncBigqueryOperator
| Knowledge Sources | |
|---|---|
| Domains | BigQuery, Async_Execution |
| Last Updated | 2026-02-07 17:00 GMT |
Overview
The DbtRunAirflowAsyncBigqueryOperator is an async BigQuery operator that submits dbt-compiled SQL as deferrable BigQuery insert jobs, leveraging Airflow's native BigQuery provider for execution.
Description
This operator implements the AIRFLOW_ASYNC execution mode for BigQuery by combining BigQueryInsertJobOperator (from the Google Cloud Airflow provider) with AbstractDbtLocalBase (from Cosmos). The dual inheritance allows it to compile dbt models locally while executing the resulting SQL as native, deferrable BigQuery jobs.
The class works around an Airflow 2/3 compatibility issue during initialization: in __init__, it temporarily sets its base classes to only BigQueryInsertJobOperator to avoid conflicts with AbstractDbtLocalBase parameter filtering in Airflow 3, then restores the full inheritance chain afterward.
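The base-class swap described above can be illustrated with plain Python classes. This is a minimal sketch of the general technique, not the Cosmos source: NativeOperator, LocalBase, and AsyncOperator are hypothetical stand-ins for BigQueryInsertJobOperator, AbstractDbtLocalBase, and the operator itself, and the way the second base is initialized afterward is an assumption.

```python
from typing import Any


class NativeOperator:
    """Hypothetical stand-in for BigQueryInsertJobOperator."""

    def __init__(self, *, task_id: str, **kwargs: Any) -> None:
        self.task_id = task_id
        self.native_kwargs = kwargs


class LocalBase:
    """Hypothetical stand-in for AbstractDbtLocalBase."""

    def __init__(self, *, project_dir: str) -> None:
        self.project_dir = project_dir


class AsyncOperator(NativeOperator, LocalBase):
    def __init__(self, *, project_dir: str, task_id: str, **kwargs: Any) -> None:
        full_bases = AsyncOperator.__bases__
        # Temporarily drop LocalBase so that super().__init__ only walks
        # the NativeOperator side of the hierarchy.
        AsyncOperator.__bases__ = (NativeOperator,)
        try:
            super().__init__(task_id=task_id, **kwargs)
        finally:
            # Always restore the full inheritance chain, even on failure.
            AsyncOperator.__bases__ = full_bases
        # Initialize the second base explicitly (assumed for this sketch).
        LocalBase.__init__(self, project_dir=project_dir)
```

Because reassigning __bases__ mutates the class for every instance, a swap like this is only safe when operators are constructed one at a time; the try/finally keeps the class consistent if the parent initializer raises.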
Key methods:
- execute orchestrates the execution flow. When enable_setup_async_task is active, it retrieves the compiled SQL either from XCom (get_sql_from_xcom) or remote storage (get_remote_sql), configures the BigQuery query, and delegates to BigQueryInsertJobOperator.execute. Otherwise, it falls back to the local build_and_run_cmd method with async context. After execution, it stores template fields and registers dataset events.
- get_sql_from_xcom pulls base64-encoded, zlib-compressed SQL from XCom using the producer task ID and the sanitized model file path as the key. It decompresses and returns the SQL string.
- get_remote_sql retrieves the compiled SQL from remote object storage (e.g., GCS, S3) using Airflow's ObjectStoragePath abstraction, constructing the path from the remote target path setting, DAG/task group identifier, run ID, and relative file path.
- execute_complete acts as the callback when the deferrable trigger fires, processing the BigQuery job result, storing template fields, and registering dataset events.
- _register_event creates an Asset with a BigQuery URI derived from the dbt node's unique ID and registers it as an output dataset, supporting Airflow's data-aware scheduling. The URI format differs between Airflow 2 (dot-separated) and Airflow 3 (slash-separated).
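The XCom payload handling described for get_sql_from_xcom can be sketched with the standard library alone. The function names below are hypothetical, and the order of operations (zlib-compress, then base64-encode on the producer side) is an assumption consistent with the "base64-encoded, zlib-compressed" description; the XCom key sanitization is not shown.

```python
import base64
import zlib


def encode_sql_for_xcom(sql: str) -> str:
    """Producer side (assumed): compress the SQL, then base64-encode it
    so the payload is a plain string that is safe to store in XCom."""
    compressed = zlib.compress(sql.encode("utf-8"))
    return base64.b64encode(compressed).decode("utf-8")


def decode_sql_from_xcom(payload: str) -> str:
    """Consumer side: reverse the steps, base64-decode then decompress."""
    compressed = base64.b64decode(payload)
    return zlib.decompress(compressed).decode("utf-8")
```

Compressing before encoding keeps large compiled models within XCom size limits, while the base64 step avoids storing raw bytes.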
The operator defines template_fields including gcp_project, dataset, location, compiled_sql, and full_refresh, enabling Jinja rendering of these values.
Usage
This operator is typically not instantiated directly. Instead, it is resolved at runtime by the DbtRunAirflowAsyncFactoryOperator when the dbt profile type is "bigquery". It requires the apache-airflow-providers-google package to be installed.
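One way a factory could map a profile type to this class is by building the module path and class name from the profile type string. The sketch below is an assumption inferred from the import path and class name on this page; the helper names are hypothetical, and the actual lookup logic in DbtRunAirflowAsyncFactoryOperator may differ.

```python
import importlib


def async_operator_location(profile_type: str) -> tuple[str, str]:
    """Build the (module path, class name) pair for a profile type.
    The naming convention is assumed from this page's import statement."""
    module_path = f"cosmos.operators._asynchronous.{profile_type}"
    class_name = f"DbtRunAirflowAsync{profile_type.capitalize()}Operator"
    return module_path, class_name


def load_async_operator(profile_type: str) -> type:
    """Import the module and return the operator class.
    Raises ImportError or AttributeError if the profile type is unsupported
    or the required provider package is not installed."""
    module_path, class_name = async_operator_location(profile_type)
    module = importlib.import_module(module_path)
    return getattr(module, class_name)
```

For the "bigquery" profile type, async_operator_location yields the module and class shown in the Import section of this page.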
Code Reference
Source Location
- Repository: Astronomer_Astronomer_cosmos
- File: cosmos/operators/_asynchronous/bigquery.py
Signature
class DbtRunAirflowAsyncBigqueryOperator(BigQueryInsertJobOperator, AbstractDbtLocalBase):
    template_fields: Sequence[str] = (
        "gcp_project", "dataset", "location", "compiled_sql", "full_refresh"
    )

    def __init__(
        self,
        project_dir: str,
        profile_config: ProfileConfig,
        extra_context: dict[str, Any] | None = None,
        dbt_kwargs: dict[str, Any] | None = None,
        **kwargs: Any,
    ):
        ...

    def execute(self, context: Context, **kwargs: Any) -> None:
        ...

    def get_sql_from_xcom(self, context: Context) -> str:
        ...

    def get_remote_sql(self) -> str:
        ...

    def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
        ...

    def _register_event(self, context: Context) -> None:
        ...
Import
from cosmos.operators._asynchronous.bigquery import DbtRunAirflowAsyncBigqueryOperator
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| project_dir | str | Yes | Path to the dbt project directory |
| profile_config | ProfileConfig | Yes | Cosmos profile configuration; must include a profile_mapping with a conn_id for GCP |
| extra_context | dict[str, Any] \| None | No | Additional context including dbt_node_config, dbt_dag_task_group_identifier, and run_id |
| dbt_kwargs | dict[str, Any] \| None | No | dbt-specific keyword arguments; must include task_id, may include full_refresh |
| **kwargs | Any | No | Additional keyword arguments including emit_datasets, dag, task_group |
Outputs
| Name | Type | Description |
|---|---|---|
| execute return | None | The execute method returns None; SQL is submitted as a BigQuery job |
| execute_complete return | Any | The BigQuery job ID returned after the deferred trigger completes |
| Side effects | -- | Registers dataset events (BigQuery Asset URIs) for data-aware scheduling |
Usage Examples
# This operator is typically resolved automatically by the factory.
# Direct usage is shown for reference:
from cosmos.operators._asynchronous.bigquery import DbtRunAirflowAsyncBigqueryOperator
from cosmos.config import ProfileConfig

# my_bigquery_profile_mapping is assumed to be defined elsewhere; it must be
# a BigQuery profile mapping that carries a conn_id for GCP.
operator = DbtRunAirflowAsyncBigqueryOperator(
    project_dir="/path/to/dbt/project",
    profile_config=ProfileConfig(
        profile_name="bigquery_profile",
        target_name="dev",
        profile_mapping=my_bigquery_profile_mapping,
    ),
    extra_context={
        "dbt_node_config": {
            "file_path": "/path/to/dbt/project/models/my_model.sql",
            "unique_id": "model.my_project.my_model",
        },
        "dbt_dag_task_group_identifier": "my_dag__my_group",
    },
    dbt_kwargs={
        "task_id": "run_my_model",  # required
        "full_refresh": False,  # optional
    },
)