
Implementation:Apache Airflow TaskInstance Listener Spec

From Leeroopedia


Knowledge Sources
Domains Event_System, Task_Lifecycle
Last Updated 2026-02-08 21:00 GMT

Overview

Defines the pluggy hookspec interface for task instance state change events, providing four hooks -- on_task_instance_running, on_task_instance_success, on_task_instance_failed, and on_task_instance_skipped -- that listeners can implement to react to task lifecycle transitions.

Description

The taskinstance hookspec module defines the contract for Airflow's task-level event system. It uses pluggy's HookspecMarker to declare four hook specifications that fire when a task instance transitions between states:

  • on_task_instance_running(previous_state, task_instance) -- Called when a task instance transitions to the RUNNING state. The previous_state can be None (first run) or a TaskInstanceState value.
  • on_task_instance_success(previous_state, task_instance) -- Called when a task instance transitions to the SUCCESS state.
  • on_task_instance_failed(previous_state, task_instance, error) -- Called when a task instance transitions to the FAILED state. The error parameter contains the failure cause (can be None, a string, or a BaseException).
  • on_task_instance_skipped(previous_state, task_instance) -- Called when a task instance intentionally skips itself during execution (e.g., by raising AirflowSkipException). This hook does not fire for tasks skipped by the scheduler before execution began (e.g., due to trigger rules, BranchPythonOperator, or ShortCircuitOperator).

The task_instance parameter accepts either a RuntimeTaskInstance (when called from the task execution context) or a TaskInstance (when called from the API server).
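Because error may arrive as None, a string, or an exception object, a listener usually has to normalize it before logging or forwarding it. The helper below is a minimal sketch of one way to do that; describe_error is a hypothetical name used for illustration and is not part of Airflow.

```python
from __future__ import annotations

import traceback


def describe_error(error: None | str | BaseException) -> str:
    """Return a one-line description for any value the error parameter may hold."""
    if error is None:
        # The hook was called without failure details.
        return "no error details provided"
    if isinstance(error, BaseException):
        # format_exception_only renders "ExceptionType: message" without the stack trace.
        return "".join(traceback.format_exception_only(type(error), error)).strip()
    # Anything else is treated as an already formatted message.
    return str(error)
```

Inside on_task_instance_failed, a call such as describe_error(error) gives a single string regardless of which of the three forms was passed.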

Usage

# my_task_listener.py
from airflow.listeners import hookimpl

# Each function must be marked with @hookimpl so pluggy registers it as a hook implementation.

@hookimpl
def on_task_instance_running(previous_state, task_instance):
    print(f"Task {task_instance.task_id} is now RUNNING (was {previous_state})")

@hookimpl
def on_task_instance_success(previous_state, task_instance):
    print(f"Task {task_instance.task_id} succeeded")

@hookimpl
def on_task_instance_failed(previous_state, task_instance, error):
    print(f"Task {task_instance.task_id} failed: {error}")

@hookimpl
def on_task_instance_skipped(previous_state, task_instance):
    print(f"Task {task_instance.task_id} was skipped during execution")

Code Reference

Source Location

  • Repository: Apache_Airflow
  • File: shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py

Signature

from __future__ import annotations

from typing import TYPE_CHECKING

from pluggy import HookspecMarker

if TYPE_CHECKING:
    from airflow.models.taskinstance import TaskInstance
    from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
    from airflow.utils.state import TaskInstanceState

hookspec = HookspecMarker("airflow")


@hookspec
def on_task_instance_running(
    previous_state: TaskInstanceState | None,
    task_instance: RuntimeTaskInstance | TaskInstance,
):
    """Execute when task state changes to RUNNING. previous_state can be None."""


@hookspec
def on_task_instance_success(
    previous_state: TaskInstanceState | None,
    task_instance: RuntimeTaskInstance | TaskInstance,
):
    """Execute when task state changes to SUCCESS. previous_state can be None."""


@hookspec
def on_task_instance_failed(
    previous_state: TaskInstanceState | None,
    task_instance: RuntimeTaskInstance | TaskInstance,
    error: None | str | BaseException,
):
    """Execute when task state changes to FAIL. previous_state can be None."""


@hookspec
def on_task_instance_skipped(
    previous_state: TaskInstanceState | None,
    task_instance: RuntimeTaskInstance | TaskInstance,
):
    """Execute when a task instance skips itself during execution."""
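To illustrate how a hookspec and a matching implementation fit together, here is a minimal standalone sketch that uses pluggy directly, outside Airflow. The project name "demo" and the classes TaskSpec and RecordingListener are hypothetical; Airflow uses the project name "airflow" and manages its own plugin manager, so this is not how Airflow wires listeners internally.

```python
import pluggy

# Markers must share the project name used by the PluginManager below.
hookspec = pluggy.HookspecMarker("demo")
hookimpl = pluggy.HookimplMarker("demo")


class TaskSpec:
    """Hypothetical spec mirroring the shape of on_task_instance_failed."""

    @hookspec
    def on_task_instance_failed(self, previous_state, task_instance, error):
        """Called when a task instance fails."""


class RecordingListener:
    """Hypothetical listener that records every failure it is told about."""

    def __init__(self):
        self.events = []

    @hookimpl
    def on_task_instance_failed(self, task_instance, error):
        # An implementation may accept a subset of the spec's arguments.
        self.events.append((task_instance, str(error)))


pm = pluggy.PluginManager("demo")
pm.add_hookspecs(TaskSpec)
listener = RecordingListener()
pm.register(listener)

# Hooks are always called with keyword arguments.
pm.hook.on_task_instance_failed(
    previous_state="running",
    task_instance="demo_task",
    error=ValueError("boom"),
)
```

pluggy matches implementations to specs by function name and passes arguments by name, which is why listener functions must use the exact parameter names declared in the hookspec.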

Import

from airflow_shared.listeners.spec.taskinstance import (
    on_task_instance_running,
    on_task_instance_success,
    on_task_instance_failed,
    on_task_instance_skipped,
)

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| previous_state | TaskInstanceState or None | Yes | The state the task instance was in before the transition; None if this is the first state |
| task_instance | RuntimeTaskInstance or TaskInstance | Yes | The task instance object. Type depends on calling context: RuntimeTaskInstance from task execution, TaskInstance from the API server |
| error | None, str, or BaseException | Yes (failed only) | The error that caused the failure; only present on on_task_instance_failed |

Outputs

| Name | Type | Description |
|------|------|-------------|
| None | None | These hooks do not return values; they are called for side effects only |

Hook Execution Matrix

| Hook | Trigger State | Has Error Param | Covers Scheduler Skips |
|------|---------------|-----------------|------------------------|
| on_task_instance_running | RUNNING | No | N/A |
| on_task_instance_success | SUCCESS | No | N/A |
| on_task_instance_failed | FAILED | Yes | N/A |
| on_task_instance_skipped | SKIPPED | No | No (execution-time skips only) |
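The matrix can be restated as a small lookup, which may be handy when writing unit tests for a listener. This is only an illustrative sketch: HOOK_BY_STATE and expected_hook are hypothetical names, and the lowercase strings are assumed to correspond to the TaskInstanceState values.

```python
from __future__ import annotations

# Hypothetical lookup table restating the hook execution matrix.
HOOK_BY_STATE = {
    "running": "on_task_instance_running",
    "success": "on_task_instance_success",
    "failed": "on_task_instance_failed",
    "skipped": "on_task_instance_skipped",
}


def expected_hook(state: str, skipped_by_scheduler: bool = False) -> str | None:
    """Return the hook name expected for a target state, or None if no hook fires."""
    if state == "skipped" and skipped_by_scheduler:
        # Skips decided before execution began are not reported to listeners.
        return None
    return HOOK_BY_STATE.get(state)
```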

Usage Examples

Sending Notifications on Task Failure

# my_plugin/listeners/task_notifications.py
import logging

from airflow.listeners import hookimpl

log = logging.getLogger(__name__)

def send_slack_notification(channel, message):
    """Placeholder, not an Airflow API: swap in your own alerting integration."""
    log.info("Would post to %s: %s", channel, message)

@hookimpl
def on_task_instance_failed(previous_state, task_instance, error):
    """Send a notification when any task fails."""
    dag_id = task_instance.dag_id
    task_id = task_instance.task_id
    log.error("ALERT: Task %s.%s failed with error: %s", dag_id, task_id, error)
    send_slack_notification(
        channel="#airflow-alerts",
        message=f"Task {dag_id}.{task_id} failed: {error}",
    )

Tracking Task Duration Metrics

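# my_plugin/listeners/task_metrics.py
import time

from airflow.listeners import hookimpl

# Start times live in process memory, so both hooks must run in the same process.
_task_start_times = {}

def emit_metric(name, value, tags=None):
    """Placeholder, not an Airflow API: swap in your own metrics client."""
    print(f"{name}={value:.3f} tags={tags}")

def _key(task_instance):
    # run_id and map_index keep concurrent runs and mapped tasks from colliding.
    return (task_instance.dag_id, task_instance.task_id, task_instance.run_id, task_instance.map_index)

@hookimpl
def on_task_instance_running(previous_state, task_instance):
    """Record task start time."""
    _task_start_times[_key(task_instance)] = time.monotonic()

@hookimpl
def on_task_instance_success(previous_state, task_instance):
    """Emit duration metric on success."""
    start = _task_start_times.pop(_key(task_instance), None)
    if start is not None:
        emit_metric("task.custom_duration", time.monotonic() - start, tags={"dag_id": task_instance.dag_id})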
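The timing logic above can be separated from Airflow so that it is easy to test on its own. The following is a minimal sketch under that assumption; DurationTracker is a hypothetical helper, and the injectable clock exists only to make the behavior deterministic in tests.

```python
from __future__ import annotations

import time
from typing import Callable, Hashable


class DurationTracker:
    """Hypothetical helper that measures elapsed time between start() and stop()."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._started: dict[Hashable, float] = {}

    def start(self, key: Hashable) -> None:
        # Overwrites any earlier start for the same key, e.g. on a retry.
        self._started[key] = self._clock()

    def stop(self, key: Hashable) -> float | None:
        # Returns None when no start was recorded for this key.
        started = self._started.pop(key, None)
        if started is None:
            return None
        return self._clock() - started


# Deterministic usage with a fake clock that returns 10.0, then 12.5.
ticks = iter([10.0, 12.5])
tracker = DurationTracker(clock=lambda: next(ticks))
key = ("example_dag", "example_task", "example_run", -1)
tracker.start(key)
elapsed = tracker.stop(key)
```

A listener could call start() from on_task_instance_running and stop() from on_task_instance_success; calling stop() from on_task_instance_failed and on_task_instance_skipped as well would keep entries from accumulating for tasks that never succeed.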

Registering a Task Listener

from airflow.plugins_manager import AirflowPlugin
from my_plugin.listeners import task_notifications

class MyTaskListenerPlugin(AirflowPlugin):
    name = "my_task_listener"
    listeners = [task_notifications]
