Implementation:Apache Airflow Lifecycle Listener Spec
| Knowledge Sources | |
|---|---|
| Domains | Event_System, Plugin_Architecture |
| Last Updated | 2026-02-08 21:00 GMT |
Overview
Defines the pluggy hookspec interface for Airflow component lifecycle events, providing two hooks -- on_starting and before_stopping -- that listeners can implement to react to component startup and shutdown.
Description
The lifecycle hookspec module defines the contract for the Airflow component lifecycle event system. It uses the pluggy library's HookspecMarker to declare two hook specifications:
- on_starting(component) -- Called before an Airflow component (scheduler, worker, task runner, etc.) starts. It is guaranteed to be invoked before any other plugin method, making it suitable for initialization logic such as establishing connections, registering metrics, or loading configuration.
- before_stopping(component) -- Called before an Airflow component stops. It is guaranteed to be invoked after all other plugin methods, making it suitable for cleanup logic such as flushing buffers, closing connections, or emitting final metrics.
Both hooks receive a component parameter representing the Airflow component instance that is starting or stopping.
Listeners implement these hooks by defining functions with matching signatures and registering them through Airflow's pluggy-based listener system.
Usage
To implement a lifecycle listener, define a module with functions matching the hookspec signatures:
# my_lifecycle_listener.py
def on_starting(component):
"""Called when an Airflow component is starting up."""
print(f"Component starting: {component}")
# Initialize resources, open connections, etc.
def before_stopping(component):
"""Called when an Airflow component is about to stop."""
print(f"Component stopping: {component}")
# Cleanup resources, flush buffers, close connections, etc.
Code Reference
Source Location
- Repository: Apache_Airflow
- File:
shared/listeners/src/airflow_shared/listeners/spec/lifecycle.py
Signature
from pluggy import HookspecMarker
hookspec = HookspecMarker("airflow")
@hookspec
def on_starting(component):
"""
Execute before Airflow component - jobs like scheduler, worker,
or task runner starts.
It's guaranteed this will be called before any other plugin method.
:param component: Component that calls this method
"""
@hookspec
def before_stopping(component):
"""
Execute before Airflow component - jobs like scheduler, worker,
or task runner stops.
It's guaranteed this will be called after any other plugin method.
:param component: Component that calls this method
"""
Import
from airflow_shared.listeners.spec.lifecycle import on_starting, before_stopping
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| component | object | Yes | The Airflow component instance (scheduler, worker, task runner, etc.) that is starting or stopping |
Outputs
| Name | Type | Description |
|---|---|---|
| None | None | These hooks do not return values; they are called for side effects only |
Hook Execution Order
| Phase | Hook | Guarantee |
|---|---|---|
| Startup | on_starting(component) |
Called before any other plugin method |
| Shutdown | before_stopping(component) |
Called after all other plugin methods |
Usage Examples
Basic Lifecycle Listener
# my_plugin/listeners/lifecycle_listener.py
import logging
log = logging.getLogger(__name__)
def on_starting(component):
log.info("Airflow component %s is starting", type(component).__name__)
# Example: initialize a connection pool
component._custom_pool = create_connection_pool()
def before_stopping(component):
log.info("Airflow component %s is stopping", type(component).__name__)
# Example: close the connection pool
if hasattr(component, "_custom_pool"):
component._custom_pool.close()
Registering a Listener via Plugin
from airflow.plugins_manager import AirflowPlugin
from my_plugin.listeners import lifecycle_listener
class MyPlugin(AirflowPlugin):
name = "my_lifecycle_plugin"
listeners = [lifecycle_listener]