Implementation:Astronomer Astronomer cosmos Airflow Async Operators
| Knowledge Sources | |
|---|---|
| Domains | Async_Execution, Operators |
| Last Updated | 2026-02-07 17:00 GMT |
Overview
The Airflow_Async_Operators module provides thin wrapper classes for all dbt commands that execute using the AIRFLOW_ASYNC execution mode.
Description
This module defines ten operator classes that map dbt commands to the Airflow async execution mode. Most are simple pass-through subclasses of their corresponding local operator, inheriting all behavior unchanged. The key exception is DbtRunAirflowAsyncOperator, which extends DbtRunAirflowAsyncFactoryOperator and includes logic to separate Cosmos-specific kwargs from standard Airflow kwargs before passing them to the factory.
The operators are:
- DbtBuildAirflowAsyncOperator -- extends
DbtBuildLocalOperator(pass-through) - DbtLSAirflowAsyncOperator -- extends
DbtLSLocalOperator(pass-through) - DbtSeedAirflowAsyncOperator -- extends
DbtSeedLocalOperator(pass-through) - DbtSnapshotAirflowAsyncOperator -- extends
DbtSnapshotLocalOperator(pass-through) - DbtSourceAirflowAsyncOperator -- extends
DbtSourceLocalOperator(pass-through) - DbtRunAirflowAsyncOperator -- extends
DbtRunAirflowAsyncFactoryOperator; performs kwargs filtering to separate dbt-specific arguments from Airflow operator arguments - DbtTestAirflowAsyncOperator -- extends
DbtTestLocalOperator(pass-through) - DbtRunOperationAirflowAsyncOperator -- extends
DbtRunOperationLocalOperator(pass-through) - DbtCompileAirflowAsyncOperator -- extends
DbtCompileLocalOperator(pass-through) - DbtCloneAirflowAsyncOperator -- extends
DbtCloneLocalOperator(pass-through)
The DbtRunAirflowAsyncOperator uses introspection on the AbstractDbtBase, DbtLocalBaseOperator, and AbstractDbtLocalBase initializer signatures to identify which kwargs belong to the dbt layer versus the async operator layer. It pops full_refresh into dbt_kwargs and routes task_id to both layers.
The module defines _SUPPORTED_DATABASES as [BIGQUERY_PROFILE_TYPE], indicating that BigQuery is currently the only supported database for async execution.
Usage
Use these operators when configuring Cosmos with ExecutionMode.AIRFLOW_ASYNC. They are typically instantiated automatically by the Cosmos DAG/task group converter based on the selected execution mode. For the run command specifically, the factory pattern dynamically resolves the correct database-specific async operator at runtime.
Code Reference
Source Location
- Repository: Astronomer_Astronomer_cosmos
- File: cosmos/operators/airflow_async.py
Signature
class DbtBuildAirflowAsyncOperator(DbtBuildLocalOperator): pass
class DbtLSAirflowAsyncOperator(DbtLSLocalOperator): pass
class DbtSeedAirflowAsyncOperator(DbtSeedLocalOperator): pass
class DbtSnapshotAirflowAsyncOperator(DbtSnapshotLocalOperator): pass
class DbtSourceAirflowAsyncOperator(DbtSourceLocalOperator): pass
class DbtRunAirflowAsyncOperator(DbtRunAirflowAsyncFactoryOperator):
def __init__(
self,
project_dir: str,
profile_config: ProfileConfig,
extra_context: dict[str, object] | None = None,
**kwargs: Any,
) -> None:
...
class DbtTestAirflowAsyncOperator(DbtTestLocalOperator): pass
class DbtRunOperationAirflowAsyncOperator(DbtRunOperationLocalOperator): pass
class DbtCompileAirflowAsyncOperator(DbtCompileLocalOperator): pass
class DbtCloneAirflowAsyncOperator(DbtCloneLocalOperator): pass
Import
from cosmos.operators.airflow_async import DbtRunAirflowAsyncOperator
from cosmos.operators.airflow_async import DbtBuildAirflowAsyncOperator
from cosmos.operators.airflow_async import DbtTestAirflowAsyncOperator
from cosmos.operators.airflow_async import DbtSeedAirflowAsyncOperator
from cosmos.operators.airflow_async import DbtCompileAirflowAsyncOperator
I/O Contract
Inputs
DbtRunAirflowAsyncOperator.__init__
| Name | Type | Required | Description |
|---|---|---|---|
| project_dir | str |
Yes | Path to the dbt project directory |
| profile_config | ProfileConfig |
Yes | Cosmos profile configuration with profile type and connection details |
| extra_context | None | No | Additional context for the async operator (default: None) |
| **kwargs | Any |
No | Mixed kwargs; automatically separated into dbt-specific (dbt_kwargs) and Airflow operator args (clean_kwargs)
|
Pass-through operators
| Name | Type | Required | Description |
|---|---|---|---|
| (inherited) | -- | -- | All pass-through operators inherit the full parameter set from their respective local operator base class |
Outputs
| Name | Type | Description |
|---|---|---|
| operator instance | Airflow operator | An operator configured for async execution; the run operator dynamically resolves to the correct database-specific async backend |
Usage Examples
from cosmos.operators.airflow_async import (
DbtRunAirflowAsyncOperator,
DbtSeedAirflowAsyncOperator,
DbtTestAirflowAsyncOperator,
)
from cosmos.config import ProfileConfig
profile_config = ProfileConfig(
profile_name="my_bigquery_profile",
target_name="dev",
profile_mapping=my_bigquery_mapping,
)
# The run operator uses the factory pattern to resolve the BigQuery async backend
run_task = DbtRunAirflowAsyncOperator(
task_id="dbt_run",
project_dir="/path/to/dbt/project",
profile_config=profile_config,
full_refresh=False,
)
# Seed and test operators inherit local operator behavior
seed_task = DbtSeedAirflowAsyncOperator(
task_id="dbt_seed",
project_dir="/path/to/dbt/project",
profile_config=profile_config,
)
test_task = DbtTestAirflowAsyncOperator(
task_id="dbt_test",
project_dir="/path/to/dbt/project",
profile_config=profile_config,
)