Implementation: Apache Airflow DAG Constructor
| Knowledge Sources | Details |
|---|---|
| Domains | Workflow_Orchestration, Python_API |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
The concrete interfaces for defining workflow DAGs and their tasks, provided by the Airflow Task SDK.
Description
The DAG class (attrs-based) and @task decorator (TaskDecoratorCollection) are the primary interfaces for defining Airflow workflows. The DAG class uses the attrs library for attribute definition with automatic __init__ generation, validation, and conversion. The @task decorator provides a functional API that wraps Python functions as Airflow tasks.
Usage
Import DAG and @task when creating any Airflow workflow file. Place DAG files in the configured dags folder for automatic discovery by the DagFileProcessorManager.
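The dags folder is set in `airflow.cfg` under the `[core]` section; a minimal fragment (with a hypothetical path) might look like:

```ini
[core]
# Directory scanned by the DagFileProcessorManager for DAG files
dags_folder = /opt/airflow/dags
```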
Code Reference
Source Location
- Repository: Apache Airflow
- File: task-sdk/src/airflow/sdk/definitions/dag.py
- Lines: L290-1601
Signature
@attrs.define(repr=False, slots=False)
class DAG:
    dag_id: str
    description: str | None = None
    default_args: dict[str, Any] = attrs.field(factory=dict)
    start_date: datetime | None = None
    end_date: datetime | None = None
    schedule: ScheduleArg = None
    template_searchpath: str | Iterable[str] | None = None
    template_undefined: type[jinja2.StrictUndefined] = jinja2.StrictUndefined
    user_defined_macros: dict | None = None
    user_defined_filters: dict | None = None
    max_active_tasks: int = conf.getint("core", "max_active_tasks_per_dag")
    max_active_runs: int = conf.getint("core", "max_active_runs_per_dag")
    max_consecutive_failed_dag_runs: int = conf.getint("core", "max_consecutive_failed_dag_runs_per_dag")
    dagrun_timeout: timedelta | None = None
    catchup: bool = conf.getboolean("scheduler", "catchup_by_default")
    on_success_callback: DagStateChangeCallback | list[DagStateChangeCallback] | None = None
    on_failure_callback: DagStateChangeCallback | list[DagStateChangeCallback] | None = None
    params: ParamsDict = None
    tags: MutableSet[str] = attrs.field(factory=set)
    auto_register: bool = True
    fail_fast: bool = False
    dag_display_name: str = None  # defaults to dag_id
@task Decorator:
class TaskDecoratorCollection:
    """Provides the @task syntax."""

    def __call__(self, *args, **kwargs):
        """Alias '@task' to '@task.python'."""
        return self.__getattr__("python")(*args, **kwargs)

    def __getattr__(self, name: str) -> TaskDecorator:
        """Dynamically get provider-registered task decorators."""
        ...

task = TaskDecoratorCollection()
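The dispatch above can be sketched in plain Python, replacing the provider-registry lookup with a hardcoded dict. All names here are hypothetical; this is the pattern, not the Task SDK implementation:

```python
# Minimal sketch of the TaskDecoratorCollection dispatch pattern:
# attribute access looks up a named decorator, and calling the
# collection directly aliases to the "python" decorator.
def python_decorator(fn):
    fn.decorator_name = "python"  # tag the function, for demonstration
    return fn

class SketchDecoratorCollection:
    # The real SDK consults provider-registered decorators here.
    _registry = {"python": python_decorator}

    def __call__(self, fn):
        # '@task' is an alias for '@task.python'
        return self.__getattr__("python")(fn)

    def __getattr__(self, name):
        return self._registry[name]

task_sketch = SketchDecoratorCollection()

@task_sketch          # equivalent to @task_sketch.python
def hello():
    return "hi"
```

Because `__getattr__` resolves names dynamically, providers can add decorators (e.g. `@task.docker`) without the collection class changing.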
Import
from airflow.sdk import DAG
from airflow.sdk import task
# or
from airflow.sdk.definitions.dag import DAG
from airflow.sdk.definitions.decorators import task
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| dag_id | str | Yes | Unique identifier for the DAG |
| schedule | ScheduleArg | No | Cron string, timedelta, Timetable, or Asset |
| start_date | datetime | No | First logical date for scheduling |
| default_args | dict | No | Default arguments applied to all tasks |
| tags | set[str] | No | Tags for filtering in the UI |
Outputs
| Name | Type | Description |
|---|---|---|
| DAG object | DAG | Registered DAG with task dependency graph |
| task_dict | dict[str, Operator] | Dictionary of task_id to Operator mappings |
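A rough model of how `task_dict` gets populated as tasks are defined inside a DAG: each task registers itself under its `task_id`, and duplicates are rejected. This is a simplified, hypothetical class, not SDK code:

```python
# Simplified model: each task created inside a DAG registers itself
# in dag.task_dict, keyed by task_id. Real Operators do this when
# they are instantiated within an active DAG context.
class SketchDag:
    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.task_dict = {}  # task_id -> task object

    def add_task(self, task_id, fn):
        if task_id in self.task_dict:
            raise ValueError(f"duplicate task_id {task_id!r}")
        self.task_dict[task_id] = fn
        return fn

dag = SketchDag("example_dag")
dag.add_task("extract", lambda: [1, 2, 3])
dag.add_task("load", lambda rows: len(rows))
```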
Usage Examples
Basic DAG with TaskFlow API
from airflow.sdk import DAG, task
from datetime import datetime
with DAG(
    dag_id="example_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    tags={"example"},
) as dag:

    @task
    def extract():
        return {"data": [1, 2, 3]}

    @task
    def transform(data: dict):
        return {"transformed": [x * 2 for x in data["data"]]}

    @task
    def load(data: dict):
        print(f"Loading: {data}")

    raw = extract()
    transformed = transform(raw)
    load(transformed)
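Stripped of the Airflow machinery, the data flow in the example above behaves like plain function composition (at runtime the return values travel between tasks via XCom). This pure-Python restatement shows the value each step sees:

```python
# Pure-Python restatement of the TaskFlow example: each "task" is an
# ordinary function, and return values stand in for XCom payloads.
def extract():
    return {"data": [1, 2, 3]}

def transform(data):
    return {"transformed": [x * 2 for x in data["data"]]}

def load(data):
    return f"Loading: {data}"

result = load(transform(extract()))
```

In the real DAG, calling `extract()` at parse time produces an XComArg placeholder rather than the dict itself; the concrete values only exist when the tasks execute.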