Implementation:Apache Airflow ListenerManager Hooks

From Leeroopedia


Knowledge Sources
Domains Event_Driven, Plugin_System
Last Updated 2026-02-08 00:00 GMT

Overview

The ListenerManager is a concrete tool for registering event listeners and dispatching lifecycle callbacks to them.

Description

The ListenerManager wraps pluggy's PluginManager to provide a hook-based event system. It manages hookspec registration, listener registration, and event dispatch. Hook specs for task instance events define callbacks for on_task_instance_running, on_task_instance_success, and on_task_instance_failed.
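The three responsibilities named above (hookspec registration, listener registration, event dispatch) can be sketched with pluggy alone. This is a minimal illustration, not Airflow's code: `DemoSpec` and `RecordingListener` are hypothetical names, the spec lives in a class rather than a module for brevity, and plain strings stand in for real task instances.

```python
import pluggy

# Markers must use the same project name as the PluginManager ("airflow" in the source).
hookspec = pluggy.HookspecMarker("airflow")
hookimpl = pluggy.HookimplMarker("airflow")


class DemoSpec:
    """Hypothetical spec holder; the source keeps its specs in a module."""

    @hookspec
    def on_task_instance_success(self, previous_state, task_instance, session):
        """Called when a task instance succeeds."""


class RecordingListener:
    """Hypothetical listener that records the events it receives."""

    def __init__(self):
        self.seen = []

    @hookimpl
    def on_task_instance_success(self, previous_state, task_instance, session):
        self.seen.append((previous_state, task_instance))


pm = pluggy.PluginManager("airflow")
pm.add_hookspecs(DemoSpec)  # 1. hookspec registration
listener = RecordingListener()
pm.register(listener)  # 2. listener registration

# 3. event dispatch: pluggy hook calls accept keyword arguments only
pm.hook.on_task_instance_success(
    previous_state="running", task_instance="demo_task", session=None
)
```

After the call, `listener.seen` holds one `("running", "demo_task")` entry. The hook is invoked by name on `pm.hook`, so the caller never needs to know which listeners are registered.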

Usage

Register listeners via Airflow plugins or airflow_local_settings.py. The Airflow runtime initializes the ListenerManager, and listeners are automatically discovered from installed providers.

Code Reference

Source Location

  • Repository: Apache Airflow
  • File: shared/listeners/src/airflow_shared/listeners/listener.py
  • Lines: L40-78

Signature

class ListenerManager:
    def __init__(self):
        self.pm = pluggy.PluginManager("airflow")
        self.pm.add_hookcall_monitoring(_before_hookcall, _after_hookcall)

    def add_hookspecs(self, spec_module) -> None: ...

    @property
    def has_listeners(self) -> bool: ...

    @property
    def hook(self) -> _HookRelay: ...

    def add_listener(self, listener) -> None: ...

    def clear(self) -> None: ...
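The signature above elides the method bodies. One plausible way to fill them in on top of pluggy is sketched below. It is an assumption about how such a wrapper could work, not a copy of the Airflow source: `SketchListenerManager`, `DemoSpec`, `DemoListener`, and the logging done in the monitoring callbacks are all hypothetical.

```python
import logging

import pluggy

log = logging.getLogger(__name__)
hookspec = pluggy.HookspecMarker("airflow")
hookimpl = pluggy.HookimplMarker("airflow")


def _before_hookcall(hook_name, hook_impls, kwargs):
    # Assumed behavior: log every dispatch before it runs.
    log.debug("Calling hook %r with %r", hook_name, kwargs)


def _after_hookcall(outcome, hook_name, hook_impls, kwargs):
    # Assumed behavior: log once all implementations have been called.
    log.debug("Hook %r finished", hook_name)


class SketchListenerManager:
    """Hypothetical stand-in that mirrors the signature listed above."""

    def __init__(self):
        self.pm = pluggy.PluginManager("airflow")
        self.pm.add_hookcall_monitoring(_before_hookcall, _after_hookcall)

    def add_hookspecs(self, spec_module) -> None:
        self.pm.add_hookspecs(spec_module)

    @property
    def has_listeners(self) -> bool:
        return bool(self.pm.get_plugins())

    @property
    def hook(self):
        return self.pm.hook

    def add_listener(self, listener) -> None:
        # pluggy raises ValueError on duplicate registration, so guard against it.
        if not self.pm.is_registered(listener):
            self.pm.register(listener)

    def clear(self) -> None:
        for plugin in list(self.pm.get_plugins()):
            self.pm.unregister(plugin)


class DemoSpec:
    @hookspec
    def on_task_instance_success(self, previous_state, task_instance, session): ...


class DemoListener:
    def __init__(self):
        self.events = []

    @hookimpl
    def on_task_instance_success(self, previous_state, task_instance, session):
        self.events.append(task_instance)


manager = SketchListenerManager()
manager.add_hookspecs(DemoSpec)
empty_before = manager.has_listeners

listener = DemoListener()
manager.add_listener(listener)
manager.add_listener(listener)  # second call is a no-op because of the guard
manager.hook.on_task_instance_success(
    previous_state="running", task_instance="demo_task", session=None
)
registered = manager.has_listeners

manager.clear()
empty_after = manager.has_listeners
```

Checking `has_listeners` before building event payloads lets callers skip that work entirely when nothing is registered, which is the likely reason the property exists.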

Task Instance Hook Specs:

# shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py
@hookspec
def on_task_instance_running(previous_state, task_instance, session): ...

@hookspec
def on_task_instance_success(previous_state, task_instance, session): ...

@hookspec
def on_task_instance_failed(previous_state, task_instance, session): ...
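Two pluggy behaviors are worth knowing when implementing these specs: a hook implementation may accept only a subset of the spec's arguments, and calling a hook with no implementations returns an empty list. The sketch below demonstrates both. `TaskInstanceSpec` is a hypothetical stand-in that mirrors the signatures above, `FailureCounter` is a made-up listener, and a `SimpleNamespace` takes the place of a real TaskInstance.

```python
from types import SimpleNamespace

import pluggy

hookspec = pluggy.HookspecMarker("airflow")
hookimpl = pluggy.HookimplMarker("airflow")


class TaskInstanceSpec:
    """Hypothetical stand-in mirroring the hook spec signatures listed above."""

    @hookspec
    def on_task_instance_running(self, previous_state, task_instance, session): ...

    @hookspec
    def on_task_instance_failed(self, previous_state, task_instance, session): ...


class FailureCounter:
    """Hypothetical listener that only cares about failures."""

    def __init__(self):
        self.failed_task_ids = []

    @hookimpl
    def on_task_instance_failed(self, task_instance):
        # Declares only the argument it needs; pluggy passes the matching subset.
        self.failed_task_ids.append(task_instance.task_id)


pm = pluggy.PluginManager("airflow")
pm.add_hookspecs(TaskInstanceSpec)
counter = FailureCounter()
pm.register(counter)

fake_ti = SimpleNamespace(task_id="extract")  # placeholder, not a real TaskInstance

# No registered listener implements this hook, so the result list is empty.
running_results = pm.hook.on_task_instance_running(
    previous_state="queued", task_instance=fake_ti, session=None
)

# The caller still supplies every argument named in the spec.
pm.hook.on_task_instance_failed(
    previous_state="running", task_instance=fake_ti, session=None
)
```

The reverse does not hold: an implementation that declares an argument missing from the spec is rejected by pluggy when the listener is registered.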

Import

from airflow_shared.listeners.listener import ListenerManager
from airflow_shared.listeners.spec import taskinstance

I/O Contract

Inputs

Name | Type | Required | Description
spec_module | module | Yes (for setup) | Module with @hookspec-decorated functions
listener | object | Yes (for registration) | Object with @hookimpl-decorated methods
previous_state | str | Yes (for events) | Task state before change
task_instance | TaskInstance | Yes (for events) | The affected TaskInstance

Outputs

Name | Type | Description
Hook calls | None | Dispatched to all registered listeners
has_listeners | bool | Whether any listeners are registered

Usage Examples

Custom Listener Plugin

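import pluggy

hookimpl = pluggy.HookimplMarker("airflow")


def send_slack_message(text):
    # Hypothetical placeholder: replace with a real Slack client call
    print(f"[slack] {text}")


def send_pagerduty_alert(text):
    # Hypothetical placeholder: replace with a real PagerDuty client call
    print(f"[pagerduty] {text}")


class MyListener:
    @hookimpl
    def on_task_instance_success(self, previous_state, task_instance, session):
        # Send notification on task success
        send_slack_message(f"Task {task_instance.task_id} succeeded!")

    @hookimpl
    def on_task_instance_failed(self, previous_state, task_instance, session):
        # Alert on task failure
        send_pagerduty_alert(f"Task {task_instance.task_id} failed!")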
