Implementation:Astronomer Astronomer cosmos Graph Entities
| Knowledge Sources | |
|---|---|
| Domains | Data_Models, Graph |
| Last Updated | 2026-02-07 17:00 GMT |
Overview
The graph entities module defines the foundational data classes (CosmosEntity, Group, and Task) that represent nodes in the Cosmos graph before they are rendered into Airflow operators.
Description
This 70-line module establishes the core data model that underpins the entire Cosmos rendering pipeline. All three classes use Python's @dataclass decorator for concise, declarative definitions.
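CosmosEntity is the base class. It is an ordinary dataclass rather than an abstract one, so it can be instantiated directly. It carries an id string that uniquely identifies the node and an upstream_entity_ids list that encodes dependency edges by ID. The add_upstream method appends the ID of the given entity to that list, so the dependency graph is built up one edge at a time.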
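The edge bookkeeping can be illustrated with a small stand-in class. This is a minimal sketch based only on the signature listed on this page, not a copy of the Cosmos source; the name SketchEntity is hypothetical and exists so the example runs without Cosmos installed.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SketchEntity:
    """Hypothetical stand-in mirroring the documented CosmosEntity fields."""

    id: str
    upstream_entity_ids: List[str] = field(default_factory=list)

    def add_upstream(self, entity: "SketchEntity") -> None:
        # Record only the upstream node's id, not the object itself.
        self.upstream_entity_ids.append(entity.id)


staging = SketchEntity(id="stg_orders")
fact = SketchEntity(id="fct_orders")
fact.add_upstream(staging)

# default_factory gives every instance its own list, so adding an edge
# to one entity does not leak into another.
print(fact.upstream_entity_ids)
print(staging.upstream_entity_ids)
```

Storing IDs rather than object references keeps each node cheap to copy and compare, at the cost of needing a lookup table when the full upstream object is required.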
Group extends CosmosEntity with an entities list, allowing it to contain other CosmosEntity instances (both Task and nested Group objects). The add_entity method registers a child entity within the group, mirroring Airflow's TaskGroup concept at the Cosmos abstraction level.
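Because a group may contain both tasks and other groups, code that walks the structure usually needs to recurse. The following is a sketch under assumptions: SketchEntity and SketchGroup are hypothetical stand-ins for the documented classes, and iter_leaves is an illustrative helper that is not part of Cosmos.

```python
from dataclasses import dataclass, field
from typing import Iterator, List


@dataclass
class SketchEntity:
    """Hypothetical stand-in for CosmosEntity (id only)."""

    id: str


@dataclass
class SketchGroup(SketchEntity):
    """Hypothetical stand-in for Group."""

    entities: List[SketchEntity] = field(default_factory=list)

    def add_entity(self, entity: SketchEntity) -> None:
        self.entities.append(entity)


def iter_leaves(group: SketchGroup) -> Iterator[SketchEntity]:
    """Yield non-group entities depth-first, in insertion order."""
    for child in group.entities:
        if isinstance(child, SketchGroup):
            yield from iter_leaves(child)
        else:
            yield child


staging = SketchGroup(id="staging")
staging.add_entity(SketchEntity(id="stg_orders"))
staging.add_entity(SketchEntity(id="stg_customers"))

marts = SketchGroup(id="marts")
marts.add_entity(staging)  # a group nested inside another group
marts.add_entity(SketchEntity(id="fct_orders"))

leaf_ids = [entity.id for entity in iter_leaves(marts)]
print(leaf_ids)
```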
Task extends CosmosEntity with execution-specific attributes: owner identifies the task owner, operator_class specifies which Airflow operator to instantiate during rendering, arguments holds keyword arguments forwarded to the operator constructor, and extra_context carries additional metadata that rendering logic may consume.
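One way a rendering layer could turn the operator_class string and the arguments dictionary into an object is to import the dotted path and call the result with the arguments as keywords. The sketch below assumes that approach and does not reproduce how Cosmos actually does it. SketchTask and resolve_class are hypothetical names, and fractions.Fraction from the standard library stands in for an Airflow operator so the example runs without Airflow.

```python
import importlib
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class SketchTask:
    """Hypothetical stand-in mirroring the documented Task fields."""

    id: str
    owner: str = ""
    operator_class: str = ""
    arguments: Dict[str, Any] = field(default_factory=dict)
    extra_context: Dict[str, Any] = field(default_factory=dict)


def resolve_class(dotted_path: str) -> type:
    """Import 'package.module.ClassName' and return the class object."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"Expected a dotted path, got {dotted_path!r}")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


task = SketchTask(
    id="example",
    operator_class="fractions.Fraction",  # stand-in for an operator path
    arguments={"numerator": 3, "denominator": 4},
)

operator_cls = resolve_class(task.operator_class)
instance = operator_cls(**task.arguments)  # arguments become keyword args
print(instance)
```

Keeping the operator as a string rather than a class reference means a task can be constructed and inspected without importing the operator's module.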
Together these three classes form a lightweight, framework-agnostic graph representation that Cosmos translates into Airflow DAGs via functions such as get_airflow_task.
Usage
Use these entities when building or manipulating Cosmos graphs programmatically. They are the inputs to the rendering layer and are created by dbt project parsers, custom graph builders, or test fixtures. Any code that needs to inspect or transform the graph before it becomes an Airflow DAG should work with these data classes.
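As an illustration of inspecting the graph before rendering, the upstream_entity_ids lists can be fed into a topological sort to obtain a valid execution order. This is a sketch, not part of Cosmos: SketchEntity is a hypothetical stand-in for the documented base class, and the example relies on graphlib from the Python standard library (3.9 or later).

```python
from dataclasses import dataclass, field
from graphlib import TopologicalSorter
from typing import List


@dataclass
class SketchEntity:
    """Hypothetical stand-in mirroring the documented CosmosEntity fields."""

    id: str
    upstream_entity_ids: List[str] = field(default_factory=list)


entities = [
    SketchEntity(id="fct_orders", upstream_entity_ids=["stg_orders", "stg_customers"]),
    SketchEntity(id="stg_orders"),
    SketchEntity(id="stg_customers"),
]

# TopologicalSorter expects a mapping of node -> predecessors, which is
# exactly what upstream_entity_ids stores for each entity.
dependencies = {entity.id: entity.upstream_entity_ids for entity in entities}

# static_order() raises graphlib.CycleError if the ids form a cycle.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Both staging models appear before fct_orders in the result; their order relative to each other is not significant.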
Code Reference
Source Location
- Repository: Astronomer_Astronomer_cosmos
- File: cosmos/core/graph/entities.py
- Lines: Full module (70 lines)
Signature
from __future__ import annotations

from dataclasses import dataclass, field

@dataclass
class CosmosEntity:
    id: str
    upstream_entity_ids: list[str] = field(default_factory=list)

    def add_upstream(self, entity: CosmosEntity) -> None: ...

@dataclass
class Group(CosmosEntity):
    entities: list[CosmosEntity] = field(default_factory=list)

    def add_entity(self, entity: CosmosEntity) -> None: ...

@dataclass
class Task(CosmosEntity):
    owner: str = ""
    operator_class: str = ""
    arguments: dict = field(default_factory=dict)
    extra_context: dict = field(default_factory=dict)
Import
from cosmos.core.graph.entities import CosmosEntity, Group, Task
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| id | str | Yes | Unique identifier for the entity within the graph |
| upstream_entity_ids | list[str] | No | List of entity IDs that this entity depends on (defaults to empty list) |
| entities | list[CosmosEntity] | No | (Group only) Child entities contained in this group |
| owner | str | No | (Task only) Owner identifier for the task |
| operator_class | str | No | (Task only) Fully qualified class path of the Airflow operator to instantiate |
| arguments | dict | No | (Task only) Keyword arguments forwarded to the operator constructor |
| extra_context | dict | No | (Task only) Additional metadata consumed by the rendering layer |
Outputs
| Name | Type | Description |
|---|---|---|
| CosmosEntity instance | CosmosEntity | Base entity with id and upstream dependency tracking |
| Group instance | Group | A container entity holding child entities for hierarchical grouping |
| Task instance | Task | A leaf entity representing a single executable task with operator metadata |
Usage Examples
from cosmos.core.graph.entities import Task, Group

# Create individual tasks
task_a = Task(
    id="stg_orders",
    operator_class="cosmos.operators.local.DbtRunLocalOperator",
    arguments={"models": "stg_orders"},
)
task_b = Task(
    id="fct_orders",
    operator_class="cosmos.operators.local.DbtRunLocalOperator",
    arguments={"models": "fct_orders"},
)

# Establish dependency: fct_orders runs after stg_orders
task_b.add_upstream(task_a)

# Organise into a group
group = Group(id="order_models")
group.add_entity(task_a)
group.add_entity(task_b)

from cosmos.core.graph.entities import CosmosEntity

# Inspect upstream dependencies
entity = CosmosEntity(id="node_1")
entity.add_upstream(CosmosEntity(id="node_0"))
print(entity.upstream_entity_ids)  # ['node_0']
Related Pages
- Environment:Astronomer_Astronomer_cosmos_Python_Airflow_Runtime
- Astronomer_Astronomer_cosmos_Get_Airflow_Task -- consumes Task entities to produce Airflow operators