Implementation:Apache Airflow BaseStatsLogger Protocol
| Knowledge Sources | |
|---|---|
| Domains | Observability, Metrics |
| Last Updated | 2026-02-08 21:00 GMT |
Overview
Defines the StatsLogger protocol interface for metrics emission and provides NoStatsLogger as the no-op fallback implementation used when no metrics backend is configured.
Description
The base_stats_logger.py module establishes the contract that all Airflow stats loggers must satisfy and provides a safe default when no backend is available.
StatsLogger (Protocol)
A type-checking protocol that declares the interface for all stats logging implementations. All methods are class methods, allowing the logger to be used without instantiation. Members include:
- instance: StatsLogger | NoStatsLogger | None -- Class-level attribute holding the singleton instance.
- initialize(*, is_statsd_datadog_enabled, is_statsd_on, is_otel_on, reset_instance=True) -- Initializes the stats logger with backend configuration flags.
- incr(stat, count=1, rate=1, *, tags=None) -- Increments a counter metric.
- decr(stat, count=1, rate=1, *, tags=None) -- Decrements a counter metric.
- gauge(stat, value, rate=1, delta=False, *, tags=None) -- Sets or adjusts a gauge metric.
- timing(stat, dt, *, tags=None) -- Records a timing metric. Accepts DeltaType (int, float, or datetime.timedelta).
- timer(*args, **kwargs) -> Timer -- Returns a Timer context manager for measuring code block durations.
NoStatsLogger
A concrete class that implements the same interface as StatsLogger but performs no operations. All metric methods are no-ops except timer(), which returns an empty Timer() instance (so context manager usage does not raise errors). This is the fallback used when no StatsD, DataDog, or OpenTelemetry backend is configured.
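The relationship between the protocol and its no-op fallback can be illustrated with a self-contained stand-in. The names `MetricsProtocol`, `NoOpMetrics`, and `record_import_error` below are hypothetical and mirror only a subset of the interface described above; this is a sketch of the pattern, not the Airflow source.

```python
from __future__ import annotations

from typing import Any, Protocol


class MetricsProtocol(Protocol):
    """Hypothetical stand-in for a StatsLogger-style interface (subset only)."""

    @classmethod
    def incr(cls, stat: str, count: int = 1, rate: int | float = 1,
             *, tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def gauge(cls, stat: str, value: float, rate: int | float = 1,
              delta: bool = False, *, tags: dict[str, Any] | None = None) -> None: ...


class NoOpMetrics:
    """Hypothetical no-op fallback: same call signatures, no side effects."""

    @classmethod
    def incr(cls, stat: str, count: int = 1, rate: int | float = 1,
             *, tags: dict[str, Any] | None = None) -> None:
        """Accept the call and discard it."""

    @classmethod
    def gauge(cls, stat: str, value: float, rate: int | float = 1,
              delta: bool = False, *, tags: dict[str, Any] | None = None) -> None:
        """Accept the call and discard it."""


def record_import_error(stats: type[MetricsProtocol], dag_id: str) -> None:
    # The caller depends only on the protocol, so any conforming class works.
    stats.incr("dag_processing.import_errors", tags={"dag_id": dag_id})


# With the no-op fallback the call succeeds and nothing is emitted.
record_import_error(NoOpMetrics, "example_dag")
```

Because the caller is written against the protocol, instrumentation code does not need to check whether a backend is configured before emitting a metric.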
Usage
The StatsLogger protocol is used as a type annotation throughout the Airflow codebase. At runtime, the actual implementation (StatsD, DataDog, OpenTelemetry, or NoStatsLogger) is selected during initialization based on configuration.
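The selection step might look roughly like the sketch below. The `Backend` enum and `choose_backend` function are hypothetical, and the order in which the flags are checked is an assumption made for illustration; only the flag names come from the `initialize` signature.

```python
from enum import Enum


class Backend(Enum):
    """Hypothetical labels for the implementations named in the text."""

    DATADOG = "datadog"
    STATSD = "statsd"
    OTEL = "otel"
    NOOP = "noop"


def choose_backend(*, is_statsd_datadog_enabled: bool, is_statsd_on: bool,
                   is_otel_on: bool) -> Backend:
    """Map the configuration flags to a backend label."""
    # Assumption: flags are checked in this order; the real precedence may differ.
    if is_statsd_datadog_enabled:
        return Backend.DATADOG
    if is_statsd_on:
        return Backend.STATSD
    if is_otel_on:
        return Backend.OTEL
    # No backend enabled: fall back to the no-op logger.
    return Backend.NOOP


selected = choose_backend(
    is_statsd_datadog_enabled=False, is_statsd_on=False, is_otel_on=False
)
print(selected.value)
```

With every flag set to False, the sketch falls through to the no-op label, which corresponds to the NoStatsLogger fallback described above.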
Code Reference
Source Location
- Repository: Apache_Airflow
- File: shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py (135 lines)
Signature
class StatsLogger(Protocol):
    """This class is only used for TypeChecking (for IDEs, mypy, etc)."""

    instance: StatsLogger | NoStatsLogger | None = None

    @classmethod
    def initialize(
        cls, *, is_statsd_datadog_enabled: bool, is_statsd_on: bool,
        is_otel_on: bool, reset_instance: bool = True,
    ) -> None: ...

    @classmethod
    def incr(cls, stat: str, count: int = 1, rate: int | float = 1,
             *, tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def decr(cls, stat: str, count: int = 1, rate: int | float = 1,
             *, tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def gauge(cls, stat: str, value: float, rate: int | float = 1,
              delta: bool = False, *, tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def timing(cls, stat: str, dt: DeltaType | None,
               *, tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def timer(cls, *args, **kwargs) -> Timer: ...


class NoStatsLogger:
    """If no StatsLogger is configured, NoStatsLogger is used as a fallback."""

    # Same interface as StatsLogger, all methods are no-ops
    ...
Import
from airflow_shared.observability.metrics.base_stats_logger import StatsLogger, NoStatsLogger
I/O Contract
| Method | Input | Output | Side Effects |
|---|---|---|---|
| initialize | is_statsd_datadog_enabled: bool, is_statsd_on: bool, is_otel_on: bool, reset_instance: bool | None | Configures the stats backend singleton |
| incr | stat: str, count: int, rate: int \| float, tags: dict \| None | None | Emits a counter increment to the backend |
| decr | stat: str, count: int, rate: int \| float, tags: dict \| None | None | Emits a counter decrement to the backend |
| gauge | stat: str, value: float, rate: int \| float, delta: bool, tags: dict \| None | None | Emits a gauge metric to the backend |
| timing | stat: str, dt: DeltaType \| None, tags: dict \| None | None | Emits a timing metric to the backend |
| timer | *args, **kwargs | Timer | Returns a Timer context manager; NoStatsLogger returns an empty Timer |
Usage Examples
Emitting Metrics via the Protocol
import datetime

from airflow_shared.observability.metrics.base_stats_logger import StatsLogger

# Increment a counter with tags
StatsLogger.incr("dag_processing.import_errors", count=1, tags={"dag_id": "example_dag"})

# Record a gauge value
StatsLogger.gauge("scheduler.open_slots", value=42)

# Record a timing
StatsLogger.timing("task.duration", dt=datetime.timedelta(seconds=5.3), tags={"task_id": "my_task"})
Using the Timer Context Manager
from airflow_shared.observability.metrics.base_stats_logger import StatsLogger

# perform_scheduling() stands for the code block being timed
with StatsLogger.timer("scheduler.critical_section_duration") as t:
    perform_scheduling()
# Duration is automatically recorded
Fallback Behavior with NoStatsLogger
from airflow_shared.observability.metrics.base_stats_logger import NoStatsLogger
# All calls are safe no-ops
NoStatsLogger.incr("my.metric")
NoStatsLogger.gauge("my.gauge", value=0)
# Timer still works as a context manager (measures duration locally)
with NoStatsLogger.timer() as t:
    do_work()
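To make "measures duration locally" concrete, here is a minimal sketch of a context manager that records elapsed time without sending it anywhere. `SketchTimer` and its `duration` attribute (in seconds) are hypothetical names chosen for illustration; the actual Timer class may expose a different interface.

```python
from __future__ import annotations

import time


class SketchTimer:
    """Hypothetical stand-in for a Timer-like context manager."""

    def __init__(self) -> None:
        self.duration: float | None = None  # seconds, set when the block exits
        self._start: float | None = None

    def __enter__(self) -> SketchTimer:
        # Start a monotonic clock when the with-block is entered.
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Store the elapsed time locally; nothing is sent to a backend.
        self.duration = time.perf_counter() - self._start


with SketchTimer() as t:
    sum(range(1000))  # placeholder workload

print(f"elapsed: {t.duration:.6f}s")
```

Returning `None` from `__exit__` means exceptions raised inside the block still propagate, while the elapsed time is recorded either way.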