Implementation:Astronomer Astronomer cosmos Graph Entities

From Leeroopedia


Knowledge Sources
Domains Data_Models, Graph
Last Updated 2026-02-07 17:00 GMT

Overview

The graph entities module defines the foundational data classes (CosmosEntity, Group, and Task) that represent nodes in the Cosmos graph before they are rendered into Airflow operators.

Description

This 70-line module establishes the core data model that underpins the entire Cosmos rendering pipeline. All three classes use Python's @dataclass decorator for concise, declarative definitions.

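CosmosEntity is the base class. It is an ordinary dataclass rather than a formally abstract one, so it can be instantiated directly. It carries an id string that uniquely identifies the node and an upstream_entity_ids list that encodes dependency edges as the ids of the nodes it depends on. The add_upstream method appends the id of an upstream entity to that list, building the directed acyclic graph incrementally.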
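The edge bookkeeping described above can be sketched with a stand-in dataclass. This is not the Cosmos source: the body of add_upstream is an assumption inferred from the signature and the usage examples on this page (it appends the upstream entity's id), and the node ids are illustrative.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class CosmosEntity:
    """Stand-in mirroring the documented signature; not the Cosmos source."""

    id: str
    upstream_entity_ids: list[str] = field(default_factory=list)

    def add_upstream(self, entity: CosmosEntity) -> None:
        # Assumed behaviour: record the upstream node's id as a dependency edge.
        self.upstream_entity_ids.append(entity.id)


seed = CosmosEntity(id="raw_orders")
staging = CosmosEntity(id="stg_orders")
mart = CosmosEntity(id="fct_orders")

# Build the chain raw_orders -> stg_orders -> fct_orders one edge at a time.
staging.add_upstream(seed)
mart.add_upstream(staging)

# Flatten the per-node lists into (upstream, downstream) pairs.
edges = [
    (upstream_id, node.id)
    for node in (seed, staging, mart)
    for upstream_id in node.upstream_entity_ids
]
```

Because each node stores only the ids of its upstream nodes, the full edge list has to be recovered by scanning every entity, as the final comprehension does.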

Group extends CosmosEntity with an entities list, allowing it to contain other CosmosEntity instances (both Task and nested Group objects). The add_entity method registers a child entity within the group, mirroring Airflow's TaskGroup concept at the Cosmos abstraction level.
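Since a Group may hold both leaf entities and further groups, code that consumes one usually has to walk it recursively. The following is a minimal sketch using stand-in dataclasses that mirror the documented signatures; iter_leaves is a hypothetical helper, not part of Cosmos, and the add_entity body is assumed to be a plain append.

```python
from __future__ import annotations

from collections.abc import Iterator
from dataclasses import dataclass, field


@dataclass
class CosmosEntity:
    """Stand-in for the documented base class."""

    id: str
    upstream_entity_ids: list[str] = field(default_factory=list)


@dataclass
class Group(CosmosEntity):
    """Stand-in for the documented Group container."""

    entities: list[CosmosEntity] = field(default_factory=list)

    def add_entity(self, entity: CosmosEntity) -> None:
        # Assumed behaviour: register the child in insertion order.
        self.entities.append(entity)


def iter_leaves(group: Group) -> Iterator[CosmosEntity]:
    """Hypothetical helper: yield non-group entities depth-first."""
    for child in group.entities:
        if isinstance(child, Group):
            yield from iter_leaves(child)
        else:
            yield child


staging = Group(id="staging")
staging.add_entity(CosmosEntity(id="stg_orders"))
staging.add_entity(CosmosEntity(id="stg_customers"))

project = Group(id="project")
project.add_entity(staging)  # nested group
project.add_entity(CosmosEntity(id="fct_orders"))

leaf_ids = [entity.id for entity in iter_leaves(project)]
```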

Task extends CosmosEntity with execution-specific attributes: owner identifies the task owner, operator_class specifies which Airflow operator to instantiate during rendering, arguments holds keyword arguments forwarded to the operator constructor, and extra_context carries additional metadata that rendering logic may consume.
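To illustrate how a dotted operator_class string and an arguments dict could be turned into an object, here is a sketch that resolves the path with importlib and calls the class with the keyword arguments. It is an assumption about the general technique, not the Cosmos rendering code: load_class is a hypothetical helper, the Task below is a flattened stand-in, and datetime.timedelta stands in for an Airflow operator so the example runs without Airflow installed.

```python
import importlib
from dataclasses import dataclass, field


@dataclass
class Task:
    """Flattened stand-in with the documented Task fields."""

    id: str
    owner: str = ""
    operator_class: str = ""
    arguments: dict = field(default_factory=dict)
    extra_context: dict = field(default_factory=dict)


def load_class(path: str) -> type:
    """Hypothetical helper: import 'package.module.ClassName' and return the class."""
    module_path, _, class_name = path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


task = Task(
    id="wait_two_hours",
    owner="analytics",
    operator_class="datetime.timedelta",  # stand-in for an operator class path
    arguments={"hours": 2},
)

# Resolve the class from its dotted path, then forward the keyword arguments.
instance = load_class(task.operator_class)(**task.arguments)
```

Storing the operator as a string rather than a class object keeps the entity free of Airflow imports until rendering time.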

Together these three classes form a lightweight, framework-agnostic graph representation that Cosmos translates into Airflow DAGs via functions such as get_airflow_task.

Usage

Use these entities when building or manipulating Cosmos DAG graphs programmatically. They are the inputs to the rendering layer and are created by dbt project parsers, custom graph builders, or test fixtures. Any code that needs to inspect or transform the Cosmos graph before it becomes an Airflow DAG should work with these data classes.
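As one example of inspecting the graph before rendering, the upstream_entity_ids lists can be fed to the standard library's graphlib.TopologicalSorter (Python 3.9+) to obtain a dependency-respecting order. This is a sketch with a stand-in dataclass; execution_order is a hypothetical helper and not a Cosmos function.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from graphlib import TopologicalSorter


@dataclass
class CosmosEntity:
    """Stand-in with the documented base fields."""

    id: str
    upstream_entity_ids: list[str] = field(default_factory=list)


def execution_order(entities: list[CosmosEntity]) -> list[str]:
    """Hypothetical helper: return entity ids with upstream nodes first."""
    # TopologicalSorter expects a mapping of node -> predecessors,
    # which is exactly what upstream_entity_ids encodes.
    graph = {entity.id: entity.upstream_entity_ids for entity in entities}
    return list(TopologicalSorter(graph).static_order())


entities = [
    CosmosEntity(id="fct_orders", upstream_entity_ids=["stg_orders"]),
    CosmosEntity(id="stg_orders", upstream_entity_ids=["raw_orders"]),
    CosmosEntity(id="raw_orders"),
]

order = execution_order(entities)
```

static_order raises graphlib.CycleError if the ids form a cycle, which makes this a convenient sanity check for hand-built graphs.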

Code Reference

Source Location

Signature

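from __future__ import annotations  # lets add_upstream refer to CosmosEntity inside its own class body

from dataclasses import dataclass, field


@dataclass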
class CosmosEntity:
    id: str
    upstream_entity_ids: list[str] = field(default_factory=list)

    def add_upstream(self, entity: CosmosEntity) -> None: ...


@dataclass
class Group(CosmosEntity):
    entities: list[CosmosEntity] = field(default_factory=list)

    def add_entity(self, entity: CosmosEntity) -> None: ...


@dataclass
class Task(CosmosEntity):
    owner: str = ""
    operator_class: str = ""
    arguments: dict = field(default_factory=dict)
    extra_context: dict = field(default_factory=dict)

Import

from cosmos.core.graph.entities import CosmosEntity, Group, Task

I/O Contract

Inputs

Name | Type | Required | Description
id | str | Yes | Unique identifier for the entity within the graph
upstream_entity_ids | list[str] | No | List of entity IDs that this entity depends on (defaults to an empty list)
entities | list[CosmosEntity] | No | (Group only) Child entities contained in this group
owner | str | No | (Task only) Owner identifier for the task
operator_class | str | No | (Task only) Fully qualified class path of the Airflow operator to instantiate
arguments | dict | No | (Task only) Keyword arguments forwarded to the operator constructor
extra_context | dict | No | (Task only) Additional metadata consumed by the rendering layer

Outputs

Name | Type | Description
CosmosEntity instance | CosmosEntity | Base entity with id and upstream dependency tracking
Group instance | Group | A container entity holding child entities for hierarchical grouping
Task instance | Task | A leaf entity representing a single executable task with operator metadata

Usage Examples

from cosmos.core.graph.entities import Task, Group

# Create individual tasks
task_a = Task(
    id="stg_orders",
    operator_class="cosmos.operators.local.DbtRunLocalOperator",
    arguments={"models": "stg_orders"},
)

task_b = Task(
    id="fct_orders",
    operator_class="cosmos.operators.local.DbtRunLocalOperator",
    arguments={"models": "fct_orders"},
)

# Establish dependency
task_b.add_upstream(task_a)

# Organise into a group
group = Group(id="order_models")
group.add_entity(task_a)
group.add_entity(task_b)

from cosmos.core.graph.entities import CosmosEntity

# Inspect upstream dependencies
entity = CosmosEntity(id="node_1")
entity.add_upstream(CosmosEntity(id="node_0"))
print(entity.upstream_entity_ids)  # ['node_0']
