Implementation:Apache Airflow DAG Constructor

From Leeroopedia


Knowledge Sources
Domains: Workflow_Orchestration, Python_API
Last Updated: 2026-02-08 00:00 GMT

Overview

The concrete interfaces the Airflow Task SDK provides for defining workflow DAGs and their tasks: the DAG class and the @task decorator.

Description

The DAG class (attrs-based) and @task decorator (TaskDecoratorCollection) are the primary interfaces for defining Airflow workflows. The DAG class uses the attrs library for attribute definition with automatic __init__ generation, validation, and conversion. The @task decorator provides a functional API that wraps Python functions as Airflow tasks.

Usage

Import DAG and @task when creating any Airflow workflow file. Place DAG files in the configured dags folder for automatic discovery by the DagFileProcessorManager.

Code Reference

Source Location

  • Repository: Apache Airflow
  • File: task-sdk/src/airflow/sdk/definitions/dag.py
  • Lines: L290-1601

Signature

@attrs.define(repr=False, slots=False)
class DAG:
    dag_id: str
    description: str | None = None
    default_args: dict[str, Any] = attrs.field(factory=dict)
    start_date: datetime | None = None
    end_date: datetime | None = None
    schedule: ScheduleArg = None
    template_searchpath: str | Iterable[str] | None = None
    template_undefined: type[jinja2.StrictUndefined] = jinja2.StrictUndefined
    user_defined_macros: dict | None = None
    user_defined_filters: dict | None = None
    max_active_tasks: int = conf.getint("core", "max_active_tasks_per_dag")
    max_active_runs: int = conf.getint("core", "max_active_runs_per_dag")
    max_consecutive_failed_dag_runs: int = conf.getint("core", "max_consecutive_failed_dag_runs_per_dag")
    dagrun_timeout: timedelta | None = None
    catchup: bool = conf.getboolean("scheduler", "catchup_by_default")
    on_success_callback: DagStateChangeCallback | list[DagStateChangeCallback] | None = None
    on_failure_callback: DagStateChangeCallback | list[DagStateChangeCallback] | None = None
    params: ParamsDict = None
    tags: MutableSet[str] = attrs.field(factory=set)
    auto_register: bool = True
    fail_fast: bool = False
    dag_display_name: str = None  # defaults to dag_id

@task Decorator:

class TaskDecoratorCollection:
    """Provides the @task syntax."""

    def __call__(self, *args, **kwargs):
        """Alias '@task' to @task.python."""
        return self.__getattr__("python")(*args, **kwargs)

    def __getattr__(self, name: str) -> TaskDecorator:
        """Dynamically get provider-registered task decorators."""
        ...

task = TaskDecoratorCollection()
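The dispatch shown above can be sketched in plain Python: __call__ forwards bare @task usage to whatever is registered under "python", while __getattr__ resolves named variants dynamically. The registry and the identity decorator below are hypothetical stand-ins for the provider-registered decorators:

```python
# Hypothetical sketch of the TaskDecoratorCollection dispatch pattern.
_REGISTRY = {
    "python": lambda fn: fn,  # stand-in for the real @task.python decorator
}

class MiniTaskDecoratorCollection:
    def __call__(self, *args, **kwargs):
        # Bare @task is an alias for @task.python.
        return self.__getattr__("python")(*args, **kwargs)

    def __getattr__(self, name):
        # Look up a named decorator variant (e.g. task.python) dynamically.
        try:
            return _REGISTRY[name]
        except KeyError:
            raise AttributeError(f"no task decorator registered as {name!r}")

task = MiniTaskDecoratorCollection()

@task
def hello():
    return "hi"
```

Routing every named variant through __getattr__ is what lets providers register new decorators (e.g. @task.docker) without changes to the collection class itself.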

Import

from airflow.sdk import DAG
from airflow.sdk import task
# or
from airflow.sdk.definitions.dag import DAG
from airflow.sdk.definitions.decorators import task

I/O Contract

Inputs

Name          Type         Required  Description
dag_id        str          Yes       Unique identifier for the DAG
schedule      ScheduleArg  No        Cron string, timedelta, Timetable, or Asset
start_date    datetime     No        First logical date for scheduling
default_args  dict         No        Default arguments applied to all tasks
tags          set[str]     No        Tags for filtering in the UI

Outputs

Name        Type                 Description
DAG object  DAG                  Registered DAG with task dependency graph
task_dict   dict[str, Operator]  Dictionary of task_id to Operator mappings
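The schedule input accepts several distinct forms, as the table above notes. A small hypothetical helper (not Airflow's internal logic) illustrates how those forms differ by type:

```python
from datetime import timedelta

# Hypothetical illustration of the forms a ScheduleArg value can take.
# Timetable and Asset objects fall into the final catch-all branch.
def describe_schedule(schedule) -> str:
    if schedule is None:
        return "manual/triggered only"
    if isinstance(schedule, str):
        return f"cron expression or preset: {schedule}"
    if isinstance(schedule, timedelta):
        return f"fixed interval of {schedule}"
    return f"custom schedule object: {type(schedule).__name__}"

print(describe_schedule("@daily"))
print(describe_schedule(timedelta(hours=6)))
print(describe_schedule(None))
```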

Usage Examples

Basic DAG with TaskFlow API

from airflow.sdk import DAG, task
from datetime import datetime

with DAG(
    dag_id="example_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    tags={"example"},
) as dag:

    @task
    def extract():
        return {"data": [1, 2, 3]}

    @task
    def transform(data: dict):
        return {"transformed": [x * 2 for x in data["data"]]}

    @task
    def load(data: dict):
        print(f"Loading: {data}")

    raw = extract()
    transformed = transform(raw)
    load(transformed)
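Conceptually, calling a decorated task in the example above does not execute the function at parse time; it records a node and its upstream data dependencies, which is how extract -> transform -> load becomes a graph. A hypothetical mini version of that lazy wiring, with none of Airflow's XCom or scheduling machinery:

```python
# Hypothetical sketch of TaskFlow-style lazy wiring: calling a wrapped
# function returns a graph node instead of running the function body.
class Node:
    def __init__(self, fn, upstream):
        self.fn = fn              # the deferred callable
        self.upstream = upstream  # nodes this node depends on

def mini_task(fn):
    def wrapper(*args):
        # Any Node passed as an argument becomes an upstream dependency.
        upstream = [a for a in args if isinstance(a, Node)]
        return Node(fn, upstream)
    return wrapper

@mini_task
def extract():
    return [1, 2, 3]

@mini_task
def transform(data):
    return [x * 2 for x in data]

raw = extract()           # builds a node; extract's body has not run
out = transform(raw)      # records that transform depends on extract
print(out.upstream == [raw])
```

The real SDK does the equivalent: the values passed between tasks in the example are XCom references resolved at run time, not the return values themselves.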

Related Pages

Implements Principle

Requires Environment

Uses Heuristic
