Implementation:Apache Airflow BaseStatsLogger Protocol

From Leeroopedia
Revision as of 14:10, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Apache_Airflow_BaseStatsLogger_Protocol.md)


Knowledge Sources
Domains: Observability, Metrics
Last Updated: 2026-02-08 21:00 GMT

Overview

Defines the StatsLogger protocol interface for metrics emission and provides NoStatsLogger as the no-op fallback implementation used when no metrics backend is configured.

Description

The base_stats_logger.py module establishes the contract that all Airflow stats loggers must satisfy and provides a safe default when no backend is available.

StatsLogger (Protocol)

A type-checking protocol that declares the interface for all stats logging implementations. All methods are class methods, allowing the logger to be used without instantiation. Members include:

  • instance: StatsLogger | NoStatsLogger | None -- Class-level attribute holding the singleton instance.
  • initialize(*, is_statsd_datadog_enabled, is_statsd_on, is_otel_on, reset_instance=True) -- Initializes the stats logger with backend configuration flags.
  • incr(stat, count=1, rate=1, *, tags=None) -- Increments a counter metric.
  • decr(stat, count=1, rate=1, *, tags=None) -- Decrements a counter metric.
  • gauge(stat, value, rate=1, delta=False, *, tags=None) -- Sets or adjusts a gauge metric.
  • timing(stat, dt, *, tags=None) -- Records a timing metric. dt is a DeltaType (int, float, or datetime.timedelta) or None.
  • timer(*args, **kwargs) -> Timer -- Returns a Timer context manager for measuring code block durations.
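A classmethod-based protocol like the one described above is usually consumed by passing the logger class itself around, not an instance. The following is a minimal, self-contained sketch of that pattern. MetricsProtocol, RecordingLogger, and report_import_error are hypothetical names invented for illustration and are not part of Airflow; only the method shapes mirror the members listed above.

```python
from typing import Any, Dict, Optional, Protocol, Type


class MetricsProtocol(Protocol):
    """Hypothetical stand-in for StatsLogger; it declares the interface only."""

    @classmethod
    def incr(cls, stat: str, count: int = 1, rate: float = 1,
             *, tags: Optional[Dict[str, Any]] = None) -> None: ...

    @classmethod
    def gauge(cls, stat: str, value: float, rate: float = 1,
              delta: bool = False, *, tags: Optional[Dict[str, Any]] = None) -> None: ...


class RecordingLogger:
    """Hypothetical backend; it matches the protocol structurally, without inheriting from it."""

    calls: list = []

    @classmethod
    def incr(cls, stat, count=1, rate=1, *, tags=None) -> None:
        cls.calls.append(("incr", stat, count))

    @classmethod
    def gauge(cls, stat, value, rate=1, delta=False, *, tags=None) -> None:
        cls.calls.append(("gauge", stat, value))


def report_import_error(stats: Type[MetricsProtocol], dag_id: str) -> None:
    # Because every method is a class method, the class is passed, never an instance.
    stats.incr("dag_processing.import_errors", tags={"dag_id": dag_id})


report_import_error(RecordingLogger, "example_dag")
print(RecordingLogger.calls)
```

The protocol never appears in the inheritance chain of RecordingLogger; a type checker compares method signatures, which is why a no-op fallback can be swapped in without changing calling code.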

NoStatsLogger

A concrete class that implements the same interface as StatsLogger but performs no operations. All metric methods are no-ops except timer(), which returns an empty Timer() instance (so context manager usage does not raise errors). This is the fallback used when no StatsD, DataDog, or OpenTelemetry backend is configured.
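The no-op pattern described above can be sketched as follows. This is not the Airflow source: SketchTimer and SketchNoOpLogger are hypothetical names, and the timer here is an assumed minimal design that measures elapsed time locally and reports it nowhere.

```python
import time


class SketchTimer:
    """Hypothetical timer: measures elapsed time locally and emits nothing."""

    def __init__(self) -> None:
        self.duration = None  # seconds; set when the with-block exits

    def __enter__(self) -> "SketchTimer":
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Returning None lets any exception from the block propagate.
        self.duration = time.perf_counter() - self._start


class SketchNoOpLogger:
    """Hypothetical no-op logger: metric calls accept their arguments and do nothing."""

    @classmethod
    def incr(cls, stat, count=1, rate=1, *, tags=None) -> None:
        return None

    @classmethod
    def timing(cls, stat, dt, *, tags=None) -> None:
        return None

    @classmethod
    def timer(cls, *args, **kwargs) -> SketchTimer:
        # Arguments are ignored; the caller still gets a usable context manager.
        return SketchTimer()


SketchNoOpLogger.incr("my.metric")
with SketchNoOpLogger.timer("ignored.stat") as t:
    sum(range(1000))
print(t.duration is not None)
```

Returning a real context manager from timer() is what keeps instrumented code free of "is a backend configured?" checks.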

Usage

The StatsLogger protocol is used as a type annotation throughout the Airflow codebase. At runtime, the actual implementation (StatsD, DataDog, OpenTelemetry, or NoStatsLogger) is selected during initialization based on configuration.
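As a rough illustration of flag-based selection, the sketch below maps the three flags accepted by initialize to a backend name. The function select_backend, the returned labels, and the precedence order (DataDog, then StatsD, then OpenTelemetry) are assumptions made for this example; the page does not specify how Airflow resolves the case where several flags are set.

```python
def select_backend(*, is_statsd_datadog_enabled: bool, is_statsd_on: bool, is_otel_on: bool) -> str:
    """Return a label for the backend to use (hypothetical helper, assumed precedence)."""
    if is_statsd_datadog_enabled:
        return "datadog"
    if is_statsd_on:
        return "statsd"
    if is_otel_on:
        return "otel"
    # No backend flag is set: fall back to the no-op logger (NoStatsLogger in Airflow).
    return "noop"


print(select_backend(is_statsd_datadog_enabled=False, is_statsd_on=False, is_otel_on=False))
```

Keyword-only flags, as in the initialize signature, make call sites self-describing and prevent the three booleans from being passed in the wrong order.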

Code Reference

Source Location

  • Repository: Apache_Airflow
  • File: shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py (135 lines)

Signature

class StatsLogger(Protocol):
    """This class is only used for TypeChecking (for IDEs, mypy, etc)."""

    instance: StatsLogger | NoStatsLogger | None = None

    @classmethod
    def initialize(
        cls, *, is_statsd_datadog_enabled: bool, is_statsd_on: bool,
        is_otel_on: bool, reset_instance: bool = True,
    ) -> None: ...

    @classmethod
    def incr(cls, stat: str, count: int = 1, rate: int | float = 1,
             *, tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def decr(cls, stat: str, count: int = 1, rate: int | float = 1,
             *, tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def gauge(cls, stat: str, value: float, rate: int | float = 1,
              delta: bool = False, *, tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def timing(cls, stat: str, dt: DeltaType | None,
               *, tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def timer(cls, *args, **kwargs) -> Timer: ...


class NoStatsLogger:
    """If no StatsLogger is configured, NoStatsLogger is used as a fallback."""
    # Same interface as StatsLogger, all methods are no-ops
    ...

Import

from airflow_shared.observability.metrics.base_stats_logger import StatsLogger, NoStatsLogger

I/O Contract

Method      Input                                                                                        Output  Side Effects
initialize  is_statsd_datadog_enabled: bool, is_statsd_on: bool, is_otel_on: bool, reset_instance: bool  None    Configures the stats backend singleton
incr        stat: str, count: int, rate: int | float, tags: dict | None                                  None    Emits counter increment to backend
decr        stat: str, count: int, rate: int | float, tags: dict | None                                  None    Emits counter decrement to backend
gauge       stat: str, value: float, rate: int | float, delta: bool, tags: dict | None                   None    Emits gauge metric to backend
timing      stat: str, dt: DeltaType | None, tags: dict | None                                           None    Emits timing metric to backend
timer       *args, **kwargs                                                                              Timer   Returns a Timer context manager; NoStatsLogger returns an empty Timer
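Because timing accepts several input types, a backend typically has to normalize dt before emitting it. The sketch below shows one way to do that. The helper to_milliseconds is hypothetical, and treating bare numbers as milliseconds is an assumption borrowed from common StatsD client conventions, not something this page states.

```python
import datetime
from typing import Optional, Union

# Mirrors the DeltaType described on this page: int, float, or datetime.timedelta.
DeltaType = Union[int, float, datetime.timedelta]


def to_milliseconds(dt: Optional[DeltaType]) -> Optional[float]:
    """Normalize a timing value to milliseconds (hypothetical helper)."""
    if dt is None:
        # Nothing to record; the caller can skip emission.
        return None
    if isinstance(dt, datetime.timedelta):
        return dt.total_seconds() * 1000.0
    # Assumption: bare numbers are already expressed in milliseconds.
    return float(dt)


print(to_milliseconds(datetime.timedelta(seconds=2)))
print(to_milliseconds(250))
```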

Usage Examples

Emitting Metrics via the Protocol

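import datetime

from airflow_shared.observability.metrics.base_stats_logger import NoStatsLogger, StatsLogger


def emit_metrics(stats: type[StatsLogger]) -> None:
    # Increment a counter with tags
    stats.incr("dag_processing.import_errors", count=1, tags={"dag_id": "example_dag"})

    # Record a gauge value
    stats.gauge("scheduler.open_slots", value=42)

    # Record a timing
    stats.timing("task.duration", dt=datetime.timedelta(seconds=5.3), tags={"task_id": "my_task"})


# StatsLogger only declares the interface, so annotate with it and pass a concrete logger.
# NoStatsLogger is used here so the example runs without a metrics backend.
emit_metrics(NoStatsLogger)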

Using the Timer Context Manager

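from airflow_shared.observability.metrics.base_stats_logger import NoStatsLogger, StatsLogger

def timed_scheduling(stats: type[StatsLogger]) -> None:
    # perform_scheduling() stands for the work being timed
    with stats.timer("scheduler.critical_section_duration"):
        perform_scheduling()
    # With a configured backend, the duration is recorded when the block exits

# Pass a concrete logger; calling timer() on the protocol itself returns no Timer
timed_scheduling(NoStatsLogger)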

Fallback Behavior with NoStatsLogger

from airflow_shared.observability.metrics.base_stats_logger import NoStatsLogger

# All calls are safe no-ops
NoStatsLogger.incr("my.metric")
NoStatsLogger.gauge("my.gauge", value=0)

# Timer still works as a context manager (measures duration locally)
with NoStatsLogger.timer() as t:
    do_work()
