Implementation:Apache Airflow DualStatsManager

From Leeroopedia


Knowledge Sources
Domains Observability, Metrics, Migration
Last Updated 2026-02-08 21:00 GMT

Overview

Manages dual emission of legacy and new metric names during the Airflow metrics naming migration, allowing both old and new metric name formats to be emitted simultaneously via a single API.

Description

The DualStatsManager class is a helper that abstracts emitting metrics under both the legacy and the new naming conventions during Airflow's metrics migration period. It wraps the underlying Stats logger so that each metric call is emitted up to twice, once with the legacy name and once with the new name, which lets dashboards and alerts be migrated incrementally.

Core Design

  • metrics_dict -- A class-level MetricsRegistry instance that loads metric definitions from metrics_template.yaml. This registry maps new metric names to their legacy equivalents and required variables.
  • export_legacy_names: ClassVar[bool] -- A class variable (default True) that controls whether legacy metric names are emitted alongside new names.
  • initialize(export_legacy_names) -- Sets the export_legacy_names flag.

Metric Methods

Each metric method (incr, decr, gauge, timing, timer) follows the same pattern:

  1. Accepts standard metric parameters plus an additional extra_tags keyword argument containing variables for legacy name interpolation.
  2. If export_legacy_names is True and extra_tags is provided:
    1. Looks up the legacy name from the registry via get_legacy_stat(stat, variables).
    2. Emits the metric with the legacy name (using only the base tags, not extra_tags).
    3. Raises ValueError if the stat is not in the registry, or if it has no legacy name (get_legacy_stat returns None).
  3. Always emits the metric with the new name, merging extra_tags into tags.
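The dual-emission pattern above can be sketched in a self-contained way. This is an illustrative reduction, not the Airflow implementation: RecordingStats is a hypothetical stand-in for the Stats logger that records calls in memory, LEGACY_TEMPLATES is a hypothetical registry excerpt, the metric names are made up, and count defaults to 1 here rather than None as in the real signature. Only incr is shown; the other methods follow the same shape.

```python
from __future__ import annotations

from typing import Any, ClassVar


class RecordingStats:
    """Hypothetical stand-in for the Stats logger: records incr calls in memory."""

    calls: ClassVar[list[tuple[str, int, dict[str, Any]]]] = []

    @classmethod
    def incr(cls, stat: str, count: int = 1, *, tags: dict[str, Any] | None = None) -> None:
        cls.calls.append((stat, count, dict(tags or {})))


# Hypothetical registry excerpt: new metric name -> legacy name template.
LEGACY_TEMPLATES: dict[str, str] = {
    "example.new_metric": "example.legacy_metric.{dag_id}",
}


class DualEmitSketch:
    """Illustrative reduction of the incr pattern; not the Airflow class."""

    export_legacy_names: ClassVar[bool] = True

    @classmethod
    def initialize(cls, export_legacy_names: bool) -> None:
        # Class-level flag: affects every caller in the process.
        cls.export_legacy_names = export_legacy_names

    @classmethod
    def incr(
        cls,
        stat: str,
        count: int = 1,
        *,
        tags: dict[str, Any] | None = None,
        extra_tags: dict[str, Any] | None = None,
    ) -> None:
        base_tags = tags or {}
        if cls.export_legacy_names and extra_tags:
            template = LEGACY_TEMPLATES.get(stat)
            if template is None:
                raise ValueError(f"No legacy name registered for {stat!r}")
            # Legacy emission: variables are baked into the name, so only base tags are sent.
            RecordingStats.incr(template.format(**extra_tags), count, tags=base_tags)
        # New emission: always sent, with extra_tags merged into the tags.
        RecordingStats.incr(stat, count, tags={**base_tags, **(extra_tags or {})})
```

With the flag set to False, the legacy branch is skipped and only the new-name call remains, which is the switch described under Usage.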

Legacy Name Resolution

get_legacy_stat(stat, variables) performs the following:

  1. Looks up the stat in the MetricsRegistry. Raises ValueError if not found.
  2. Retrieves the legacy_name field. Returns None if set to "-" (no legacy equivalent).
  3. Validates that all name_variables required by the legacy name template are present in the provided variables dict. Raises ValueError if any are missing.
  4. Formats the legacy name string with the provided variables and returns it.
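The four resolution steps can be sketched as a standalone function. This assumes each registry entry is a dict carrying the legacy_name and name_variables fields named above, and that legacy templates use str.format-style placeholders; the REGISTRY contents and metric names are hypothetical, since the real entries are loaded from metrics_template.yaml.

```python
from __future__ import annotations

from typing import Any

# Hypothetical registry contents (real entries come from metrics_template.yaml).
REGISTRY: dict[str, dict[str, Any]] = {
    "example.new_metric": {
        "legacy_name": "example.legacy_metric.{dag_id}.{task_id}",
        "name_variables": ["dag_id", "task_id"],
    },
    "example.metric_without_legacy": {"legacy_name": "-", "name_variables": []},
}


def get_legacy_stat(stat: str, variables: dict[str, Any]) -> str | None:
    # Step 1: the stat must be registered.
    entry = REGISTRY.get(stat)
    if entry is None:
        raise ValueError(f"Metric {stat!r} is not in the registry")
    # Step 2: "-" marks a metric with no legacy equivalent.
    legacy_name = entry["legacy_name"]
    if legacy_name == "-":
        return None
    # Step 3: every variable the legacy template needs must be supplied.
    missing = [name for name in entry.get("name_variables", []) if name not in variables]
    if missing:
        raise ValueError(f"Missing variables for {stat!r}: {missing}")
    # Step 4: interpolate; str.format ignores unused keyword arguments.
    return legacy_name.format(**variables)
```

Validating name_variables up front turns a missing variable into a clear ValueError naming the metric, instead of a bare KeyError from str.format.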

Timer Method

The timer method uses an ExitStack to compose two context managers -- one for the legacy metric and one for the new metric -- so both timers run in the same with block.
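A minimal sketch of composing two timers with contextlib.ExitStack. Here simple_timer is a hypothetical stand-in for Stats.timer that appends (name, seconds) pairs to a list, and dual_timer is an illustrative name; passing None as the legacy name models the case where no legacy metric is emitted.

```python
from __future__ import annotations

import time
from contextlib import ExitStack, contextmanager
from typing import Iterator

# Hypothetical sink collecting (metric name, elapsed seconds) pairs.
recorded: list[tuple[str, float]] = []


@contextmanager
def simple_timer(name: str) -> Iterator[None]:
    """Stand-in for Stats.timer: records wall time spent inside the with-block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        recorded.append((name, time.perf_counter() - start))


def dual_timer(new_name: str, legacy_name: str | None) -> ExitStack:
    """Return one ExitStack holding the legacy timer (if any) and the new timer."""
    stack = ExitStack()
    if legacy_name is not None:
        stack.enter_context(simple_timer(legacy_name))
    stack.enter_context(simple_timer(new_name))
    return stack


with dual_timer("example.new_timer", "example.legacy_timer"):
    time.sleep(0.01)
```

ExitStack unwinds in LIFO order, so the new-name timer closes first and the legacy timer closes immediately after; both measure essentially the same block.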

Helper Functions

The module includes several private helper functions:

  • _value_is_provided(value) -- Returns True if value is not None and, when it supports len(), is non-empty.
  • _get_dict_with_defined_args(...) -- Builds a kwargs dict including only the parameters that were explicitly provided.
  • _get_args_dict_with_extra_tags_if_set(...) -- Merges extra tags into the args dict.
  • _get_tags_with_extra(tags, extra_tags) -- Returns a new dict combining base tags and extra tags.
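The helper semantics can be illustrated with small re-implementations. These are hypothetical equivalents with public names, not the module's private functions: the real parameter list of _get_dict_with_defined_args is not given on this page, so the sketch takes keyword arguments, and the choice that extra_tags win on key collisions is an assumption.

```python
from __future__ import annotations

from typing import Any


def value_is_provided(value: Any) -> bool:
    """True if value is not None and, when it supports len(), is non-empty."""
    if value is None:
        return False
    if hasattr(value, "__len__"):
        return len(value) > 0
    return True  # e.g. 0 or 0.0 still counts as provided


def dict_with_defined_args(**candidates: Any) -> dict[str, Any]:
    """Keep only the keyword arguments whose values were actually provided."""
    return {key: val for key, val in candidates.items() if value_is_provided(val)}


def tags_with_extra(
    tags: dict[str, Any] | None, extra_tags: dict[str, Any] | None
) -> dict[str, Any]:
    """Return a new dict; inputs are not mutated. extra_tags win on collisions (assumed)."""
    return {**(tags or {}), **(extra_tags or {})}
```

Filtering out unset arguments lets the wrapper forward only what the caller passed, so the underlying Stats method keeps its own defaults.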

Usage

Used throughout Airflow's core to emit metrics during the naming migration period. Once migration is complete and all dashboards use the new names, export_legacy_names can be set to False to stop dual emission.

Code Reference

Source Location

  • Repository: Apache_Airflow
  • File: shared/observability/src/airflow_shared/observability/metrics/dual_stats_manager.py (282 lines)

Signature

class DualStatsManager:
    """Helper class to abstract enabling/disabling the export of metrics with legacy names."""

    metrics_dict = MetricsRegistry()
    export_legacy_names: ClassVar[bool] = True

    @classmethod
    def initialize(cls, export_legacy_names: bool): ...

    @classmethod
    def get_legacy_stat(cls, stat: str, variables: dict[str, Any]) -> str | None: ...

    @classmethod
    def incr(cls, stat: str, count: int | None = None, rate: int | float | None = None,
             *, tags: dict[str, Any] | None = None, extra_tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def decr(cls, stat: str, count: int | None = None, rate: int | float | None = None,
             *, tags: dict[str, Any] | None = None, extra_tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def gauge(cls, stat: str, value: float, rate: int | float | None = None,
              delta: bool | None = None, *, tags: dict[str, Any] | None = None,
              extra_tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def timing(cls, stat: str, dt: DeltaType, *, tags: dict[str, Any] | None = None,
               extra_tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def timer(cls, stat: str, tags: dict[str, Any] | None = None,
              extra_tags: dict[str, Any] | None = None, **kwargs): ...

Import

from airflow_shared.observability.metrics.dual_stats_manager import DualStatsManager

I/O Contract

| Method | Input | Output | Side Effects |
|---|---|---|---|
| initialize | export_legacy_names: bool | None | Sets the class-level flag |
| get_legacy_stat | stat: str, variables: dict | str or None (formatted legacy name, or None if there is no legacy equivalent) | None; raises ValueError if the metric is not in the registry or required variables are missing |
| incr | stat: str, count, rate, tags, extra_tags | None | Emits up to two Stats.incr calls (legacy + new) |
| decr | stat: str, count, rate, tags, extra_tags | None | Emits up to two Stats.decr calls (legacy + new) |
| gauge | stat: str, value, rate, delta, tags, extra_tags | None | Emits up to two Stats.gauge calls (legacy + new) |
| timing | stat: str, dt, tags, extra_tags | None | Emits up to two Stats.timing calls (legacy + new) |
| timer | stat: str, tags, extra_tags, **kwargs | ExitStack (context manager) | Enters up to two timer context managers |

Usage Examples

Emitting Dual Metrics

from airflow_shared.observability.metrics.dual_stats_manager import DualStatsManager

# Initialize with legacy export enabled
DualStatsManager.initialize(export_legacy_names=True)

# Emit a counter with both new and legacy names
# extra_tags provide the variables needed for legacy name interpolation
DualStatsManager.incr(
    "dag_processing.import_errors",
    count=1,
    tags={"dag_id": "example_dag"},
    extra_tags={"dag_file": "/opt/airflow/dags/example.py"},
)

Disabling Legacy Export

# After migration is complete, disable legacy name emission
DualStatsManager.initialize(export_legacy_names=False)

# Now only the new metric name is emitted
DualStatsManager.incr("dag_processing.import_errors", count=1)

Using the Timer with Dual Emission

with DualStatsManager.timer(
    "scheduler.critical_section_duration",
    tags={"scheduler_id": "scheduler-0"},
    extra_tags={"hostname": "worker-1"},
):
    perform_critical_section()  # placeholder for the work being timed
# With export_legacy_names=True, both the legacy and the new timer metrics are recorded
