Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Apache Airflow Lifecycle Listener Spec

From Leeroopedia


Knowledge Sources
Domains Event_System, Plugin_Architecture
Last Updated 2026-02-08 21:00 GMT

Overview

Defines the pluggy hookspec interface for Airflow component lifecycle events, providing two hooks -- on_starting and before_stopping -- that listeners can implement to react to component startup and shutdown.

Description

The lifecycle hookspec module defines the contract for the Airflow component lifecycle event system. It uses the pluggy library's HookspecMarker to declare two hook specifications:

  • on_starting(component) -- Called before an Airflow component (scheduler, worker, task runner, etc.) starts. It is guaranteed to be invoked before any other plugin method, making it suitable for initialization logic such as establishing connections, registering metrics, or loading configuration.
  • before_stopping(component) -- Called before an Airflow component stops. It is guaranteed to be invoked after all other plugin methods, making it suitable for cleanup logic such as flushing buffers, closing connections, or emitting final metrics.

Both hooks receive a component parameter representing the Airflow component instance that is starting or stopping.

Listeners implement these hooks by defining functions with matching signatures and registering them through Airflow's pluggy-based listener system.

Usage

To implement a lifecycle listener, define a module with functions matching the hookspec signatures:

# my_lifecycle_listener.py

def on_starting(component):
    """Called when an Airflow component is starting up."""
    print(f"Component starting: {component}")
    # Initialize resources, open connections, etc.

def before_stopping(component):
    """Called when an Airflow component is about to stop."""
    print(f"Component stopping: {component}")
    # Cleanup resources, flush buffers, close connections, etc.

Code Reference

Source Location

  • Repository: Apache_Airflow
  • File: shared/listeners/src/airflow_shared/listeners/spec/lifecycle.py

Signature

from pluggy import HookspecMarker

hookspec = HookspecMarker("airflow")


@hookspec
def on_starting(component):
    """
    Execute before Airflow component - jobs like scheduler, worker,
    or task runner starts.

    It's guaranteed this will be called before any other plugin method.

    :param component: Component that calls this method
    """


@hookspec
def before_stopping(component):
    """
    Execute before Airflow component - jobs like scheduler, worker,
    or task runner stops.

    It's guaranteed this will be called after any other plugin method.

    :param component: Component that calls this method
    """

Import

from airflow_shared.listeners.spec.lifecycle import on_starting, before_stopping

I/O Contract

Inputs

Name Type Required Description
component object Yes The Airflow component instance (scheduler, worker, task runner, etc.) that is starting or stopping

Outputs

Name Type Description
None None These hooks do not return values; they are called for side effects only

Hook Execution Order

Phase Hook Guarantee
Startup on_starting(component) Called before any other plugin method
Shutdown before_stopping(component) Called after all other plugin methods

Usage Examples

Basic Lifecycle Listener

# my_plugin/listeners/lifecycle_listener.py
import logging

log = logging.getLogger(__name__)

def on_starting(component):
    log.info("Airflow component %s is starting", type(component).__name__)
    # Example: initialize a connection pool
    component._custom_pool = create_connection_pool()

def before_stopping(component):
    log.info("Airflow component %s is stopping", type(component).__name__)
    # Example: close the connection pool
    if hasattr(component, "_custom_pool"):
        component._custom_pool.close()

Registering a Listener via Plugin

from airflow.plugins_manager import AirflowPlugin
from my_plugin.listeners import lifecycle_listener

class MyPlugin(AirflowPlugin):
    name = "my_lifecycle_plugin"
    listeners = [lifecycle_listener]

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment