Implementation:Apache Airflow DualStatsManager
| Knowledge Sources | |
|---|---|
| Domains | Observability, Metrics, Migration |
| Last Updated | 2026-02-08 21:00 GMT |
Overview
Manages dual emission of legacy and new metric names during the Airflow metrics naming migration, allowing both old and new metric name formats to be emitted simultaneously via a single API.
Description
The `DualStatsManager` class is a helper that abstracts emitting metrics under both the legacy and the new naming conventions during Airflow's metrics migration period. It wraps the underlying `Stats` logger. When legacy export is enabled, each metric call is emitted twice, once with the legacy name format and once with the new name format, so dashboards and alerts can be migrated incrementally.
Core Design
- `metrics_dict` -- A class-level `MetricsRegistry` instance that loads metric definitions from `metrics_template.yaml`. This registry maps new metric names to their legacy equivalents and the variables those legacy names require.
- `export_legacy_names: ClassVar[bool]` -- A class variable (default `True`) that controls whether legacy metric names are emitted alongside new names.
- `initialize(export_legacy_names)` -- Sets the `export_legacy_names` flag.
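The class-level state can be modelled with a short sketch. It assumes a plain dict in place of `MetricsRegistry`. `DualEmitterSketch`, the metric name, and the entry layout are hypothetical; only the `legacy_name` and `name_variables` field names follow this page's description of the registry. The point is that `initialize` assigns on the class, so one call changes behaviour for every caller in the process.

```python
from typing import Any, ClassVar


class DualEmitterSketch:
    """Hypothetical stand-in for DualStatsManager's class-level state."""

    # Assumption: a plain dict replaces MetricsRegistry. Each entry maps a new
    # metric name to its legacy template and the variables that template needs.
    metrics_dict: ClassVar[dict[str, dict[str, Any]]] = {
        "example.new_metric": {
            "legacy_name": "example.{dag_id}.legacy_metric",
            "name_variables": ["dag_id"],
        },
    }
    # One flag shared by every caller in the process.
    export_legacy_names: ClassVar[bool] = True

    @classmethod
    def initialize(cls, export_legacy_names: bool) -> None:
        # Assigning on cls (not on an instance) changes what all callers see.
        cls.export_legacy_names = export_legacy_names


# A single call switches legacy emission off for every later metric call.
DualEmitterSketch.initialize(export_legacy_names=False)
```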
Metric Methods
Each metric method (incr, decr, gauge, timing, timer) follows the same pattern:
- Accepts the standard metric parameters plus an additional `extra_tags` keyword argument containing the variables used for legacy name interpolation.
- If `export_legacy_names` is `True` and `extra_tags` is provided:
  - Looks up the legacy name from the registry via `get_legacy_stat(stat, variables)`.
  - Emits the metric with the legacy name (using only the base tags, not `extra_tags`).
  - Raises `ValueError` if the stat is not in the registry or has no legacy name.
- Always emits the metric with the new name, merging `extra_tags` into `tags`.
Legacy Name Resolution
`get_legacy_stat(stat, variables)` performs the following steps:
- Looks up the stat in the `MetricsRegistry`. Raises `ValueError` if it is not found.
- Retrieves the `legacy_name` field. Returns `None` if it is set to `"-"` (no legacy equivalent).
- Validates that all `name_variables` required by the legacy name template are present in the provided `variables` dict. Raises `ValueError` if any are missing.
- Formats the legacy name string with the provided variables and returns it.
Timer Method
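The `timer` method uses a `contextlib.ExitStack` to compose two context managers, one for the legacy metric and one for the new metric, so that both timers measure the same `with` block. The stack enters up to two timers; the legacy one is added only when legacy export applies.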
Helper Functions
The module includes several private helper functions:
- `_value_is_provided(value)` -- Returns `True` if a value is not `None` and, if it has a length, that length is greater than 0.
- `_get_dict_with_defined_args(...)` -- Builds a kwargs dict that includes only the parameters that were explicitly provided.
- `_get_args_dict_with_extra_tags_if_set(...)` -- Merges extra tags into the args dict.
- `_get_tags_with_extra(tags, extra_tags)` -- Returns a new dict combining base tags and extra tags.
Usage
Used throughout Airflow's core to emit metrics during the naming migration period. Once the migration is complete and all dashboards use the new names, `export_legacy_names` can be set to `False` via `initialize` to stop dual emission.
Code Reference
Source Location
- Repository: Apache_Airflow
- File: `shared/observability/src/airflow_shared/observability/metrics/dual_stats_manager.py` (282 lines)
Signature
```python
# Signature excerpt: MetricsRegistry, DeltaType, ClassVar and Any are imported elsewhere in the module.
class DualStatsManager:
    """Helper class to abstract enabling/disabling the export of metrics with legacy names."""

    metrics_dict = MetricsRegistry()
    export_legacy_names: ClassVar[bool] = True

    @classmethod
    def initialize(cls, export_legacy_names: bool): ...

    @classmethod
    def get_legacy_stat(cls, stat: str, variables: dict[str, Any]) -> str | None: ...

    @classmethod
    def incr(cls, stat: str, count: int | None = None, rate: int | float | None = None,
             *, tags: dict[str, Any] | None = None, extra_tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def decr(cls, stat: str, count: int | None = None, rate: int | float | None = None,
             *, tags: dict[str, Any] | None = None, extra_tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def gauge(cls, stat: str, value: float, rate: int | float | None = None,
              delta: bool | None = None, *, tags: dict[str, Any] | None = None,
              extra_tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def timing(cls, stat: str, dt: DeltaType, *, tags: dict[str, Any] | None = None,
               extra_tags: dict[str, Any] | None = None) -> None: ...

    @classmethod
    def timer(cls, stat: str, tags: dict[str, Any] | None = None,
              extra_tags: dict[str, Any] | None = None, **kwargs): ...
```
Import
```python
from airflow_shared.observability.metrics.dual_stats_manager import DualStatsManager
```
I/O Contract
| Method | Input | Output | Side Effects |
|---|---|---|---|
| `initialize` | `export_legacy_names: bool` | `None` | Sets the class-level flag |
| `get_legacy_stat` | `stat: str`, `variables: dict` | `str \| None` | Raises `ValueError` if the metric is not in the registry or required variables are missing |
| `incr` | `stat: str`, `count`, `rate`, `tags`, `extra_tags` | `None` | Emits up to two `Stats.incr` calls (legacy + new) |
| `decr` | `stat: str`, `count`, `rate`, `tags`, `extra_tags` | `None` | Emits up to two `Stats.decr` calls |
| `gauge` | `stat: str`, `value`, `rate`, `delta`, `tags`, `extra_tags` | `None` | Emits up to two `Stats.gauge` calls |
| `timing` | `stat: str`, `dt`, `tags`, `extra_tags` | `None` | Emits up to two `Stats.timing` calls |
| `timer` | `stat: str`, `tags`, `extra_tags`, `**kwargs` | `ExitStack` (context manager) | Enters up to two timer context managers |
Usage Examples
Emitting Dual Metrics
```python
from airflow_shared.observability.metrics.dual_stats_manager import DualStatsManager

# Initialize with legacy export enabled
DualStatsManager.initialize(export_legacy_names=True)

# Emit a counter with both new and legacy names.
# extra_tags provide the variables needed for legacy name interpolation.
DualStatsManager.incr(
    "dag_processing.import_errors",
    count=1,
    tags={"dag_id": "example_dag"},
    extra_tags={"dag_file": "/opt/airflow/dags/example.py"},
)
```
Disabling Legacy Export
```python
# After the migration is complete, disable legacy name emission
DualStatsManager.initialize(export_legacy_names=False)

# Now only the new metric name is emitted
DualStatsManager.incr("dag_processing.import_errors", count=1)
```
Using the Timer with Dual Emission
```python
with DualStatsManager.timer(
    "scheduler.critical_section_duration",
    tags={"scheduler_id": "scheduler-0"},
    extra_tags={"hostname": "worker-1"},
):
    perform_critical_section()  # placeholder for the code being timed
# Both the legacy and the new timer metrics are recorded
```