
Heuristic:Apache Airflow Task Idempotency Pattern

From Leeroopedia




Knowledge Sources
Domains Data_Integrity, Reliability
Last Updated 2026-02-08 20:00 GMT

Overview

Treat each Airflow task as a database transaction — use UPSERT instead of INSERT, read from specific partitions, and never use `datetime.now()` for data logic.

Description

Airflow tasks may be retried or re-run at any time (due to failures, backfills, or manual triggers). A non-idempotent task produces different results on re-execution, leading to duplicate data, corrupted state, or inconsistent outputs. The core principle is that a task should produce the same outcome on every re-run — equivalent to a database transaction that can be safely replayed.

Usage

Apply this heuristic to every task that writes data — database inserts, file writes, API calls with side effects, or any operation that modifies external state. This is especially critical for tasks in production DAGs that process financial data, user records, or any data where duplicates or inconsistencies cause business impact.

The Insight (Rule of Thumb)

  • Action 1: Use UPSERT (INSERT ... ON CONFLICT UPDATE) instead of INSERT to prevent duplicate rows on re-runs.
  • Action 2: Read data from specific partitions using `data_interval_start` / `data_interval_end`, never from "latest" or "current" data.
  • Action 3: Never use `datetime.now()` inside task logic for data processing — use the templated `{{ data_interval_start }}` / `{{ data_interval_end }}` values (or the matching task-context variables) instead.
  • Action 4: Store task configuration in `default_args` to avoid typos and ensure consistency.
  • Trade-off: UPSERT requires a unique constraint or primary key on the target table. Partition-based reads may require schema design changes.
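Action 4 can be sketched as a DAG-level `default_args` dict. This is a minimal sketch, assuming Airflow 2.x; the DAG id, owner, and retry values are illustrative, not recommended defaults, and `load_data` refers to the idempotent callable shown below.

```python
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

# Shared task configuration lives in one place; every task in the DAG
# inherits these values unless it overrides them explicitly.
default_args = {
    "owner": "data-team",                 # illustrative value
    "retries": 3,                         # illustrative value
    "retry_delay": timedelta(minutes=5),  # illustrative value
}

with DAG(
    dag_id="example_idempotent_load",  # hypothetical DAG id
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
    default_args=default_args,
    catchup=False,
):
    PythonOperator(task_id="load_data", python_callable=load_data)
```

Because the tasks inherit `retries=3`, an idempotent `load_data` can be safely re-executed up to three extra times without changing the final state.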

BAD — non-idempotent:

def load_data():
    # Creates duplicates on every re-run
    db.execute("INSERT INTO table VALUES (...)")
    # Reads different data on different days
    data = db.query("SELECT * FROM source WHERE date = CURRENT_DATE")

GOOD — idempotent:

def load_data(**context):
    interval_start = context["data_interval_start"]
    # Partition-specific read (parameterized rather than string-formatted SQL)
    data = db.query("SELECT * FROM source WHERE date = %s", (interval_start,))
    # UPSERT prevents duplicates
    db.execute("INSERT INTO table VALUES (...) ON CONFLICT (id) DO UPDATE SET ...")
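The UPSERT half of the idempotent pattern can be demonstrated end-to-end with the standard-library `sqlite3` module, which also supports `INSERT ... ON CONFLICT ... DO UPDATE` (SQLite 3.24+). The table and column names here are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (id INTEGER PRIMARY KEY, amount REAL)")

def load_data(rows):
    # UPSERT: re-running with the same rows updates in place
    # instead of inserting duplicates.
    conn.executemany(
        "INSERT INTO target (id, amount) VALUES (?, ?) "
        "ON CONFLICT (id) DO UPDATE SET amount = excluded.amount",
        rows,
    )
    conn.commit()

load_data([(1, 10.0), (2, 20.0)])
load_data([(1, 10.0), (2, 20.0)])  # simulated retry: same input, same outcome

count = conn.execute("SELECT COUNT(*) FROM target").fetchone()[0]
print(count)  # 2 rows, not 4
```

Note the trade-off listed above in action: the `PRIMARY KEY` on `id` is what gives `ON CONFLICT` a constraint to target.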

Reasoning

Evidence from `airflow-core/docs/best-practices.rst:44-66`:

The documentation states: "An Airflow task should be treated as an equivalent to a transaction in a database." This is because:

  1. Retries: Tasks retry on failure (default `retries=0`, but commonly configured higher). A non-idempotent INSERT creates duplicates on each retry.
  2. Backfills: Running `airflow dags backfill` re-executes tasks for historical intervals. Non-idempotent tasks corrupt historical data.
  3. Manual triggers: Users can re-run tasks from the UI at any time.

Additionally, the documentation warns against `datetime.now()` in tasks because:

  • A task scheduled for 2024-01-15 might execute on 2024-01-16 (due to queue delays)
  • Using `datetime.now()` would process the wrong date's data
  • Using `data_interval_start` always processes the correct date regardless of when execution occurs
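The delayed-execution point can be checked with a small pure-Python sketch; the function name `partition_to_process` is invented for illustration and simply stands in for any task logic that derives a partition from the interval.

```python
from datetime import datetime

def partition_to_process(data_interval_start: datetime) -> str:
    # Derive the partition from the schedule's interval, not the wall clock,
    # so a delayed or backfilled run still targets the intended date.
    return data_interval_start.strftime("%Y-%m-%d")

# Task scheduled for 2024-01-15 but actually executed a day late:
interval_start = datetime(2024, 1, 15)
wall_clock = datetime(2024, 1, 16, 3, 30)  # when the worker finally ran it

print(partition_to_process(interval_start))  # 2024-01-15 -- correct partition
print(wall_clock.strftime("%Y-%m-%d"))       # 2024-01-16 -- wrong if used for data logic
```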
