Implementation:Apache Airflow GenericDAGNode
| Knowledge Sources | |
|---|---|
| Domains | Core_Infrastructure, DAG_Structure |
| Last Updated | 2026-02-08 21:00 GMT |
Overview
Generic base class for nodes in a directed acyclic graph (DAG), providing upstream/downstream relationship tracking, iterative graph traversal algorithms, and setup/teardown-aware dependency resolution.
Description
The GenericDAGNode class is a generic, protocol-based base class that represents a node (task or task group) within a DAG. It is parameterized over three type variables -- Dag, Task, and TaskGroup -- each bound to corresponding protocol interfaces (DagProtocol, TaskProtocol, TaskGroupProtocol).
Key design decisions:
- Iterative traversal: The get_flat_relative_ids() method walks the graph level by level (breadth-first) in a loop rather than recursing. This avoids hitting Python's recursion limit on DAGs with very long dependency chains. An optional depth parameter bounds the traversal.
- Setup/teardown awareness: get_upstreams_follow_setups(), get_upstreams_only_setups_and_teardowns(), and get_upstreams_only_setups() provide specialized traversal logic that understands Airflow's setup/teardown task relationships.
- Protocol-based generics: The class depends on Protocol types (DagProtocol, TaskProtocol, TaskGroupProtocol) rather than concrete classes. This lets both the core Airflow runtime and the Task SDK use it without circular imports.
- Structlog integration: The log property lazily creates a structlog logger named after the concrete class.
Core attributes managed per node:
- upstream_task_ids: set[str] -- IDs of direct upstream tasks
- downstream_task_ids: set[str] -- IDs of direct downstream tasks
- dag: Dag | None -- reference to the parent DAG
- task_group: TaskGroup | None -- reference to the containing task group
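The sketch below shows how these ID sets relate to task objects, using hypothetical ToyDag and ToyTask classes. It assumes, as the Outputs table below states, that upstream_list and downstream_list resolve IDs through the DAG. Raising RuntimeError for a node without a DAG is this sketch's choice, not documented behavior.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyDag:
    """Hypothetical stand-in for a DAG that owns the task_id -> task mapping."""

    dag_id: str
    task_dict: dict[str, ToyTask] = field(default_factory=dict)

    def get_task(self, tid: str) -> ToyTask:
        return self.task_dict[tid]


@dataclass(eq=False)
class ToyTask:
    """Hypothetical node that stores its neighbours as IDs, not object references."""

    task_id: str
    dag: ToyDag | None = None
    upstream_task_ids: set[str] = field(default_factory=set)
    downstream_task_ids: set[str] = field(default_factory=set)

    def _resolve(self, ids: set[str]) -> list[ToyTask]:
        # IDs only become task objects when looked up through the owning DAG.
        if self.dag is None:
            raise RuntimeError(f"{self.task_id} is not attached to a DAG")  # sketch's choice
        return [self.dag.get_task(tid) for tid in sorted(ids)]

    @property
    def upstream_list(self) -> list[ToyTask]:
        return self._resolve(self.upstream_task_ids)

    @property
    def downstream_list(self) -> list[ToyTask]:
        return self._resolve(self.downstream_task_ids)


dag = ToyDag("example")
extract = ToyTask("extract", dag=dag, downstream_task_ids={"load"})
load = ToyTask("load", dag=dag, upstream_task_ids={"extract"})
dag.task_dict.update({t.task_id: t for t in (extract, load)})

print([t.task_id for t in load.upstream_list])       # ['extract']
print([t.task_id for t in extract.downstream_list])  # ['load']
```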
Note: The set_downstream, set_upstream, and bitshift operators (>>, <<) are implemented in subclasses within airflow-core, not in this shared base class.
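Because the base class only stores the two ID sets, a subclass must keep them consistent when wiring edges. The sketch below uses a hypothetical ToyNode to show one plausible way to implement set_downstream, set_upstream, >> and <<. It is not the airflow-core code. Airflow's operators also accept lists of tasks, which this sketch omits.

```python
from __future__ import annotations


class ToyNode:
    """Hypothetical node: one way ``>>``/``<<`` can keep both ID sets in sync."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream_task_ids: set[str] = set()
        self.downstream_task_ids: set[str] = set()

    def set_downstream(self, other: ToyNode) -> None:
        # Record the edge on both endpoints so either direction can be queried.
        self.downstream_task_ids.add(other.task_id)
        other.upstream_task_ids.add(self.task_id)

    def set_upstream(self, other: ToyNode) -> None:
        other.set_downstream(self)

    def __rshift__(self, other: ToyNode) -> ToyNode:
        # self >> other: other runs after self
        self.set_downstream(other)
        return other  # returning the right operand lets a >> b >> c chain

    def __lshift__(self, other: ToyNode) -> ToyNode:
        # self << other: other runs before self
        self.set_upstream(other)
        return other


a, b, c = ToyNode("a"), ToyNode("b"), ToyNode("c")
a >> b >> c              # a -> b -> c
start = ToyNode("start")
a << start               # start -> a
```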
Usage
```python
from airflow_shared.dagnode.node import GenericDAGNode

# GenericDAGNode is typically subclassed by concrete task implementations
# in airflow-core and the Task SDK. Direct usage is uncommon.
# Below, my_task_node is an instance of such a subclass that is attached to a DAG.

# Example: traversing direct upstream dependencies
for task in my_task_node.upstream_list:
    print(f"Upstream: {task.task_id}")

# Get all transitive downstream task IDs (iterative, not recursive)
all_downstream = my_task_node.get_flat_relative_ids(upstream=False)

# Bounded traversal: only 2 levels deep
nearby = my_task_node.get_flat_relative_ids(upstream=True, depth=2)
```
Code Reference
Source Location
- Repository: Apache_Airflow
- File: shared/dagnode/src/airflow_shared/dagnode/node.py
Protocols
```python
from collections.abc import Iterable
from typing import Any, Protocol

from typing_extensions import Self  # typing.Self on Python 3.11+


class DagProtocol(Protocol):
    """Protocol defining the minimum interface required for Dag generic type."""
    dag_id: str
    task_dict: dict[str, Any]

    def get_task(self, tid: str) -> Any: ...


class TaskProtocol(Protocol):
    """Protocol defining the minimum interface required for Task generic type."""
    task_id: str
    is_setup: bool
    is_teardown: bool
    downstream_list: Iterable[Self]
    downstream_task_ids: set[str]


class TaskGroupProtocol(Protocol):
    """Protocol defining the minimum interface required for TaskGroup generic type."""
    node_id: str
    prefix_group_id: bool
```
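Protocols are satisfied structurally. Any class with matching members qualifies without inheriting from, or importing, the protocol's module. The sketch below uses DagLike, a local copy of the DagProtocol shape. It adds @runtime_checkable only so the demo can call isinstance(). This page does not say whether the real protocols are runtime-checkable. MinimalDag, NotADag and describe are hypothetical names.

```python
from __future__ import annotations

from typing import Any, Protocol, runtime_checkable


@runtime_checkable  # added for this demo only, so isinstance() can be used
class DagLike(Protocol):
    """Local copy of the DagProtocol shape."""

    dag_id: str
    task_dict: dict[str, Any]

    def get_task(self, tid: str) -> Any: ...


class MinimalDag:
    """Matches DagLike structurally; it never inherits from the protocol."""

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id
        self.task_dict: dict[str, Any] = {}

    def get_task(self, tid: str) -> Any:
        return self.task_dict[tid]


class NotADag:
    """Has dag_id but lacks task_dict and get_task."""

    dag_id = "missing_members"


def describe(dag: DagLike) -> str:
    # A type checker accepts MinimalDag here because its members match DagLike.
    return f"{dag.dag_id}: {len(dag.task_dict)} task(s)"


print(isinstance(MinimalDag("etl"), DagLike))  # True
print(isinstance(NotADag(), DagLike))          # False
print(describe(MinimalDag("etl")))             # etl: 0 task(s)
```

Note that runtime isinstance() checks on protocols only verify that the members exist, not their types. Static type checkers do the stricter check.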
Key Class: GenericDAGNode (lines 70-265)
```python
# Dag, Task and TaskGroup are type variables bound to DagProtocol,
# TaskProtocol and TaskGroupProtocol respectively.
class GenericDAGNode(Generic[Dag, Task, TaskGroup]):
    """Generic class for a node in the graph of a workflow."""

    dag: Dag | None
    task_group: TaskGroup | None
    upstream_task_ids: set[str]
    downstream_task_ids: set[str]

    def __init__(self): ...

    # Properties
    @property
    def log(self) -> Logger: ...

    @property
    def dag_id(self) -> str: ...

    @property
    def node_id(self) -> str: ...  # abstract -- raises NotImplementedError

    @property
    def label(self) -> str | None: ...

    @property
    def upstream_list(self) -> Iterable[Task]: ...

    @property
    def downstream_list(self) -> Iterable[Task]: ...

    # Relationship queries
    def has_dag(self) -> bool: ...
    def get_dag(self) -> Dag | None: ...
    def get_direct_relative_ids(self, upstream: bool = False) -> set[str]: ...
    def get_direct_relatives(self, upstream: bool = False) -> Iterable[Task]: ...

    # Traversal (iterative, not recursive)
    def get_flat_relative_ids(
        self, *, upstream: bool = False, depth: int | None = None,
    ) -> set[str]: ...
    def get_flat_relatives(
        self, upstream: bool = False, depth: int | None = None,
    ) -> Collection[Task]: ...

    # Setup/teardown aware traversal
    def get_upstreams_follow_setups(self, depth: int | None = None) -> Iterable[Task]: ...
    def get_upstreams_only_setups_and_teardowns(self) -> Iterable[Task]: ...
    def get_upstreams_only_setups(self) -> Iterable[Task]: ...
```
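The three type parameters let each runtime plug in its own concrete classes. The following minimal sketch mirrors that pattern with a hypothetical MiniNode. HasDagId, HasTaskId, HasNodeId, SimpleDag, SimpleGroup and SimpleTask are illustrative names. Only a few listed members are reproduced, with the behavior their names and signatures suggest.

```python
from __future__ import annotations

from typing import Generic, Protocol, TypeVar


class HasDagId(Protocol):
    dag_id: str


class HasTaskId(Protocol):
    task_id: str


class HasNodeId(Protocol):
    node_id: str


# Hypothetical TypeVars mirroring Dag/Task/TaskGroup, each bound to a protocol.
Dag = TypeVar("Dag", bound=HasDagId)
Task = TypeVar("Task", bound=HasTaskId)
TaskGroup = TypeVar("TaskGroup", bound=HasNodeId)


class MiniNode(Generic[Dag, Task, TaskGroup]):
    dag: Dag | None
    task_group: TaskGroup | None

    def __init__(self) -> None:
        self.dag = None
        self.task_group = None
        self.upstream_task_ids: set[str] = set()
        self.downstream_task_ids: set[str] = set()

    @property
    def node_id(self) -> str:
        raise NotImplementedError  # concrete subclasses must supply an ID

    def get_direct_relative_ids(self, upstream: bool = False) -> set[str]:
        # One flag selects the direction; the default is downstream.
        # Returns a copy so callers cannot mutate the node (sketch's choice).
        return set(self.upstream_task_ids if upstream else self.downstream_task_ids)

    def has_dag(self) -> bool:
        return self.dag is not None


class SimpleDag:
    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id


class SimpleGroup:
    def __init__(self, node_id: str) -> None:
        self.node_id = node_id


class SimpleTask(MiniNode[SimpleDag, "SimpleTask", SimpleGroup]):
    def __init__(self, task_id: str) -> None:
        super().__init__()
        self.task_id = task_id

    @property
    def node_id(self) -> str:
        return self.task_id


t = SimpleTask("transform")
t.upstream_task_ids.add("extract")
t.downstream_task_ids.add("load")
print(t.node_id)                                 # transform
print(t.get_direct_relative_ids())               # {'load'}
print(t.get_direct_relative_ids(upstream=True))  # {'extract'}
print(t.has_dag())                               # False
```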
Import
```python
from airflow_shared.dagnode.node import GenericDAGNode
from airflow_shared.dagnode.node import DagProtocol, TaskProtocol, TaskGroupProtocol
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| upstream | bool | No | Direction flag: True for upstream, False for downstream (default) |
| depth | int or None | No | Maximum traversal depth; None means unbounded traversal |
Outputs
| Name | Type | Description |
|---|---|---|
| upstream_list | Iterable[Task] | Direct upstream task objects (resolved from upstream_task_ids via the DAG) |
| downstream_list | Iterable[Task] | Direct downstream task objects (resolved from downstream_task_ids via the DAG) |
| flat_relative_ids | set[str] | All transitive relative task IDs (iterative BFS) |
| flat_relatives | Collection[Task] | All transitive relative task objects |
| setup/teardown iterables | Iterable[Task] | Filtered task sets respecting setup/teardown relationships |
Usage Examples
Iterative Graph Traversal
```python
# Get all transitive upstream task IDs without recursion
all_upstream_ids = task_node.get_flat_relative_ids(upstream=True)
print(f"Task depends on {len(all_upstream_ids)} upstream tasks")

# Bounded traversal: only immediate and one level up
nearby_upstream_ids = task_node.get_flat_relative_ids(upstream=True, depth=2)
```
Setup/Teardown Dependency Resolution
```python
# When clearing a task, include its relevant upstream setup tasks and their teardowns
tasks_to_clear = {task_node}
tasks_to_clear.update(task_node.get_upstreams_only_setups_and_teardowns())
```
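The self-contained sketch below makes the filtering concrete over hypothetical toy tasks. It assumes one relevance rule. An upstream setup is included if it has no teardowns, or if at least one of its teardowns is downstream of the task. Each included setup brings its teardowns along. Treat it as an approximation of get_upstreams_only_setups_and_teardowns() and get_upstreams_only_setups(), not their source. ToyTask, link, flat and the two filter functions are illustrative names.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(eq=False)
class ToyTask:
    """Hypothetical minimal task with setup/teardown flags and direct edges."""

    task_id: str
    is_setup: bool = False
    is_teardown: bool = False
    upstream: list[ToyTask] = field(default_factory=list)
    downstream: list[ToyTask] = field(default_factory=list)


def link(a: ToyTask, b: ToyTask) -> None:
    """Add the edge a -> b on both endpoints."""
    a.downstream.append(b)
    b.upstream.append(a)


def flat(task: ToyTask, *, upstream: bool) -> list[ToyTask]:
    """All transitive relatives in one direction, gathered with an explicit stack."""
    seen: dict[str, ToyTask] = {}
    stack = list(task.upstream if upstream else task.downstream)
    while stack:
        t = stack.pop()
        if t.task_id in seen:
            continue
        seen[t.task_id] = t
        stack.extend(t.upstream if upstream else t.downstream)
    return list(seen.values())


def upstream_setups_and_teardowns(task: ToyTask) -> list[ToyTask]:
    """Relevant upstream setups plus their teardowns (assumed rule, see lead-in)."""
    downstream_teardowns = {t.task_id for t in flat(task, upstream=False) if t.is_teardown}
    found: dict[str, ToyTask] = {}
    for s in flat(task, upstream=True):
        if not s.is_setup:
            continue
        teardowns = [t for t in s.downstream if t.is_teardown]
        # Relevant: no teardowns at all, or a teardown that runs after `task`.
        if not teardowns or any(t.task_id in downstream_teardowns for t in teardowns):
            found[s.task_id] = s
            found.update((t.task_id, t) for t in teardowns if t is not task)
    return list(found.values())


def upstream_setups_only(task: ToyTask) -> list[ToyTask]:
    """Keep only the setups from the relevant set."""
    return [t for t in upstream_setups_and_teardowns(task) if t.is_setup]


create = ToyTask("create_cluster", is_setup=True)
job = ToyTask("run_job")
delete = ToyTask("delete_cluster", is_teardown=True)
link(create, job)
link(job, delete)
link(create, delete)

warm = ToyTask("warm_cache", is_setup=True)  # its teardown is NOT downstream of run_job
purge = ToyTask("purge_cache", is_teardown=True)
link(warm, purge)
link(warm, job)

seed = ToyTask("seed_data", is_setup=True)  # a setup with no teardown
link(seed, job)

print(sorted(t.task_id for t in upstream_setups_and_teardowns(job)))
# ['create_cluster', 'delete_cluster', 'seed_data']
print(sorted(t.task_id for t in upstream_setups_only(job)))
# ['create_cluster', 'seed_data']
```

Under this rule, warm_cache is skipped. Its teardown is not downstream of run_job, so clearing run_job alone does not need to re-run it.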
Accessing Node Properties
```python
# node_id is abstract and must be implemented by subclasses
print(f"DAG ID: {task_node.dag_id}")
print(f"Label: {task_node.label}")  # Strips task group prefix if applicable
print(f"Has DAG: {task_node.has_dag()}")
print(f"Direct upstream count: {len(task_node.upstream_task_ids)}")
print(f"Direct downstream count: {len(task_node.downstream_task_ids)}")
```
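The label property strips the task group prefix. The sketch below shows one way that can work, using the two TaskGroupProtocol fields. It assumes the prefix is the group's node_id followed by a dot, applied only when prefix_group_id is true. Group and label are hypothetical names, and the startswith guard is this sketch's own safety check.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Group:
    """Hypothetical task group exposing the two TaskGroupProtocol fields."""

    node_id: str
    prefix_group_id: bool = True


def label(node_id: str, group: Group | None) -> str:
    """Drop the assumed "<group node_id>." prefix when the group prefixes child IDs."""
    if group is not None and group.node_id and group.prefix_group_id:
        prefix = group.node_id + "."
        if node_id.startswith(prefix):
            return node_id[len(prefix):]
    return node_id


print(label("extract.download", Group("extract")))                 # download
print(label("download", Group("extract", prefix_group_id=False)))  # download
print(label("outer.inner.load", Group("outer.inner")))             # load
print(label("standalone", None))                                   # standalone
```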