

Implementation:Apache Airflow GenericDAGNode

From Leeroopedia


Knowledge Sources
Domains: Core_Infrastructure, DAG_Structure
Last Updated: 2026-02-08 21:00 GMT

Overview

Generic base class for nodes in a directed acyclic graph (DAG), providing upstream/downstream relationship tracking, iterative graph traversal algorithms, and setup/teardown-aware dependency resolution.

Description

The GenericDAGNode class is a generic, protocol-based base class that represents a node (task or task group) within a DAG. It is parameterized over three type variables -- Dag, Task, and TaskGroup -- each bound to corresponding protocol interfaces (DagProtocol, TaskProtocol, TaskGroupProtocol).
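The pattern described above, a generic class whose type variables are bound to Protocols, can be sketched in a few lines. `TaskLike`, `MiniNode`, and `ToyTask` are hypothetical names, not Airflow APIs. The point is that `ToyTask` never inherits from the protocol yet still satisfies the bound, because protocols are matched structurally. These bounds are checked by static type checkers such as mypy, not enforced at runtime.

```python
from __future__ import annotations

from typing import Generic, Protocol, TypeVar


class TaskLike(Protocol):
    """Hypothetical, trimmed-down stand-in for TaskProtocol."""

    task_id: str
    is_setup: bool
    is_teardown: bool


# Binding the TypeVar to a Protocol lets a type checker accept any class with
# a matching shape; no concrete Task class has to be imported here.
TaskT = TypeVar("TaskT", bound=TaskLike)


class MiniNode(Generic[TaskT]):
    """Hypothetical miniature of a protocol-parameterized node base class."""

    def __init__(self, task_dict: dict[str, TaskT]) -> None:
        self.task_dict = task_dict
        self.upstream_task_ids: set[str] = set()

    @property
    def upstream_list(self) -> list[TaskT]:
        # Resolve stored IDs to task objects; the element type is whatever
        # concrete class the node was parameterized with.
        return [self.task_dict[tid] for tid in sorted(self.upstream_task_ids)]


class ToyTask:
    """Matches TaskLike structurally without inheriting from it."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.is_setup = False
        self.is_teardown = False


tasks = {"extract": ToyTask("extract")}
node: MiniNode[ToyTask] = MiniNode(tasks)
node.upstream_task_ids.add("extract")
print([t.task_id for t in node.upstream_list])  # ['extract']
```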

Key design decisions:

  • Iterative traversal: The get_flat_relative_ids() method expands the graph level by level (breadth-first) inside a loop instead of recursing, so DAGs with very long dependency chains cannot exceed Python's recursion limit. An optional depth parameter bounds how many levels are traversed.
  • Setup/teardown awareness: Methods like get_upstreams_follow_setups(), get_upstreams_only_setups_and_teardowns(), and get_upstreams_only_setups() provide specialized traversal logic that understands Airflow's setup/teardown task relationships.
  • Protocol-based generics: The class uses Protocol types (DagProtocol, TaskProtocol, TaskGroupProtocol) rather than concrete class dependencies, enabling use in both the core Airflow runtime and the Task SDK without circular imports.
  • Structlog integration: The log property lazily creates a structlog logger named after the concrete class.
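The iterative traversal can be sketched as below, assuming the graph is given as a plain mapping from task ID to downstream task IDs (a hypothetical stand-in for a DAG's task_dict). The function name `flat_relative_ids` is illustrative. The assumed depth semantics (depth=1 returns only direct relatives, depth=2 adds one more level, and so on) agree with the usage examples on this page but are not confirmed against Airflow's source.

```python
from __future__ import annotations

from collections.abc import Mapping


def flat_relative_ids(
    start: str,
    downstream: Mapping[str, set[str]],
    *,
    upstream: bool = False,
    depth: int | None = None,
) -> set[str]:
    """Collect transitive relatives of `start` level by level, without recursion."""
    if upstream:
        # Invert the edge map so the walk moves toward ancestors.
        edges: dict[str, set[str]] = {}
        for parent, children in downstream.items():
            for child in children:
                edges.setdefault(child, set()).add(parent)
    else:
        edges = {k: set(v) for k, v in downstream.items()}

    relatives: set[str] = set()
    frontier = set(edges.get(start, ()))  # level 1: direct relatives
    level = 1
    while frontier and (depth is None or level <= depth):
        next_frontier: set[str] = set()
        for task_id in frontier:
            if task_id in relatives:
                continue  # already reached via another path
            relatives.add(task_id)
            next_frontier.update(edges.get(task_id, ()))
        frontier = next_frontier
        level += 1
    return relatives


# a >> b >> c >> d, plus a shortcut a >> c
graph = {"a": {"b", "c"}, "b": {"c"}, "c": {"d"}, "d": set()}
print(sorted(flat_relative_ids("d", graph, upstream=True)))           # ['a', 'b', 'c']
print(sorted(flat_relative_ids("d", graph, upstream=True, depth=1)))  # ['c']
```

Because the loop only grows sets and never calls itself, a 5,000-task chain is handled without approaching Python's default recursion limit of 1,000.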

Core attributes managed per node:

  • upstream_task_ids: set[str] -- IDs of direct upstream tasks
  • downstream_task_ids: set[str] -- IDs of direct downstream tasks
  • dag: Dag | None -- Reference to the parent DAG
  • task_group: TaskGroup | None -- Reference to the containing task group

Note: The set_downstream, set_upstream, and bitshift operators (>>, <<) are implemented in subclasses within airflow-core, not in this shared base class.
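Since the base class leaves relationship wiring to subclasses, a subclass-style implementation might record each edge on both endpoints so that upstream_task_ids and downstream_task_ids stay consistent. The sketch below is hypothetical: `ToyTask` and its methods are illustrative, and Airflow's real operators additionally accept lists (e.g. `task >> [a, b]`) and perform validation omitted here.

```python
from __future__ import annotations


class ToyTask:
    """Hypothetical concrete node that wires relationships symmetrically."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream_task_ids: set[str] = set()
        self.downstream_task_ids: set[str] = set()

    def set_downstream(self, other: ToyTask) -> None:
        # Record the edge on both endpoints so either side can be queried.
        self.downstream_task_ids.add(other.task_id)
        other.upstream_task_ids.add(self.task_id)

    def set_upstream(self, other: ToyTask) -> None:
        other.set_downstream(self)

    def __rshift__(self, other: ToyTask) -> ToyTask:
        # `a >> b` means "b runs after a"; returning `other` enables chaining.
        self.set_downstream(other)
        return other

    def __lshift__(self, other: ToyTask) -> ToyTask:
        # `a << b` means "a runs after b".
        self.set_upstream(other)
        return other


extract, transform, load = ToyTask("extract"), ToyTask("transform"), ToyTask("load")
extract >> transform >> load
print(sorted(transform.upstream_task_ids), sorted(transform.downstream_task_ids))
```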

Usage

from airflow_shared.dagnode.node import GenericDAGNode

# GenericDAGNode is typically subclassed by concrete task implementations
# in airflow-core and the Task SDK; direct usage is uncommon.
# Below, my_task_node is assumed to be an instance of such a subclass
# that is attached to a DAG.

# Example: traversing upstream dependencies
for task in my_task_node.upstream_list:
    print(f"Upstream: {task.task_id}")

# Get all transitive downstream task IDs (iterative, not recursive)
all_downstream = my_task_node.get_flat_relative_ids(upstream=False)

# Bounded traversal: only 2 levels deep
nearby = my_task_node.get_flat_relative_ids(upstream=True, depth=2)

Code Reference

Source Location

  • Repository: Apache_Airflow
  • File: shared/dagnode/src/airflow_shared/dagnode/node.py

Protocols

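from collections.abc import Iterable
from typing import Any, Protocol

from typing_extensions import Self  # typing.Self is available from Python 3.11

class DagProtocol(Protocol):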
    """Protocol defining the minimum interface required for Dag generic type."""
    dag_id: str
    task_dict: dict[str, Any]
    def get_task(self, tid: str) -> Any: ...

class TaskProtocol(Protocol):
    """Protocol defining the minimum interface required for Task generic type."""
    task_id: str
    is_setup: bool
    is_teardown: bool
    downstream_list: Iterable[Self]
    downstream_task_ids: set[str]

class TaskGroupProtocol(Protocol):
    """Protocol defining the minimum interface required for TaskGroup generic type."""
    node_id: str
    prefix_group_id: bool

Key Class: GenericDAGNode (lines 70-265)

class GenericDAGNode(Generic[Dag, Task, TaskGroup]):
    """Generic class for a node in the graph of a workflow."""

    dag: Dag | None
    task_group: TaskGroup | None
    upstream_task_ids: set[str]
    downstream_task_ids: set[str]

    def __init__(self): ...

    # Properties
    @property
    def log(self) -> Logger: ...
    @property
    def dag_id(self) -> str: ...
    @property
    def node_id(self) -> str: ...   # abstract -- raises NotImplementedError
    @property
    def label(self) -> str | None: ...
    @property
    def upstream_list(self) -> Iterable[Task]: ...
    @property
    def downstream_list(self) -> Iterable[Task]: ...

    # Relationship queries
    def has_dag(self) -> bool: ...
    def get_dag(self) -> Dag | None: ...
    def get_direct_relative_ids(self, upstream: bool = False) -> set[str]: ...
    def get_direct_relatives(self, upstream: bool = False) -> Iterable[Task]: ...

    # Traversal (iterative, not recursive)
    def get_flat_relative_ids(
        self, *, upstream: bool = False, depth: int | None = None,
    ) -> set[str]: ...
    def get_flat_relatives(
        self, upstream: bool = False, depth: int | None = None,
    ) -> Collection[Task]: ...

    # Setup/teardown aware traversal
    def get_upstreams_follow_setups(self, depth: int | None = None) -> Iterable[Task]: ...
    def get_upstreams_only_setups_and_teardowns(self) -> Iterable[Task]: ...
    def get_upstreams_only_setups(self) -> Iterable[Task]: ...

Import

from airflow_shared.dagnode.node import GenericDAGNode
from airflow_shared.dagnode.node import DagProtocol, TaskProtocol, TaskGroupProtocol

I/O Contract

Inputs

  • upstream (bool, optional): Direction flag; True for upstream, False for downstream (the default).
  • depth (int or None, optional): Maximum traversal depth; None (the default) means unbounded traversal.

Outputs

  • upstream_list (Iterable[Task]): Direct upstream task objects, resolved from upstream_task_ids via the DAG.
  • downstream_list (Iterable[Task]): Direct downstream task objects, resolved from downstream_task_ids via the DAG.
  • get_flat_relative_ids() (set[str]): All transitive relative task IDs, collected iteratively (breadth-first).
  • get_flat_relatives() (Collection[Task]): All transitive relative task objects.
  • Setup/teardown methods (Iterable[Task]): Filtered task sets that respect setup/teardown relationships.
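The upstream_list and downstream_list outputs are resolved from stored IDs through the DAG. A minimal sketch of that split, using hypothetical `ToyDag` and `ToyTask` dataclasses shaped after DagProtocol (dag_id, task_dict, get_task) and the per-node attributes listed earlier, keeps only ID sets on each node and looks objects up on demand:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyDag:
    """Hypothetical DAG with the DagProtocol shape: dag_id, task_dict, get_task()."""

    dag_id: str
    task_dict: dict[str, ToyTask] = field(default_factory=dict)

    def get_task(self, tid: str) -> ToyTask:
        return self.task_dict[tid]


@dataclass(eq=False)
class ToyTask:
    """Hypothetical node that stores only IDs and resolves objects via its DAG."""

    task_id: str
    dag: ToyDag = field(repr=False)
    upstream_task_ids: set[str] = field(default_factory=set)
    downstream_task_ids: set[str] = field(default_factory=set)

    def __post_init__(self) -> None:
        self.dag.task_dict[self.task_id] = self  # register with the DAG

    @property
    def upstream_list(self) -> list[ToyTask]:
        return [self.dag.get_task(tid) for tid in sorted(self.upstream_task_ids)]

    @property
    def downstream_list(self) -> list[ToyTask]:
        return [self.dag.get_task(tid) for tid in sorted(self.downstream_task_ids)]

    def get_direct_relative_ids(self, upstream: bool = False) -> set[str]:
        return self.upstream_task_ids if upstream else self.downstream_task_ids

    def get_direct_relatives(self, upstream: bool = False) -> list[ToyTask]:
        return self.upstream_list if upstream else self.downstream_list


dag = ToyDag("etl")
extract, load = ToyTask("extract", dag), ToyTask("load", dag)
extract.downstream_task_ids.add("load")
load.upstream_task_ids.add("extract")
print([t.task_id for t in load.get_direct_relatives(upstream=True)])  # ['extract']
```

In this sketch, storing IDs rather than object references means a node can only resolve its relatives once it is registered with a DAG.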

Usage Examples

Iterative Graph Traversal

# Get all transitive upstream task IDs without recursion
all_upstream_ids = task_node.get_flat_relative_ids(upstream=True)
print(f"Task depends on {len(all_upstream_ids)} upstream tasks")

# Bounded traversal: only immediate and one level up
nearby_upstream_ids = task_node.get_flat_relative_ids(upstream=True, depth=2)

Setup/Teardown Dependency Resolution

# When clearing a task, include relevant setup tasks and their teardowns
tasks_to_clear = {task_node}
tasks_to_clear.update(task_node.get_upstreams_only_setups_and_teardowns())
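The selection made by get_upstreams_only_setups_and_teardowns() can be approximated on a toy graph. The relevance rule used here is an assumption: an upstream setup is kept if it has no teardowns, or if at least one of its teardowns runs downstream of the task being cleared, and each kept setup is followed by its teardowns. `ToyGraph`, `Task`, and their methods are hypothetical.

```python
from __future__ import annotations

from collections.abc import Iterator
from dataclasses import dataclass, field


@dataclass(eq=False)
class Task:
    task_id: str
    is_setup: bool = False
    is_teardown: bool = False
    upstream: set[str] = field(default_factory=set)
    downstream: set[str] = field(default_factory=set)


class ToyGraph:
    """Hypothetical task graph used only to illustrate the selection rule."""

    def __init__(self) -> None:
        self.tasks: dict[str, Task] = {}

    def add(self, task_id: str, *, setup: bool = False, teardown: bool = False) -> None:
        self.tasks[task_id] = Task(task_id, setup, teardown)

    def edge(self, a: str, b: str) -> None:
        self.tasks[a].downstream.add(b)
        self.tasks[b].upstream.add(a)

    def flat(self, task_id: str, *, upstream: bool) -> set[str]:
        # Iterative walk over all transitive relatives in one direction.
        seen: set[str] = set()
        stack = [task_id]
        while stack:
            cur = self.tasks[stack.pop()]
            for nxt in (cur.upstream if upstream else cur.downstream):
                if nxt not in seen:
                    seen.add(nxt)
                    stack.append(nxt)
        return seen

    def upstream_setups_and_teardowns(self, task_id: str) -> Iterator[Task]:
        """Yield relevant upstream setups, each followed by its teardowns (assumed rule)."""
        downstream_teardowns = {
            tid for tid in self.flat(task_id, upstream=False) if self.tasks[tid].is_teardown
        }
        for tid in sorted(self.flat(task_id, upstream=True)):
            setup = self.tasks[tid]
            if not setup.is_setup:
                continue
            teardowns = [self.tasks[d] for d in sorted(setup.downstream) if self.tasks[d].is_teardown]
            # Relevant: no teardowns at all, or one of them runs downstream of task_id.
            if not teardowns or any(t.task_id in downstream_teardowns for t in teardowns):
                yield setup
                yield from (t for t in teardowns if t.task_id != task_id)


g = ToyGraph()
g.add("create_cluster", setup=True)
g.add("run_job")
g.add("delete_cluster", teardown=True)
g.add("create_bucket", setup=True)
g.add("delete_bucket", teardown=True)
for a, b in [
    ("create_cluster", "run_job"),
    ("run_job", "delete_cluster"),
    ("create_cluster", "delete_cluster"),
    ("create_bucket", "run_job"),
    ("create_bucket", "delete_bucket"),  # delete_bucket is not downstream of run_job
]:
    g.edge(a, b)

print([t.task_id for t in g.upstream_setups_and_teardowns("run_job")])
# ['create_cluster', 'delete_cluster']
```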

Accessing Node Properties

# node_id is abstract on GenericDAGNode; concrete subclasses implement it
print(f"Node ID: {task_node.node_id}")
print(f"DAG ID: {task_node.dag_id}")
print(f"Label: {task_node.label}")  # Strips task group prefix if applicable
print(f"Has DAG: {task_node.has_dag()}")
print(f"Direct upstream count: {len(task_node.upstream_task_ids)}")
print(f"Direct downstream count: {len(task_node.downstream_task_ids)}")
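The label property strips the task-group prefix from the node ID. A minimal sketch, assuming the prefix is the group's node_id followed by a dot and is stripped only when prefix_group_id is true (the two fields TaskGroupProtocol requires); `ToyGroup` and the free function `label()` are hypothetical:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ToyGroup:
    """Hypothetical task group with the TaskGroupProtocol fields."""

    node_id: str
    prefix_group_id: bool


def label(node_id: str, group: ToyGroup | None) -> str:
    """Strip the '<group node_id>.' prefix when the group prefixes child IDs."""
    if group is not None and group.node_id and group.prefix_group_id:
        prefix = f"{group.node_id}."
        if node_id.startswith(prefix):
            return node_id[len(prefix):]
    return node_id


print(label("etl.extract", ToyGroup("etl", prefix_group_id=True)))   # extract
print(label("etl.extract", ToyGroup("etl", prefix_group_id=False)))  # etl.extract
```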
