Implementation:Astronomer Astronomer cosmos Aws Ecs Operators
| Knowledge Sources | |
|---|---|
| Domains | AWS, ECS, Operators |
| Last Updated | 2026-02-07 17:00 GMT |
Overview
Provides a suite of Airflow operators for executing dbt commands on AWS Elastic Container Service (ECS) by combining the Cosmos abstract dbt interface with Airflow's native EcsRunTaskOperator.
Description
The Aws_Ecs_Operators module (cosmos/operators/aws_ecs.py) defines a base class, DbtAwsEcsBaseOperator, that inherits from both AbstractDbtBase and EcsRunTaskOperator. This dual inheritance lets each operator build the appropriate dbt CLI command through Cosmos's abstract layer and then run it as a containerised task in an ECS cluster using Airflow's built-in ECS integration. The base operator translates the dbt profile configuration, environment variables, and assembled dbt command into an override applied when the ECS task is run, so that the container named by container_name receives the command and environment at runtime.
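The translation step described above can be illustrated with a minimal sketch. It assumes the override follows the `containerOverrides` shape accepted by the ECS RunTask API, with name, command, and a list of name/value environment entries. The helper name `build_container_overrides` is hypothetical and is not taken from the Cosmos source.

```python
from __future__ import annotations


def build_container_overrides(
    container_name: str,
    command: list[str],
    environment_variables: dict[str, str] | None = None,
) -> dict:
    """Hypothetical helper: shape a dbt command and env vars as ECS overrides.

    This only mirrors the structure of the ECS RunTask `overrides` argument;
    it is an illustration, not the Cosmos implementation.
    """
    # ECS expects environment variables as a list of {"name", "value"} pairs.
    environment = [
        {"name": key, "value": str(value)}
        for key, value in (environment_variables or {}).items()
    ]
    return {
        "containerOverrides": [
            {
                "name": container_name,      # container that receives the override
                "command": command,          # replaces the container's default command
                "environment": environment,  # merged into the container environment
            }
        ]
    }


overrides = build_container_overrides(
    container_name="dbt",
    command=["dbt", "run"],
    environment_variables={"DBT_TARGET": "prod"},
)
print(overrides)
```

A dictionary of this shape is what EcsRunTaskOperator accepts through its `overrides` argument; how Cosmos assembles it internally may differ from this sketch.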
Eight concrete operator classes extend the base, each mapping to a specific dbt sub-command: build, ls, seed, snapshot, source freshness, run, test, and run-operation. Every concrete operator simply sets its ui_color and ui_fgcolor for Airflow's graph view and delegates all execution logic to the base class.
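As a rough illustration of that mapping, the sketch below pairs each concrete class name with the dbt sub-command listed above. The `DBT_SUBCOMMANDS` table and the `base_command` helper are hypothetical names introduced for this example only; the real classes derive their command through the Cosmos abstract layer rather than a lookup table.

```python
from __future__ import annotations

# Hypothetical lookup table: class names from this page, paired with the
# sub-commands named in the description above.
DBT_SUBCOMMANDS: dict[str, list[str]] = {
    "DbtBuildAwsEcsOperator": ["build"],
    "DbtLSAwsEcsOperator": ["ls"],
    "DbtSeedAwsEcsOperator": ["seed"],
    "DbtSnapshotAwsEcsOperator": ["snapshot"],
    "DbtSourceAwsEcsOperator": ["source", "freshness"],
    "DbtRunAwsEcsOperator": ["run"],
    "DbtTestAwsEcsOperator": ["test"],
    "DbtRunOperationAwsEcsOperator": ["run-operation"],
}


def base_command(operator_name: str, dbt_executable: str = "dbt") -> list[str]:
    """Return the leading tokens of the dbt invocation for a given operator."""
    return [dbt_executable, *DBT_SUBCOMMANDS[operator_name]]


print(base_command("DbtSourceAwsEcsOperator"))
```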
Usage
Use the AWS ECS operators when your dbt project needs to run inside a container managed by ECS, on either the Fargate or the EC2 launch type. This fits cases where Airflow should orchestrate dbt transformations without dbt being installed on the Airflow workers, with execution offloaded to a dedicated ECS task instead. Typical scenarios include production environments that require isolation, resource control, and container-level logging (via CloudWatch).
Code Reference
Source Location
- Repository: Astronomer_Astronomer_cosmos
- File: cosmos/operators/aws_ecs.py
Signature
class DbtAwsEcsBaseOperator(AbstractDbtBase, EcsRunTaskOperator):
    """
    Base class for running dbt commands as AWS ECS tasks.
    """

    def __init__(
        self,
        cluster: str,
        task_definition: str,
        container_name: str,
        aws_conn_id: str = "aws_default",
        profile_config: ProfileConfig | None = None,
        command: list[str] | None = None,
        environment_variables: dict[str, str] | None = None,
        **kwargs,
    ) -> None:
        ...
Concrete operators:
class DbtBuildAwsEcsOperator(DbtAwsEcsBaseOperator): ...
class DbtLSAwsEcsOperator(DbtAwsEcsBaseOperator): ...
class DbtSeedAwsEcsOperator(DbtAwsEcsBaseOperator): ...
class DbtSnapshotAwsEcsOperator(DbtAwsEcsBaseOperator): ...
class DbtSourceAwsEcsOperator(DbtAwsEcsBaseOperator): ...
class DbtRunAwsEcsOperator(DbtAwsEcsBaseOperator): ...
class DbtTestAwsEcsOperator(DbtAwsEcsBaseOperator): ...
class DbtRunOperationAwsEcsOperator(DbtAwsEcsBaseOperator): ...
Import
from cosmos.operators.aws_ecs import DbtRunAwsEcsOperator
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| cluster | str | Yes | The short name or full ARN of the ECS cluster on which to run the task. |
| task_definition | str | Yes | The family and revision or full ARN of the ECS task definition to use. |
| container_name | str | Yes | The name of the container within the task definition that will receive the dbt command override. |
| aws_conn_id | str | No | Airflow connection ID for AWS credentials. Defaults to "aws_default". |
| profile_config | ProfileConfig or None | No | Cosmos ProfileConfig object defining the dbt profile and target to use. |
| command | list[str] or None | No | Additional command fragments to pass to the dbt CLI invocation. |
| environment_variables | dict[str, str] or None | No | Extra environment variables injected into the ECS container at runtime. |
Outputs
| Name | Type | Description |
|---|---|---|
| task_arn | str | The ARN of the ECS task that was launched, available via XCom after execution. |
| log_output | str | Standard output and error from the dbt command, captured through CloudWatch Logs if configured. |
Usage Examples
from cosmos.operators.aws_ecs import DbtRunAwsEcsOperator

run_dbt = DbtRunAwsEcsOperator(
    task_id="dbt_run_on_ecs",
    cluster="my-ecs-cluster",
    task_definition="dbt-runner:3",
    container_name="dbt",
    aws_conn_id="aws_default",
    environment_variables={
        "DBT_TARGET": "prod",
    },
)

from cosmos.operators.aws_ecs import DbtTestAwsEcsOperator

test_dbt = DbtTestAwsEcsOperator(
    task_id="dbt_test_on_ecs",
    cluster="analytics-cluster",
    task_definition="dbt-runner:3",
    container_name="dbt",
    aws_conn_id="aws_default",
)