Implementation:Apache Airflow TaskInstance Listener Spec
| Knowledge Sources | |
|---|---|
| Domains | Event_System, Task_Lifecycle |
| Last Updated | 2026-02-08 21:00 GMT |
Overview
Defines the pluggy hookspec interface for task instance state change events, providing four hooks -- on_task_instance_running, on_task_instance_success, on_task_instance_failed, and on_task_instance_skipped -- that listeners can implement to react to task lifecycle transitions.
Description
The taskinstance hookspec module defines the contract for Airflow's task-level event system. It uses pluggy's HookspecMarker to declare four hook specifications that fire when a task instance transitions between states:
- on_task_instance_running(previous_state, task_instance) -- Called when a task instance transitions to the RUNNING state. The previous_state can be None (first run) or a TaskInstanceState value.
- on_task_instance_success(previous_state, task_instance) -- Called when a task instance transitions to the SUCCESS state.
- on_task_instance_failed(previous_state, task_instance, error) -- Called when a task instance transitions to the FAILED state. The error parameter contains the failure cause (can be None, a string, or a BaseException).
- on_task_instance_skipped(previous_state, task_instance) -- Called when a task instance intentionally skips itself during execution (e.g., by raising AirflowSkipException). This hook does not fire for tasks skipped by the scheduler before execution began (e.g., due to trigger rules, BranchPythonOperator, or ShortCircuitOperator).
The task_instance parameter accepts either a RuntimeTaskInstance (when called from the task execution context) or a TaskInstance (when called from the API server).
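Because a hook may receive either type, a listener stays portable if it reads only attributes that both objects expose. The following is a minimal sketch of that idea, not part of the spec: describe_task_instance is a hypothetical helper, and it assumes that run_id and map_index are available alongside dag_id and task_id, reading them defensively in case they are not.

```python
from types import SimpleNamespace


def describe_task_instance(task_instance) -> str:
    """Build a log-friendly identifier using duck typing only."""
    dag_id = task_instance.dag_id
    task_id = task_instance.task_id
    # Assumed attributes: read defensively in case a given type lacks them.
    run_id = getattr(task_instance, "run_id", None)
    map_index = getattr(task_instance, "map_index", -1)

    parts = [f"{dag_id}.{task_id}"]
    if run_id is not None:
        parts.append(f"run_id={run_id}")
    if map_index is not None and map_index >= 0:
        # Only mapped task instances carry a non-negative index.
        parts.append(f"map_index={map_index}")
    return " ".join(parts)


# Stand-in object for illustration; real hook calls receive Airflow's own types.
fake_ti = SimpleNamespace(dag_id="etl", task_id="load", run_id="manual__1", map_index=-1)
print(describe_task_instance(fake_ti))
```

A listener can call such a helper from any of the four hooks without checking which concrete type it was handed.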
Usage
# my_task_listener.py
from airflow.listeners import hookimpl  # marks functions as hook implementations

@hookimpl
def on_task_instance_running(previous_state, task_instance):
    print(f"Task {task_instance.task_id} is now RUNNING (was {previous_state})")

@hookimpl
def on_task_instance_success(previous_state, task_instance):
    print(f"Task {task_instance.task_id} succeeded")

@hookimpl
def on_task_instance_failed(previous_state, task_instance, error):
    print(f"Task {task_instance.task_id} failed: {error}")

@hookimpl
def on_task_instance_skipped(previous_state, task_instance):
    print(f"Task {task_instance.task_id} was skipped during execution")
Code Reference
Source Location
- Repository: Apache_Airflow
- File: shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py
Signature
from __future__ import annotations

from typing import TYPE_CHECKING

from pluggy import HookspecMarker

if TYPE_CHECKING:
    from airflow.models.taskinstance import TaskInstance
    from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
    from airflow.utils.state import TaskInstanceState

hookspec = HookspecMarker("airflow")


@hookspec
def on_task_instance_running(
    previous_state: TaskInstanceState | None,
    task_instance: RuntimeTaskInstance | TaskInstance,
):
    """Execute when task state changes to RUNNING. previous_state can be None."""


@hookspec
def on_task_instance_success(
    previous_state: TaskInstanceState | None,
    task_instance: RuntimeTaskInstance | TaskInstance,
):
    """Execute when task state changes to SUCCESS. previous_state can be None."""


@hookspec
def on_task_instance_failed(
    previous_state: TaskInstanceState | None,
    task_instance: RuntimeTaskInstance | TaskInstance,
    error: None | str | BaseException,
):
    """Execute when task state changes to FAIL. previous_state can be None."""


@hookspec
def on_task_instance_skipped(
    previous_state: TaskInstanceState | None,
    task_instance: RuntimeTaskInstance | TaskInstance,
):
    """Execute when a task instance skips itself during execution."""
Import
from airflow_shared.listeners.spec.taskinstance import (
    on_task_instance_running,
    on_task_instance_success,
    on_task_instance_failed,
    on_task_instance_skipped,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| previous_state | TaskInstanceState or None | Yes | The state the task instance was in before the transition; None if this is the first state |
| task_instance | RuntimeTaskInstance or TaskInstance | Yes | The task instance object. Type depends on calling context: RuntimeTaskInstance from task execution, TaskInstance from the API server |
| error | None, str, or BaseException | Yes (failed only) | The error that caused the failure; only present on on_task_instance_failed |
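Since error can arrive in three different shapes, a listener usually wants to normalize it before logging or forwarding it. The sketch below shows one way to do that; format_error and its "unknown error" fallback text are illustrative choices, not part of the Airflow API.

```python
from __future__ import annotations


def format_error(error: None | str | BaseException) -> str:
    """Normalize the `error` argument of on_task_instance_failed to text."""
    if error is None:
        # No cause was supplied with the failure event.
        return "unknown error"
    if isinstance(error, BaseException):
        # Keep the exception class so it is distinguishable from a plain string.
        return f"{type(error).__name__}: {error}"
    return str(error)


print(format_error(None))
print(format_error("upstream timed out"))
print(format_error(ValueError("bad input")))
```

Calling this at the top of an on_task_instance_failed implementation keeps the rest of the handler free of type checks.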
Outputs
| Name | Type | Description |
|---|---|---|
| None | None | These hooks do not return values; they are called for side effects only |
Hook Execution Matrix
| Hook | Trigger State | Has Error Param | Covers Scheduler Skips |
|---|---|---|---|
| on_task_instance_running | RUNNING | No | N/A |
| on_task_instance_success | SUCCESS | No | N/A |
| on_task_instance_failed | FAILED | Yes | N/A |
| on_task_instance_skipped | SKIPPED | No | No (execution-time skips only) |
Usage Examples
Sending Notifications on Task Failure
# my_plugin/listeners/task_notifications.py
import logging

from airflow.listeners import hookimpl

log = logging.getLogger(__name__)


@hookimpl
def on_task_instance_failed(previous_state, task_instance, error):
    """Send a notification when any task fails."""
    dag_id = task_instance.dag_id
    task_id = task_instance.task_id
    log.error("ALERT: Task %s.%s failed with error: %s", dag_id, task_id, error)
    # send_slack_notification is a placeholder for your own alerting helper; it is not defined here.
    send_slack_notification(
        channel="#airflow-alerts",
        message=f"Task {dag_id}.{task_id} failed: {error}",
    )
Tracking Task Duration Metrics
# my_plugin/listeners/task_metrics.py
import time

from airflow.listeners import hookimpl

# Start times keyed by "dag_id.task_id"; held in process memory only.
_task_start_times = {}


@hookimpl
def on_task_instance_running(previous_state, task_instance):
    """Record task start time."""
    key = f"{task_instance.dag_id}.{task_instance.task_id}"
    _task_start_times[key] = time.monotonic()


@hookimpl
def on_task_instance_success(previous_state, task_instance):
    """Emit duration metric on success."""
    key = f"{task_instance.dag_id}.{task_instance.task_id}"
    if key in _task_start_times:
        duration = time.monotonic() - _task_start_times.pop(key)
        # emit_metric is a placeholder for your own metrics client; it is not defined here.
        emit_metric("task.custom_duration", duration, tags={"dag_id": task_instance.dag_id})
Registering a Task Listener
from airflow.plugins_manager import AirflowPlugin

from my_plugin.listeners import task_notifications


class MyTaskListenerPlugin(AirflowPlugin):
    name = "my_task_listener"
    listeners = [task_notifications]