Implementation:Astronomer Astronomer cosmos DbtRunAirflowAsyncBigqueryOperator

From Leeroopedia


Knowledge Sources
Domains: BigQuery, Async_Execution
Last Updated: 2026-02-07 17:00 GMT

Overview

The DbtRunAirflowAsyncBigqueryOperator is an async BigQuery operator that submits dbt-compiled SQL as deferrable BigQuery insert jobs, leveraging Airflow's native BigQuery provider for execution.

Description

This operator implements the AIRFLOW_ASYNC execution mode for BigQuery by combining BigQueryInsertJobOperator (from the Google Cloud Airflow provider) with AbstractDbtLocalBase (from Cosmos). The dual inheritance allows it to compile dbt models locally while executing the resulting SQL as native, deferrable BigQuery jobs.

To stay compatible with both Airflow 2 and Airflow 3, the class adjusts its own inheritance during __init__: it temporarily sets its base classes to only BigQueryInsertJobOperator, which avoids conflicts with AbstractDbtLocalBase parameter filtering in Airflow 3, then restores the full inheritance chain afterward.
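The base-class swap described above can be illustrated with a minimal sketch. The classes ProviderOperator, LocalBase, and AsyncOperator are hypothetical stand-ins written for this example, not the Cosmos or Airflow classes, and the real operator may perform the swap differently (for instance, only on Airflow 3).

```python
class ProviderOperator:
    """Hypothetical stand-in for BigQueryInsertJobOperator."""

    def __init__(self, **kwargs):
        self.provider_kwargs = kwargs
        # Cooperative call: continues along the MRO of the concrete class.
        super().__init__()


class LocalBase:
    """Hypothetical stand-in for AbstractDbtLocalBase."""

    def __init__(self, project_dir):
        self.project_dir = project_dir


class AsyncOperator(ProviderOperator, LocalBase):
    def __init__(self, project_dir, **kwargs):
        full_bases = AsyncOperator.__bases__
        # Temporarily narrow the inheritance chain so the provider's
        # __init__ does not reach LocalBase.__init__ through the MRO.
        AsyncOperator.__bases__ = (ProviderOperator,)
        try:
            super().__init__(**kwargs)
        finally:
            # Restore the full chain even if initialization fails.
            AsyncOperator.__bases__ = full_bases
        # Initialize the second base explicitly with its own arguments.
        LocalBase.__init__(self, project_dir=project_dir)


op = AsyncOperator(project_dir="/tmp/project", task_id="run_model")
```

Without the swap, the cooperative super().__init__() call in ProviderOperator would reach LocalBase.__init__ without a project_dir and raise a TypeError. Reassigning __bases__ mutates the class itself, so a pattern like this is best kept short and wrapped in try/finally.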

Key methods:

  • execute orchestrates the execution flow. When enable_setup_async_task is active, it retrieves the compiled SQL either from XCom (get_sql_from_xcom) or remote storage (get_remote_sql), configures the BigQuery query, and delegates to BigQueryInsertJobOperator.execute. Otherwise, it falls back to the local build_and_run_cmd method with async context. After execution, it stores template fields and registers dataset events.
  • get_sql_from_xcom pulls base64-encoded, zlib-compressed SQL from XCom using the producer task ID and the sanitized model file path as the key. It decompresses and returns the SQL string.
  • get_remote_sql retrieves the compiled SQL from remote object storage (e.g., GCS, S3) using Airflow's ObjectStoragePath abstraction, constructing the path from the remote target path setting, DAG/task group identifier, run ID, and relative file path.
  • execute_complete acts as the callback when the deferrable trigger fires, processing the BigQuery job result, storing template fields, and registering dataset events.
  • _register_event creates an Asset with a BigQuery URI derived from the dbt node's unique ID and registers it as an output dataset, supporting Airflow's data-aware scheduling. The URI format differs between Airflow 2 (dot-separated) and Airflow 3 (slash-separated).
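The decoding step that get_sql_from_xcom performs (base64 decoding followed by zlib decompression) can be sketched with the standard library alone. The helper names encode_sql and decode_sql are hypothetical and chosen for this example; the XCom lookup and key sanitization are omitted because their details are not shown on this page.

```python
import base64
import zlib


def encode_sql(sql: str) -> str:
    """Compress SQL with zlib, then base64-encode it into a text-safe string."""
    compressed = zlib.compress(sql.encode("utf-8"))
    return base64.b64encode(compressed).decode("utf-8")


def decode_sql(payload: str) -> str:
    """Reverse encode_sql: base64-decode, then zlib-decompress to the SQL text."""
    compressed = base64.b64decode(payload)
    return zlib.decompress(compressed).decode("utf-8")


# Round trip: the value a producer task might store and a consumer might read.
original_sql = "SELECT id, name FROM my_dataset.my_model"
payload = encode_sql(original_sql)
restored_sql = decode_sql(payload)
```

Base64 keeps the compressed bytes safe to store as a string value, which is presumably why the two steps are combined.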

The operator defines template_fields including gcp_project, dataset, location, compiled_sql, and full_refresh, enabling Jinja rendering of these values.

Usage

This operator is typically not instantiated directly. Instead, it is resolved at runtime by the DbtRunAirflowAsyncFactoryOperator when the dbt profile type is "bigquery". It requires the apache-airflow-providers-google package to be installed.
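As a rough illustration of runtime resolution by profile type, the sketch below derives a module path and class name from the profile type and imports the class lazily. The naming convention is inferred only from the import path and class name shown on this page; the functions resolve_async_operator_path and load_async_operator are hypothetical, and the actual factory logic may differ.

```python
import importlib


def resolve_async_operator_path(profile_type: str) -> tuple[str, str]:
    """Return (module_path, class_name) for a profile type, by assumed convention."""
    module_path = f"cosmos.operators._asynchronous.{profile_type}"
    class_name = f"DbtRunAirflowAsync{profile_type.capitalize()}Operator"
    return module_path, class_name


def load_async_operator(profile_type: str) -> type:
    """Import the operator class lazily; fails if the provider package is missing."""
    module_path, class_name = resolve_async_operator_path(profile_type)
    try:
        module = importlib.import_module(module_path)
    except ImportError as exc:
        raise ImportError(
            f"Could not import {module_path}; is the required provider installed?"
        ) from exc
    return getattr(module, class_name)


module_path, class_name = resolve_async_operator_path("bigquery")
```

Deferring the import until the profile type is known means the Google provider is only needed when a BigQuery profile is actually in use.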

Code Reference

Source Location

Signature

class DbtRunAirflowAsyncBigqueryOperator(BigQueryInsertJobOperator, AbstractDbtLocalBase):

    template_fields: Sequence[str] = (
        "gcp_project", "dataset", "location", "compiled_sql", "full_refresh"
    )

    def __init__(
        self,
        project_dir: str,
        profile_config: ProfileConfig,
        extra_context: dict[str, Any] | None = None,
        dbt_kwargs: dict[str, Any] | None = None,
        **kwargs: Any,
    ):
        ...

    def execute(self, context: Context, **kwargs: Any) -> None:
        ...

    def get_sql_from_xcom(self, context: Context) -> str:
        ...

    def get_remote_sql(self) -> str:
        ...

    def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
        ...

    def _register_event(self, context: Context) -> None:
        ...

Import

from cosmos.operators._asynchronous.bigquery import DbtRunAirflowAsyncBigqueryOperator

I/O Contract

Inputs

Name | Type | Required | Description
project_dir | str | Yes | Path to the dbt project directory
profile_config | ProfileConfig | Yes | Cosmos profile configuration; must include a profile_mapping with a conn_id for GCP
extra_context | dict[str, Any] or None | No | Additional context including dbt_node_config, dbt_dag_task_group_identifier, and run_id
dbt_kwargs | dict[str, Any] or None | No | dbt-specific keyword arguments; must include task_id, may include full_refresh
**kwargs | Any | No | Additional keyword arguments including emit_datasets, dag, task_group

Outputs

Name | Type | Description
execute return | None | The execute method returns None; SQL is submitted as a BigQuery job
execute_complete return | Any | The BigQuery job ID returned after the deferred trigger completes
Side effects | -- | Registers dataset events (BigQuery Asset URIs) for data-aware scheduling

Usage Examples

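# This operator is typically resolved automatically by the factory.
# Direct usage is shown for reference only.
# `my_bigquery_profile_mapping` is a placeholder for a BigQuery profile
# mapping (with a GCP conn_id) that must be defined elsewhere.
from cosmos.config import ProfileConfig
from cosmos.operators._asynchronous.bigquery import DbtRunAirflowAsyncBigqueryOperator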

operator = DbtRunAirflowAsyncBigqueryOperator(
    project_dir="/path/to/dbt/project",
    profile_config=ProfileConfig(
        profile_name="bigquery_profile",
        target_name="dev",
        profile_mapping=my_bigquery_profile_mapping,
    ),
    extra_context={
        "dbt_node_config": {
            "file_path": "/path/to/dbt/project/models/my_model.sql",
            "unique_id": "model.my_project.my_model",
        },
        "dbt_dag_task_group_identifier": "my_dag__my_group",
    },
    dbt_kwargs={
        "task_id": "run_my_model",
        "full_refresh": False,
    },
)
