Implementation:Apache Airflow ListenerManager Hooks
| Knowledge Sources | |
|---|---|
| Domains | Event_Driven, Plugin_System |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
The ListenerManager is Airflow's concrete tool for registering event listeners and dispatching lifecycle callbacks to them.
Description
The ListenerManager wraps pluggy's PluginManager to provide a hook-based event system. It manages hookspec registration, listener registration, and event dispatch. Hook specs for task instance events define callbacks for on_task_instance_running, on_task_instance_success, and on_task_instance_failed.
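The three responsibilities named above (hookspec registration, listener registration, event dispatch) can be sketched with pluggy alone. This is a minimal illustration, not Airflow's code: `TaskInstanceSpec`, `AuditListener`, and `MetricsListener` are hypothetical names, a class stands in for the spec module, and plain strings stand in for real task instances.

```python
import pluggy

hookspec = pluggy.HookspecMarker("airflow")
hookimpl = pluggy.HookimplMarker("airflow")


class TaskInstanceSpec:
    """Hypothetical stand-in for the spec module (a class also works as a spec namespace)."""

    @hookspec
    def on_task_instance_success(self, previous_state, task_instance, session):
        """Fired after a task instance reaches the success state."""


class AuditListener:
    """Hypothetical listener that records every task instance it sees."""

    def __init__(self):
        self.seen = []

    @hookimpl
    def on_task_instance_success(self, previous_state, task_instance, session):
        self.seen.append(task_instance)


class MetricsListener:
    """Hypothetical listener that only counts events."""

    def __init__(self):
        self.count = 0

    @hookimpl
    def on_task_instance_success(self, task_instance):
        # pluggy lets an implementation accept only the arguments it needs.
        self.count += 1


pm = pluggy.PluginManager("airflow")
pm.add_hookspecs(TaskInstanceSpec)  # hookspec registration
audit, metrics = AuditListener(), MetricsListener()
pm.register(audit)  # listener registration
pm.register(metrics)

# Event dispatch: pluggy hooks are called with keyword arguments only.
pm.hook.on_task_instance_success(
    previous_state="running", task_instance="ti-1", session=None
)
```

After the single hook call, both listeners have been invoked: `audit.seen` holds the dispatched task instance and `metrics.count` is 1.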
Usage
Register listeners via Airflow plugins (through the plugin's listeners attribute) or airflow_local_settings.py. The Airflow runtime initializes the ListenerManager, and listeners shipped by installed providers are discovered automatically.
Code Reference
Source Location
- Repository: Apache Airflow
- File: shared/listeners/src/airflow_shared/listeners/listener.py
- Lines: L40-78
Signature
class ListenerManager:
    def __init__(self):
        self.pm = pluggy.PluginManager("airflow")
        self.pm.add_hookcall_monitoring(_before_hookcall, _after_hookcall)

    def add_hookspecs(self, spec_module) -> None: ...

    @property
    def has_listeners(self) -> bool: ...

    @property
    def hook(self) -> _HookRelay: ...

    def add_listener(self, listener) -> None: ...

    def clear(self) -> None: ...
Task Instance Hook Specs:
# shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py
@hookspec
def on_task_instance_running(previous_state, task_instance, session): ...
@hookspec
def on_task_instance_success(previous_state, task_instance, session): ...
@hookspec
def on_task_instance_failed(previous_state, task_instance, session): ...
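The signature above elides the method bodies. One plausible way to fill them in on top of pluggy is sketched below. The bodies are assumptions, not the Airflow source, and `SketchListenerManager`, `TaskInstanceSpec`, `Listener`, and the `calls` list are hypothetical names used only for illustration. The pluggy calls themselves (`add_hookcall_monitoring`, `get_plugins`, `is_registered`, `register`, `unregister`) are part of pluggy's public PluginManager API.

```python
import pluggy

hookspec = pluggy.HookspecMarker("airflow")
hookimpl = pluggy.HookimplMarker("airflow")
calls = []  # hook names recorded by the monitoring callbacks


def _before_hookcall(hook_name, hook_impls, kwargs):
    calls.append(("before", hook_name))


def _after_hookcall(outcome, hook_name, hook_impls, kwargs):
    calls.append(("after", hook_name))


class SketchListenerManager:
    """Hypothetical stand-in; method bodies are assumptions, not Airflow source."""

    def __init__(self):
        self.pm = pluggy.PluginManager("airflow")
        self.pm.add_hookcall_monitoring(_before_hookcall, _after_hookcall)

    def add_hookspecs(self, spec_module) -> None:
        self.pm.add_hookspecs(spec_module)

    @property
    def has_listeners(self) -> bool:
        return bool(self.pm.get_plugins())

    @property
    def hook(self):
        return self.pm.hook

    def add_listener(self, listener) -> None:
        # pluggy raises if the same object is registered twice, so guard first.
        if not self.pm.is_registered(listener):
            self.pm.register(listener)

    def clear(self) -> None:
        for plugin in self.pm.get_plugins():
            self.pm.unregister(plugin)


class TaskInstanceSpec:
    @hookspec
    def on_task_instance_running(self, previous_state, task_instance, session):
        """Fired when a task instance starts running."""


class Listener:
    @hookimpl
    def on_task_instance_running(self, previous_state, task_instance, session):
        pass


manager = SketchListenerManager()
manager.add_hookspecs(TaskInstanceSpec)
empty_before = manager.has_listeners  # no listeners registered yet
listener = Listener()
manager.add_listener(listener)
manager.add_listener(listener)  # second call is a no-op
manager.hook.on_task_instance_running(
    previous_state="queued", task_instance="ti-1", session=None
)
registered = manager.has_listeners
manager.clear()
```

In this sketch the monitoring callbacks run once before and once after each hook call, which is the kind of seam a wrapper could use for debug logging.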
Import
from airflow_shared.listeners.listener import ListenerManager
from airflow_shared.listeners.spec import taskinstance
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| spec_module | module | Yes (for setup) | Module with @hookspec-decorated functions |
| listener | object | Yes (for registration) | Object with @hookimpl-decorated methods |
| previous_state | str | Yes (for events) | Task state before change |
| task_instance | TaskInstance | Yes (for events) | The affected TaskInstance |
Outputs
| Name | Type | Description |
|---|---|---|
| Hook calls | None | Dispatched to all registered listeners |
| has_listeners | bool | Whether any listeners are registered |
Usage Examples
Custom Listener Plugin
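import pluggy
from airflow.plugins_manager import AirflowPlugin

hookimpl = pluggy.HookimplMarker("airflow")

class MyListener:
    @hookimpl
    def on_task_instance_success(self, previous_state, task_instance, session):
        # Notify on task success (send_slack_message is a placeholder helper, not defined here)
        send_slack_message(f"Task {task_instance.task_id} succeeded!")

    @hookimpl
    def on_task_instance_failed(self, previous_state, task_instance, session):
        # Alert on task failure (send_pagerduty_alert is a placeholder helper, not defined here)
        send_pagerduty_alert(f"Task {task_instance.task_id} failed!")

class MyListenerPlugin(AirflowPlugin):
    # Illustrative plugin: objects listed in `listeners` are registered as listeners
    name = "my_listener_plugin"
    listeners = [MyListener()]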
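A listener like the one above can be exercised outside Airflow by registering it on a bare pluggy PluginManager and firing the hooks by hand. The sketch below assumes a variant, `NotifyingListener`, that takes a hypothetical `notify` callable instead of the Slack and PagerDuty helpers, and uses a `SimpleNamespace` as a stand-in for a real TaskInstance.

```python
from types import SimpleNamespace

import pluggy

hookimpl = pluggy.HookimplMarker("airflow")


class NotifyingListener:
    """Variant of MyListener; `notify` is a hypothetical injected callable."""

    def __init__(self, notify):
        self.notify = notify

    @hookimpl
    def on_task_instance_success(self, previous_state, task_instance, session):
        self.notify(f"Task {task_instance.task_id} succeeded!")

    @hookimpl
    def on_task_instance_failed(self, previous_state, task_instance, session):
        self.notify(f"Task {task_instance.task_id} failed!")


sent = []  # collects messages instead of sending them anywhere
pm = pluggy.PluginManager("airflow")
pm.register(NotifyingListener(sent.append))

# Stand-in for a TaskInstance; only the attribute the listener reads is set.
fake_ti = SimpleNamespace(task_id="extract")
pm.hook.on_task_instance_success(
    previous_state="running", task_instance=fake_ti, session=None
)
pm.hook.on_task_instance_failed(
    previous_state="running", task_instance=fake_ti, session=None
)
```

Injecting the notifier keeps the listener free of network calls in tests; in a deployed plugin the same class could be constructed with a real notification function.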