Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Astronomer Astronomer cosmos DbtRunAirflowAsyncFactoryOperator

From Leeroopedia
Revision as of 14:29, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Astronomer_Astronomer_cosmos_DbtRunAirflowAsyncFactoryOperator.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)


Knowledge Sources
Domains Async_Execution, Factory
Last Updated 2026-02-07 17:00 GMT

Overview

The DbtRunAirflowAsyncFactoryOperator is a factory operator that dynamically creates and morphs into the appropriate async operator class based on the dbt profile type at runtime.

Description

This module contains two components:

_create_async_operator_class is a factory function that dynamically constructs an asynchronous operator class path based on the given profile_type and dbt_class name. It builds a fully qualified module path following the pattern cosmos.operators._asynchronous.{profile_type}.{DbtClass}AirflowAsync{ProfileType}Operator and uses load_method_from_module to import and return the class. If the class cannot be found, it raises an ImportError.

DbtRunAirflowAsyncFactoryOperator extends DbtRunLocalOperator and acts as a factory that resolves the correct async operator at instantiation time. During __init__, it:

  1. Stores the project_dir and profile_config.
  2. Calls create_async_operator which extracts the profile type (e.g., "bigquery") from the profile config and invokes _create_async_operator_class.
  3. Dynamically replaces its own base classes (DbtRunAirflowAsyncFactoryOperator.__bases__) with the resolved async operator class, enabling it to inherit the correct async execution behavior.
  4. Calls super().__init__ which now delegates to the dynamically resolved async operator's initializer.

This pattern avoids Airflow's DuplicateTaskIdFound error that would occur if composition were used instead of dynamic inheritance.

Usage

Use this operator when you need Cosmos to automatically select the correct async execution operator based on the dbt profile type. This is primarily used internally by Cosmos when ExecutionMode.AIRFLOW_ASYNC is selected. The DbtRunAirflowAsyncOperator in cosmos.operators.airflow_async delegates to this factory.

Code Reference

Source Location

Signature

def _create_async_operator_class(profile_type: str, dbt_class: str) -> Any:
    ...

class DbtRunAirflowAsyncFactoryOperator(DbtRunLocalOperator):

    def __init__(
        self,
        project_dir: str,
        profile_config: ProfileConfig,
        extra_context: dict[str, object] | None = None,
        dbt_kwargs: dict[str, object] | None = None,
        **kwargs: Any,
    ) -> None:
        ...

    def create_async_operator(self) -> Any:
        ...

Import

from cosmos.operators._asynchronous.base import DbtRunAirflowAsyncFactoryOperator
from cosmos.operators._asynchronous.base import _create_async_operator_class

I/O Contract

Inputs

_create_async_operator_class

Name Type Required Description
profile_type str Yes The dbt profile type (e.g., "bigquery") used to locate the async operator module
dbt_class str Yes The dbt class name prefix (e.g., "DbtRun") used to construct the operator class name

DbtRunAirflowAsyncFactoryOperator.__init__

Name Type Required Description
project_dir str Yes Path to the dbt project directory
profile_config ProfileConfig Yes Cosmos profile configuration containing the dbt profile type and connection details
extra_context None No Additional context passed to the async operator (default: None)
dbt_kwargs None No Additional dbt-specific keyword arguments (default: None)
**kwargs Any No Additional keyword arguments forwarded to the resolved async operator

Outputs

Name Type Description
_create_async_operator_class return class The dynamically imported async operator class (e.g., DbtRunAirflowAsyncBigqueryOperator)
DbtRunAirflowAsyncFactoryOperator instance operator instance An operator instance that has dynamically morphed into the appropriate async operator via base class replacement

Usage Examples

from cosmos.operators._asynchronous.base import DbtRunAirflowAsyncFactoryOperator
from cosmos.config import ProfileConfig

# The factory operator automatically resolves the correct async backend
# based on the profile type (e.g., BigQuery)
operator = DbtRunAirflowAsyncFactoryOperator(
    project_dir="/path/to/dbt/project",
    profile_config=ProfileConfig(
        profile_name="my_bigquery_profile",
        target_name="dev",
        profile_mapping=my_bigquery_mapping,
    ),
    task_id="dbt_run_async",
)
# At this point, the operator has morphed into DbtRunAirflowAsyncBigqueryOperator

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment